allow output format choice for multi client queue stream
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 8 Apr 2022 00:05:39 +0000 (02:05 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 8 Apr 2022 00:05:39 +0000 (02:05 +0200)
music_assistant/controllers/stream.py
music_assistant/models/player_queue.py

index ea6823b46a1aa9975ee26dfcbcd1c1e59c7426ce..c3aea3e2787396ca77df1b221d8fcbd3d10b7fba 100644 (file)
@@ -10,7 +10,6 @@ from aiohttp import web
 from music_assistant.constants import EventType
 from music_assistant.helpers.audio import (
     check_audio_support,
-    create_wave_header,
     crossfade_pcm_parts,
     get_media_stream,
     get_sox_args_for_pcm_stream,
@@ -44,7 +43,6 @@ class StreamController:
     ) -> str:
         """Return the full stream url for the PlayerQueue Stream."""
         if child_player:
-            fmt = "wav"
             return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{fmt}"
         return f"http://{self._ip}:{self._port}/{queue_id}.{fmt}"
 
@@ -144,6 +142,7 @@ class StreamController:
         """Serve queue audio stream to multiple (group)clients in the raw PCM format."""
         queue_id = request.match_info["queue_id"]
         player_id = request.match_info["player_id"]
+        fmt = request.match_info.get("format", "flac")
         queue = self.mass.players.get_player_queue(queue_id)
         player = self.mass.players.get_player(player_id)
 
@@ -154,17 +153,10 @@ class StreamController:
         resp = web.StreamResponse(
             status=200,
             reason="OK",
-            headers={
-                "Content-Type": "audio/x-wav;codec=pcm;rate=44100;bitrate=16;channels=2"
-            },
+            headers={"Content-Type": fmt},
         )
         await resp.prepare(request)
 
-        # write wave header
-        # multi subscriber queue is (currently) limited to 44100/16 format
-        wav_header = create_wave_header(44100, 2, 16)
-        await resp.write(wav_header)
-
         # start delivering audio chunks
         await self.subscribe_client(queue_id, player_id)
         try:
@@ -209,7 +201,7 @@ class StreamController:
                 task.cancel()
 
     async def start_multi_client_queue_stream(
-        self, queue_id: str, expected_clients: Set[str]
+        self, queue_id: str, expected_clients: Set[str], output_fmt: ContentType
     ) -> None:
         """Start the Queue stream feeding callbacks of listeners.."""
         assert queue_id not in self._stream_tasks, "already running!"
@@ -219,14 +211,14 @@ class StreamController:
         # create queue for expected clients
         self._client_queues.setdefault(queue_id, {})
         for child_id in expected_clients:
-            self._client_queues[queue_id][child_id] = asyncio.Queue(5)
+            self._client_queues[queue_id][child_id] = asyncio.Queue(10)
 
         self._stream_tasks[queue_id] = asyncio.create_task(
-            self.__multi_client_queue_stream_runner(queue_id)
+            self.__multi_client_queue_stream_runner(queue_id, output_fmt)
         )
 
         async def unpause_synced_start():
-            # unpause if we got all clients (or timeout)
+            # unpause if we got all clients (or timeout) for (more or less) synced start
             expected_clients = len(self._client_queues[queue_id])
             time_started = self._time_started[queue_id]
             while True:
@@ -267,10 +259,11 @@ class StreamController:
         while len(self._client_queues.get(queue_id, {})) != 0:
             await asyncio.sleep(0.1)
 
-    async def __multi_client_queue_stream_runner(self, queue_id: str):
+    async def __multi_client_queue_stream_runner(
+        self, queue_id: str, output_fmt: ContentType
+    ):
         """Distribute audio chunks over connected clients in a multi client queue stream."""
         queue = self.mass.players.get_player_queue(queue_id)
-        chunks_sent = 0
 
         def cleanup_client_queue(player_id: str):
             if client_queue := self._client_queues[queue_id].get(player_id, None):
@@ -280,27 +273,56 @@ class StreamController:
                     client_queue.task_done()
                 client_queue.put_nowait(b"")
 
+        start_streamdetails = await queue.queue_stream_prepare()
+        sox_args = await get_sox_args_for_pcm_stream(
+            start_streamdetails.sample_rate,
+            start_streamdetails.bit_depth,
+            start_streamdetails.channels,
+            output_format=output_fmt,
+        )
         self.logger.debug("Multi client queue stream %s started", queue.queue_id)
         try:
-            async for chunk in self._get_queue_stream(
-                queue, 44100, 16, 2, resample=True
-            ):
-                chunks_sent += 1
-                coros = []
-                disconnected_clients = set()
-                for player_id, client_queue in self._client_queues[queue_id].items():
-                    if (
-                        chunks_sent >= 4
-                        and player_id not in self._subscribers[queue_id]
-                    ):
-                        # assume client did not connect or got disconnected somehow
-                        disconnected_clients.add(player_id)
-                    else:
-                        coros.append(client_queue.put(chunk))
-                await asyncio.gather(*coros)
-                for player_id in disconnected_clients:
-                    cleanup_client_queue(player_id)
 
+            # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format
+            # send the compressed/endoded stream to the client.
+            async with AsyncProcess(sox_args, True) as sox_proc:
+
+                async def writer():
+                    # task that sends the raw pcm audio to the sox/ffmpeg process
+                    async for audio_chunk in self._get_queue_stream(
+                        queue,
+                        sample_rate=start_streamdetails.sample_rate,
+                        bit_depth=start_streamdetails.bit_depth,
+                        channels=start_streamdetails.channels,
+                    ):
+                        if sox_proc.closed:
+                            return
+                        await sox_proc.write(audio_chunk)
+                    # write eof when last packet is received
+                    sox_proc.write_eof()
+
+                async def reader():
+                    # read bytes from final output
+                    chunks_sent = 0
+                    chunksize = 32000 if output_fmt == ContentType.MP3 else 90000
+                    async for chunk in sox_proc.iterate_chunks(chunksize):
+                        chunks_sent += 1
+                        coros = []
+                        for player_id in list(self._client_queues[queue_id].keys()):
+                            if (
+                                chunks_sent >= 20
+                                and player_id not in self._subscribers[queue_id]
+                            ):
+                                # assume client did not connect or got disconnected somehow
+                                cleanup_client_queue(player_id)
+                                self._client_queues[queue_id].pop(player_id, None)
+                            else:
+                                coros.append(
+                                    self._client_queues[queue_id][player_id].put(chunk)
+                                )
+                        await asyncio.gather(*coros)
+
+                await asyncio.gather(*[writer(), reader()])
             # send empty chunk to inform EOF
             await asyncio.gather(
                 *[cq.put(b"") for cq in self._client_queues[queue_id].values()]
index 63cc47dfd234abb201fb572798c7a7f7a376510e..e516f0698caaf0b94e54fee6a3b2b0e071d6d24d 100644 (file)
@@ -15,7 +15,7 @@ from music_assistant.constants import EventType
 from music_assistant.helpers.audio import get_stream_details
 from music_assistant.helpers.typing import MusicAssistant
 from music_assistant.models.errors import MediaNotFoundError, QueueEmpty
-from music_assistant.models.media_items import MediaType, StreamDetails
+from music_assistant.models.media_items import ContentType, MediaType, StreamDetails
 
 from .player import Player, PlayerState
 
@@ -392,7 +392,7 @@ class PlayerQueue:
                         tasks.append(child_player.play_url(player_url))
                         tasks.append(child_player.pause())
             await self.mass.players.streams.start_multi_client_queue_stream(
-                self.queue_id, expected_clients
+                self.queue_id, expected_clients, ContentType.FLAC
             )
             await asyncio.gather(*tasks)
         else: