From 74fe1a7a56f4214ef7bb53682c95603adfe0f584 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 8 Apr 2022 02:05:39 +0200 Subject: [PATCH] allow output format choice for multi client queue stream --- music_assistant/controllers/stream.py | 90 ++++++++++++++++---------- music_assistant/models/player_queue.py | 4 +- 2 files changed, 58 insertions(+), 36 deletions(-) diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index ea6823b4..c3aea3e2 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -10,7 +10,6 @@ from aiohttp import web from music_assistant.constants import EventType from music_assistant.helpers.audio import ( check_audio_support, - create_wave_header, crossfade_pcm_parts, get_media_stream, get_sox_args_for_pcm_stream, @@ -44,7 +43,6 @@ class StreamController: ) -> str: """Return the full stream url for the PlayerQueue Stream.""" if child_player: - fmt = "wav" return f"http://{self._ip}:{self._port}/{queue_id}/{child_player}.{fmt}" return f"http://{self._ip}:{self._port}/{queue_id}.{fmt}" @@ -144,6 +142,7 @@ class StreamController: """Serve queue audio stream to multiple (group)clients in the raw PCM format.""" queue_id = request.match_info["queue_id"] player_id = request.match_info["player_id"] + fmt = request.match_info.get("format", "flac") queue = self.mass.players.get_player_queue(queue_id) player = self.mass.players.get_player(player_id) @@ -154,17 +153,10 @@ class StreamController: resp = web.StreamResponse( status=200, reason="OK", - headers={ - "Content-Type": "audio/x-wav;codec=pcm;rate=44100;bitrate=16;channels=2" - }, + headers={"Content-Type": fmt}, ) await resp.prepare(request) - # write wave header - # multi subscriber queue is (currently) limited to 44100/16 format - wav_header = create_wave_header(44100, 2, 16) - await resp.write(wav_header) - # start delivering audio chunks await self.subscribe_client(queue_id, player_id) try: @@ -209,7 +201,7 @@ class StreamController: task.cancel() async def start_multi_client_queue_stream( - self, queue_id: str, expected_clients: Set[str] + self, queue_id: str, expected_clients: Set[str], output_fmt: ContentType ) -> None: """Start the Queue stream feeding callbacks of listeners..""" assert queue_id not in self._stream_tasks, "already running!" @@ -219,14 +211,14 @@ class StreamController: # create queue for expected clients self._client_queues.setdefault(queue_id, {}) for child_id in expected_clients: - self._client_queues[queue_id][child_id] = asyncio.Queue(5) + self._client_queues[queue_id][child_id] = asyncio.Queue(10) self._stream_tasks[queue_id] = asyncio.create_task( - self.__multi_client_queue_stream_runner(queue_id) + self.__multi_client_queue_stream_runner(queue_id, output_fmt) ) async def unpause_synced_start(): - # unpause if we got all clients (or timeout) + # unpause if we got all clients (or timeout) for (more or less) synced start expected_clients = len(self._client_queues[queue_id]) time_started = self._time_started[queue_id] while True: @@ -267,10 +259,11 @@ class StreamController: while len(self._client_queues.get(queue_id, {})) != 0: await asyncio.sleep(0.1) - async def __multi_client_queue_stream_runner(self, queue_id: str): + async def __multi_client_queue_stream_runner( + self, queue_id: str, output_fmt: ContentType + ): """Distribute audio chunks over connected clients in a multi client queue stream.""" queue = self.mass.players.get_player_queue(queue_id) - chunks_sent = 0 def cleanup_client_queue(player_id: str): if client_queue := self._client_queues[queue_id].get(player_id, None): @@ -280,27 +273,56 @@ class StreamController: client_queue.task_done() client_queue.put_nowait(b"") + start_streamdetails = await queue.queue_stream_prepare() + sox_args = await get_sox_args_for_pcm_stream( + start_streamdetails.sample_rate, + start_streamdetails.bit_depth, + start_streamdetails.channels, + output_format=output_fmt, + ) self.logger.debug("Multi client queue stream %s started", queue.queue_id) try: - async for chunk in self._get_queue_stream( - queue, 44100, 16, 2, resample=True - ): - chunks_sent += 1 - coros = [] - disconnected_clients = set() - for player_id, client_queue in self._client_queues[queue_id].items(): - if ( - chunks_sent >= 4 - and player_id not in self._subscribers[queue_id] - ): - # assume client did not connect or got disconnected somehow - disconnected_clients.add(player_id) - else: - coros.append(client_queue.put(chunk)) - await asyncio.gather(*coros) - for player_id in disconnected_clients: - cleanup_client_queue(player_id) + # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format + # send the compressed/endoded stream to the client. + async with AsyncProcess(sox_args, True) as sox_proc: + + async def writer(): + # task that sends the raw pcm audio to the sox/ffmpeg process + async for audio_chunk in self._get_queue_stream( + queue, + sample_rate=start_streamdetails.sample_rate, + bit_depth=start_streamdetails.bit_depth, + channels=start_streamdetails.channels, + ): + if sox_proc.closed: + return + await sox_proc.write(audio_chunk) + # write eof when last packet is received + sox_proc.write_eof() + + async def reader(): + # read bytes from final output + chunks_sent = 0 + chunksize = 32000 if output_fmt == ContentType.MP3 else 90000 + async for chunk in sox_proc.iterate_chunks(chunksize): + chunks_sent += 1 + coros = [] + for player_id in list(self._client_queues[queue_id].keys()): + if ( + chunks_sent >= 20 + and player_id not in self._subscribers[queue_id] + ): + # assume client did not connect or got disconnected somehow + cleanup_client_queue(player_id) + self._client_queues[queue_id].pop(player_id, None) + else: + coros.append( + self._client_queues[queue_id][player_id].put(chunk) + ) + await asyncio.gather(*coros) + + await asyncio.gather(*[writer(), reader()]) # send empty chunk to inform EOF await asyncio.gather( *[cq.put(b"") for cq in self._client_queues[queue_id].values()] diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 63cc47df..e516f069 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -15,7 +15,7 @@ from music_assistant.constants import EventType from music_assistant.helpers.audio import get_stream_details from music_assistant.helpers.typing import MusicAssistant from music_assistant.models.errors import MediaNotFoundError, QueueEmpty -from music_assistant.models.media_items import MediaType, StreamDetails +from music_assistant.models.media_items import ContentType, MediaType, StreamDetails from .player import Player, PlayerState @@ -392,7 +392,7 @@ class PlayerQueue: tasks.append(child_player.play_url(player_url)) tasks.append(child_player.pause()) await self.mass.players.streams.start_multi_client_queue_stream( - self.queue_id, expected_clients + self.queue_id, expected_clients, ContentType.FLAC ) await asyncio.gather(*tasks) else: -- 2.34.1