Support multi client streams (#243)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 6 Apr 2022 09:00:45 +0000 (11:00 +0200)
committerGitHub <noreply@github.com>
Wed, 6 Apr 2022 09:00:45 +0000 (11:00 +0200)
* handle brokenpipe error at shutdown

* catch json parse errors in aiohttp

* stability fixes

* remove group player workaround

* fix error when streaming radio

* support for multi client queue streams

music_assistant/controllers/stream.py
music_assistant/helpers/audio.py
music_assistant/helpers/process.py
music_assistant/models/media_items.py
music_assistant/models/player.py
music_assistant/models/player_queue.py

index cfe62cd5d6693b9c945dd3044b48f82da2961e1b..71428644465a3cb767f53ca789a5a555ab754988 100644 (file)
@@ -3,7 +3,6 @@ from __future__ import annotations
 
 import asyncio
 from asyncio import Task
-from dataclasses import dataclass
 from time import time
 from typing import AsyncGenerator, Awaitable, Callable, Dict, List
 from uuid import uuid4
@@ -21,21 +20,12 @@ from music_assistant.helpers.audio import (
 )
 from music_assistant.helpers.process import AsyncProcess
 from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import get_ip
+from music_assistant.helpers.util import create_task, get_ip
 from music_assistant.models.errors import MediaNotFoundError
 from music_assistant.models.media_items import ContentType
 from music_assistant.models.player_queue import PlayerQueue
 
 
-@dataclass(frozen=True)
-class PCMArgs:
-    """Specify raw pcm audio."""
-
-    sample_rate: int
-    bit_depth: int
-    channels: int
-
-
 class StreamController:
     """Controller to stream audio to players."""
 
@@ -47,19 +37,23 @@ class StreamController:
         self._ip: str = get_ip()
         self._subscribers: Dict[str, Dict[str, List[Callable]]] = {}
         self._stream_tasks: Dict[str, Task] = {}
-        self._pcmargs: Dict[str, PCMArgs] = {}
 
     def get_stream_url(self, queue_id: str) -> str:
         """Return the full stream url for the PlayerQueue Stream."""
+        queue = self.mass.players.get_player_queue(queue_id)
+        if queue.player.use_multi_stream:
+            return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav"
         return f"http://{self._ip}:{self._port}/{queue_id}.flac"
 
     async def setup(self) -> None:
         """Async initialize of module."""
         app = web.Application()
 
-        app.router.add_get("/{queue_id}.wav", self.serve_stream_client_pcm)
-        app.router.add_get("/{queue_id}.{format}", self.serve_stream_client)
-        app.router.add_get("/{queue_id}", self.serve_stream_client)
+        app.router.add_get(
+            "/multi/{queue_id}.wav", self.serve_multi_client_queue_stream
+        )
+        app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream)
+        app.router.add_get("/{queue_id}", self.serve_queue_stream)
 
         runner = web.AppRunner(app, access_log=None)
         await runner.setup()
@@ -99,13 +93,13 @@ class StreamController:
 
         self.logger.info("Started stream server on port %s", self._port)
 
-    async def serve_stream_client(self, request: web.Request):
-        """Serve queue audio stream to client (encoded to FLAC or MP3)."""
+    async def serve_queue_stream(self, request: web.Request):
+        """Serve queue audio stream to a single player (encoded to fileformat of choice)."""
         queue_id = request.match_info["queue_id"]
-        clientid = f'{request.remote}_{request.query.get("playerid", str(uuid4()))}'
         fmt = request.match_info.get("format", "flac")
+        queue = self.mass.players.get_player_queue(queue_id)
 
-        if self.mass.players.get_player_queue(queue_id) is None:
+        if queue is None:
             return web.Response(status=404)
 
         # prepare request
@@ -114,65 +108,63 @@ class StreamController:
         )
         await resp.prepare(request)
 
-        pcmargs = await self._get_queue_stream_pcm_args(queue_id)
+        start_streamdetails = await queue.queue_stream_prepare()
         output_fmt = ContentType(fmt)
         sox_args = await get_sox_args_for_pcm_stream(
-            pcmargs.sample_rate,
-            pcmargs.bit_depth,
-            pcmargs.channels,
+            start_streamdetails.sample_rate,
+            start_streamdetails.bit_depth,
+            start_streamdetails.channels,
             output_format=output_fmt,
         )
-        try:
-            # get the raw pcm bytes from the queue stream and on the fly encode as flac
-            # send the flac endoded stream to the subscribers.
-            async with AsyncProcess(sox_args, True) as sox_proc:
-
-                async def reader():
-                    # task that reads flac endoded chunks from the subprocess
-                    chunksize = 32000 if output_fmt == ContentType.MP3 else 256000
-                    async for audio_chunk in sox_proc.iterate_chunks(chunksize):
-                        await resp.write(audio_chunk)
-
-                # feed raw pcm chunks into sox/ffmpeg to encode to flac
-                async def audio_callback(audio_chunk):
-                    if audio_chunk == b"":
-                        sox_proc.write_eof()
+        # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format
+        # send the compressed/endoded stream to the client.
+        async with AsyncProcess(sox_args, True) as sox_proc:
+
+            async def writer():
+                # task that sends the raw pcm audio to the sox/ffmpeg process
+                async for audio_chunk in self._get_queue_stream(
+                    queue,
+                    sample_rate=start_streamdetails.sample_rate,
+                    bit_depth=start_streamdetails.bit_depth,
+                    channels=start_streamdetails.channels,
+                ):
+                    if sox_proc.closed:
                         return
                     await sox_proc.write(audio_chunk)
+                # write eof when last packet is received
+                sox_proc.write_eof()
 
-                # wait for the output task to complete
-                await self.subscribe(queue_id, clientid, audio_callback)
-                await reader()
+            create_task(writer)
+
+            # read bytes from final output
+            chunksize = 32000 if output_fmt == ContentType.MP3 else 90000
+            async for audio_chunk in sox_proc.iterate_chunks(chunksize):
+                await resp.write(audio_chunk)
 
-        finally:
-            await self.unsubscribe(queue_id, clientid)
         return resp
 
-    async def serve_stream_client_pcm(self, request: web.Request):
-        """Serve queue audio stream to client in the raw PCM format."""
+    async def serve_multi_client_queue_stream(self, request: web.Request):
+        """Serve queue audio stream to multiple (group)clients in the raw PCM format."""
         queue_id = request.match_info["queue_id"]
         queue = self.mass.players.get_player_queue(queue_id)
-        clientid = f'{request.remote}_{request.query.get("playerid", str(uuid4()))}'
+        client_id = str(uuid4())
 
         if queue is None:
             return web.Response(status=404)
 
         # prepare request
-        pcmargs = await self._get_queue_stream_pcm_args(queue_id, 32)
-        fmt = f"x-wav;codec=pcm;rate={pcmargs.sample_rate};bitrate={pcmargs.bit_depth};channels={pcmargs.channels}"
         resp = web.StreamResponse(
             status=200,
             reason="OK",
-            headers={"Content-Type": f"audio/{fmt}"},
+            headers={
+                "Content-Type": "audio/x-wav;codec=pcm;rate=44100;bitrate=16;channels=2"
+            },
         )
         await resp.prepare(request)
 
         # write wave header
-        wav_header = create_wave_header(
-            pcmargs.sample_rate,
-            pcmargs.channels,
-            pcmargs.bit_depth,
-        )
+        # multi subscriber queue is (currently) limited to 44100/16 format
+        wav_header = create_wave_header(44100, 2, 16)
         await resp.write(wav_header)
 
         # start delivering audio chunks
@@ -188,93 +180,92 @@ class StreamController:
                 except BrokenPipeError:
                     pass  # race condition
 
-            await self.subscribe(queue_id, clientid, audio_callback)
+            await self.subscribe(queue_id, client_id, audio_callback)
             await last_chunk_received.wait()
         finally:
-            await self.unsubscribe(queue_id, clientid)
+            await self.unsubscribe(queue_id, client_id)
         return resp
 
     async def subscribe(
-        self, queue_id: str, clientid: str, callback: Awaitable
+        self, queue_id: str, client_id: str, callback: Awaitable
     ) -> None:
         """Subscribe client to queue stream."""
-        self._subscribers.setdefault(queue_id, {})
-        if queue_id in self._subscribers[queue_id]:
-            # client is already subscribed ?
-            await self.unsubscribe(queue_id, clientid)
-        self._subscribers[queue_id][clientid] = callback
+        if queue_id not in self._subscribers:
+            self._subscribers[queue_id] = {}
+        self._subscribers[queue_id][client_id] = callback
+
+        queue = self.mass.players.get_player_queue(queue_id)
+        # calculate number of expected clients
+        expected_clients = 0
+        for child_id in queue.player.group_childs:
+            if player := self.mass.players.get_player(child_id):
+                if player.powered:
+                    expected_clients += 1
+        assert expected_clients, "No clients expected for this stream"
+
         stream_task = self._stream_tasks.get(queue_id)
-        if not stream_task or stream_task.cancelled():
-            # first connect, start the stream task
-            task = asyncio.create_task(self.start_queue_stream(queue_id))
+        if stream_task is not None:
+            # a new client connected while we're already streaming, tell the queue to restart
+            stream_task.cancel()
+            await queue.resume()
+            return
+        # we start the stream as soon as we've reached the expected number of clients
+        # TODO: add timeout guard just in case we don't reach the number of expected client
+        if stream_task is None and len(self._subscribers[queue_id]) >= expected_clients:
+            # start the stream task
+            self._stream_tasks[queue_id] = task = asyncio.create_task(
+                self.start_multi_queue_stream(queue_id)
+            )
+            self.logger.debug("Multi client queue stream %s started", queue.queue_id)
 
             def task_done_callback(*args, **kwargs):
                 self._stream_tasks.pop(queue_id, None)
 
             task.add_done_callback(task_done_callback)
-            self._stream_tasks[queue_id] = task
 
-        self.logger.debug("Subscribed client %s to queue stream %s", clientid, queue_id)
+        self.logger.debug(
+            "Subscribed client %s to multi queue stream %s",
+            client_id,
+            queue.queue_id,
+        )
+        return client_id
 
     async def unsubscribe(self, queue_id: str, clientid: str):
         """Unsubscribe client from queue stream."""
         self._subscribers[queue_id].pop(clientid, None)
         self.logger.debug(
-            "Unsubscribed client %s from queue stream %s", clientid, queue_id
+            "Unsubscribed client %s from multi queue stream %s", clientid, queue_id
         )
         if len(self._subscribers[queue_id]) == 0:
             # no more clients, cancel stream task
-            self.logger.debug(
-                "Aborted queue stream %s due to no more clients", queue_id
-            )
             if task := self._stream_tasks.pop(queue_id, None):
+                self.logger.debug(
+                    "Aborted multi queue stream %s due to no more clients", queue_id
+                )
                 task.cancel()
-            self._pcmargs.pop(queue_id, None)
 
-    async def start_queue_stream(self, queue_id: str) -> None:
+    async def start_multi_queue_stream(self, queue_id: str) -> None:
         """Start the Queue stream feeding callbacks of listeners.."""
         queue = self.mass.players.get_player_queue(queue_id)
-        pcmargs = await self._get_queue_stream_pcm_args(queue_id)
-
-        self.logger.info(
-            "Starting Queue stream for Queue %s with args: %s", queue_id, pcmargs
-        )
-        async for chunk in self._get_queue_stream(
-            queue,
-            pcmargs.sample_rate,
-            pcmargs.bit_depth,
-            pcmargs.channels,
-        ):
+        async for chunk in self._get_queue_stream(queue, 44100, 16, 2, resample=True):
             if len(self._subscribers[queue_id].values()) == 0:
-                self.logger.info("Queue stream for Queue %s aborted", queue_id)
+                # just in case of race conditions
                 return
             await asyncio.gather(
                 *[cb(chunk) for cb in list(self._subscribers[queue_id].values())]
             )
-        self.logger.info("Queue stream for Queue %s finished.", queue_id)
         # send empty chunk to inform EOF
         await asyncio.gather(
             *[cb(b"") for cb in list(self._subscribers[queue_id].values())]
         )
 
-    async def _get_queue_stream_pcm_args(
-        self, queue_id: str, forced_bit_depth: int = None
-    ) -> PCMArgs:
-        """Return the current/ext PCM args for the queue stream."""
-        if queue_id in self._pcmargs:
-            return self._pcmargs[queue_id]
-        queue = self.mass.players.get_player_queue(queue_id)
-        next_streamdetails = await queue.queue_stream_prepare()
-        pcmargs = PCMArgs(
-            sample_rate=min(next_streamdetails.sample_rate, queue.max_sample_rate),
-            bit_depth=forced_bit_depth or next_streamdetails.bit_depth,
-            channels=2,
-        )
-        self._pcmargs[queue_id] = pcmargs
-        return pcmargs
-
     async def _get_queue_stream(
-        self, queue: PlayerQueue, sample_rate: int, bit_depth: int, channels: int = 2
+        self,
+        queue: PlayerQueue,
+        sample_rate: int,
+        bit_depth: int,
+        channels: int = 2,
+        resample: bool = False,
     ) -> AsyncGenerator[None, bytes]:
         """Stream the PlayerQueue's tracks as constant feed of PCM raw audio."""
         last_fadeout_data = b""
@@ -282,6 +273,14 @@ class StreamController:
         track_count = 0
         start_timestamp = time()
 
+        pcm_fmt = ContentType.from_bit_depth(bit_depth)
+        self.logger.info(
+            "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)",
+            queue.player.name,
+            pcm_fmt,
+            sample_rate,
+        )
+
         # stream queue tracks one by one
         while True:
             # get the (next) track in queue
@@ -308,33 +307,31 @@ class StreamController:
                 self.logger.warning("Skip track due to missing streamdetails")
                 continue
 
-            # get the PCM samplerate/bitrate
-            if streamdetails.bit_depth > bit_depth:
+            # check the PCM samplerate/bitrate
+            if not resample and streamdetails.bit_depth > bit_depth:
                 await queue.queue_stream_signal_next()
                 self.logger.debug("Abort queue stream due to bit depth mismatch")
                 await queue.queue_stream_signal_next()
                 break
             if (
-                streamdetails.sample_rate > sample_rate
+                not resample
+                and streamdetails.sample_rate > sample_rate
                 and streamdetails.sample_rate <= queue.max_sample_rate
             ):
                 self.logger.debug("Abort queue stream due to sample rate mismatch")
                 await queue.queue_stream_signal_next()
                 break
 
-            pcm_fmt = ContentType.from_bit_depth(bit_depth)
             sample_size = int(sample_rate * (bit_depth / 8) * channels)  # 1 second
             buffer_size = sample_size * (
                 queue.crossfade_duration or 1
             )  # 1...10 seconds
 
             self.logger.debug(
-                "Start Streaming queue track: %s (%s) for player %s - PCM format: %s - rate: %s",
+                "Start Streaming queue track: %s (%s) for queue %s",
                 queue_track.item_id,
                 queue_track.name,
                 queue.player.name,
-                pcm_fmt.value,
-                sample_rate,
             )
             fade_in_part = b""
             cur_chunk = 0
@@ -464,3 +461,4 @@ class StreamController:
         # end of queue reached, pass last fadeout bits to final output
         yield last_fadeout_data
         # END OF QUEUE STREAM
+        self.logger.info("Queue stream for Queue %s finished.", queue.queue_id)
index fff0e5b8c9c22a2a02bb521ac94eb68273353590..3da9c27439cf471659fe5c62ba7e20a9b4e90982 100644 (file)
@@ -259,11 +259,10 @@ async def get_gain_correct(
     return (track_loudness, gain_correct)
 
 
-def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=1800):
+def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None):
     """Generate a wave header from given params."""
     # pylint: disable=no-member
     file = BytesIO()
-    numsamples = samplerate * duration
 
     # Generate format chunk
     format_chunk_spec = b"<4sLHHLLHH"
@@ -279,8 +278,15 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=
         bitspersample,  # 16 bits for two byte samples, etc.
     )
     # Generate data chunk
+    # duration = 3600*6.7
     data_chunk_spec = b"<4sL"
-    datasize = int(numsamples * channels * (bitspersample / 8))
+    if duration is None:
+        # use max value possible
+        datasize = 4254768000  # = 6,7 hours at 44100/16
+    else:
+        # calculate from duration
+        numsamples = samplerate * duration
+        datasize = int(numsamples * channels * (bitspersample / 8))
     data_chunk = struct.pack(
         data_chunk_spec,
         b"data",  # Chunk id
index f2a84852c2404de0c8e3f5a98f1c0ff10b46e27f..2f5b09a3aa39bf6fcc3c67451e531c08c0194f18 100644 (file)
@@ -26,6 +26,7 @@ class AsyncProcess:
         self._proc = None
         self._args = args
         self._enable_write = enable_write
+        self.closed = False
 
     async def __aenter__(self) -> "AsyncProcess":
         """Enter context manager."""
@@ -54,6 +55,7 @@ class AsyncProcess:
 
     async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
         """Exit context manager."""
+        self.closed = True
         if self._proc.returncode is None:
             # prevent subprocess deadlocking, send terminate and read remaining bytes
             if self._enable_write:
index 0bd1579a041db6dcdea678f04feb4f91ea49c52b..7c823c8318336fbedb7604f8c8783976d51345e8 100755 (executable)
@@ -242,6 +242,7 @@ class ContentType(Enum):
     MP3 = "mp3"
     AAC = "aac"
     MPEG = "mpeg"
+    WAV = "wav"
     PCM_S16LE = "s16le"  # PCM signed 16-bit little-endian
     PCM_S24LE = "s24le"  # PCM signed 24-bit little-endian
     PCM_S32LE = "s32le"  # PCM signed 32-bit little-endian
index c00a77c8888bc50c23611701f499f19e24dd81b8..6c1fd4ead6726d32313311406749a644581ad65d 100755 (executable)
@@ -57,6 +57,7 @@ class Player(ABC):
     _attr_device_info: DeviceInfo = DeviceInfo()
     _attr_max_sample_rate: int = 96000
     _attr_active_queue_id: str = ""
+    _attr_use_multi_stream: bool = False
     # mass object will be set by playermanager at register
     mass: MusicAssistant = None  # type: ignore[assignment]
 
@@ -124,6 +125,20 @@ class Player(ABC):
         """
         return self.mass.players.get_player_queue(self._attr_active_queue_id)
 
+    @property
+    def use_multi_stream(self) -> bool:
+        """
+        Return bool if this player needs multistream approach.
+
+        This is used for groupplayers that do not distribute the audio streams over players.
+        Instead this can be used as convenience service where each client receives the same audio
+        at more or less the same time. The player's implementation will be responsible for
+        synchronization of audio on child players (if possible), Music Assistant will only
+        coordinate the start and makes sure that every child received the same audio chunk
+        within the same timespan. Multi stream is currently limited to 44100/16 only.
+        """
+        return self._attr_use_multi_stream
+
     async def play_url(self, url: str) -> None:
         """Play the specified url on the player."""
         raise NotImplementedError
index 0baaf2f05e74a1b77bcc03f70b3eb3e8a11cfa54..7574219f99bab1b4cef25eef73b763517e2c35fd 100644 (file)
@@ -601,6 +601,9 @@ class PlayerQueue:
             queue_track = None
             while len(self._items) > queue_index:
                 queue_track = self._items[queue_index]
+                if queue_track.duration is None:
+                    # in case of a radio stream
+                    queue_track.duration = 86400
                 if elapsed_time_queue > (queue_track.duration + total_time):
                     total_time += queue_track.duration
                     queue_index += 1