From: Marcel van der Veldt Date: Wed, 6 Apr 2022 09:00:45 +0000 (+0200) Subject: Support multi client streams (#243) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=c8ae5abc3b7c904583f89ef013e734a762db28a7;p=music-assistant-server.git Support multi client streams (#243) * handle brokenpipe error at shutdown * catch json parse errors in aiohttp * stability fixes * remove group player workaround * fix error when streaming radio * support for multi client queue streams --- diff --git a/music_assistant/controllers/stream.py b/music_assistant/controllers/stream.py index cfe62cd5..71428644 100644 --- a/music_assistant/controllers/stream.py +++ b/music_assistant/controllers/stream.py @@ -3,7 +3,6 @@ from __future__ import annotations import asyncio from asyncio import Task -from dataclasses import dataclass from time import time from typing import AsyncGenerator, Awaitable, Callable, Dict, List from uuid import uuid4 @@ -21,21 +20,12 @@ from music_assistant.helpers.audio import ( ) from music_assistant.helpers.process import AsyncProcess from music_assistant.helpers.typing import MusicAssistant -from music_assistant.helpers.util import get_ip +from music_assistant.helpers.util import create_task, get_ip from music_assistant.models.errors import MediaNotFoundError from music_assistant.models.media_items import ContentType from music_assistant.models.player_queue import PlayerQueue -@dataclass(frozen=True) -class PCMArgs: - """Specify raw pcm audio.""" - - sample_rate: int - bit_depth: int - channels: int - - class StreamController: """Controller to stream audio to players.""" @@ -47,19 +37,23 @@ class StreamController: self._ip: str = get_ip() self._subscribers: Dict[str, Dict[str, List[Callable]]] = {} self._stream_tasks: Dict[str, Task] = {} - self._pcmargs: Dict[str, PCMArgs] = {} def get_stream_url(self, queue_id: str) -> str: """Return the full stream url for the PlayerQueue Stream.""" + queue = self.mass.players.get_player_queue(queue_id) + if queue.player.use_multi_stream: + return f"http://{self._ip}:{self._port}/multi/{queue_id}.wav" return f"http://{self._ip}:{self._port}/{queue_id}.flac" async def setup(self) -> None: """Async initialize of module.""" app = web.Application() - app.router.add_get("/{queue_id}.wav", self.serve_stream_client_pcm) - app.router.add_get("/{queue_id}.{format}", self.serve_stream_client) - app.router.add_get("/{queue_id}", self.serve_stream_client) + app.router.add_get( + "/multi/{queue_id}.wav", self.serve_multi_client_queue_stream + ) + app.router.add_get("/{queue_id}.{format}", self.serve_queue_stream) + app.router.add_get("/{queue_id}", self.serve_queue_stream) runner = web.AppRunner(app, access_log=None) await runner.setup() @@ -99,13 +93,13 @@ class StreamController: self.logger.info("Started stream server on port %s", self._port) - async def serve_stream_client(self, request: web.Request): - """Serve queue audio stream to client (encoded to FLAC or MP3).""" + async def serve_queue_stream(self, request: web.Request): + """Serve queue audio stream to a single player (encoded to fileformat of choice).""" queue_id = request.match_info["queue_id"] - clientid = f'{request.remote}_{request.query.get("playerid", str(uuid4()))}' fmt = request.match_info.get("format", "flac") + queue = self.mass.players.get_player_queue(queue_id) - if self.mass.players.get_player_queue(queue_id) is None: + if queue is None: return web.Response(status=404) # prepare request @@ -114,65 +108,63 @@ class StreamController: ) await resp.prepare(request) - pcmargs = await self._get_queue_stream_pcm_args(queue_id) + start_streamdetails = await queue.queue_stream_prepare() output_fmt = ContentType(fmt) sox_args = await get_sox_args_for_pcm_stream( - pcmargs.sample_rate, - pcmargs.bit_depth, - pcmargs.channels, + start_streamdetails.sample_rate, + start_streamdetails.bit_depth, + start_streamdetails.channels, output_format=output_fmt, ) - try: - # get the raw pcm bytes from the queue stream and on the fly encode as flac - # send the flac endoded stream to the subscribers. - async with AsyncProcess(sox_args, True) as sox_proc: - - async def reader(): - # task that reads flac endoded chunks from the subprocess - chunksize = 32000 if output_fmt == ContentType.MP3 else 256000 - async for audio_chunk in sox_proc.iterate_chunks(chunksize): - await resp.write(audio_chunk) - - # feed raw pcm chunks into sox/ffmpeg to encode to flac - async def audio_callback(audio_chunk): - if audio_chunk == b"": - sox_proc.write_eof() + # get the raw pcm bytes from the queue stream and on the fly encode as to wanted format + # send the compressed/endoded stream to the client. + async with AsyncProcess(sox_args, True) as sox_proc: + + async def writer(): + # task that sends the raw pcm audio to the sox/ffmpeg process + async for audio_chunk in self._get_queue_stream( + queue, + sample_rate=start_streamdetails.sample_rate, + bit_depth=start_streamdetails.bit_depth, + channels=start_streamdetails.channels, + ): + if sox_proc.closed: return await sox_proc.write(audio_chunk) + # write eof when last packet is received + sox_proc.write_eof() - # wait for the output task to complete - await self.subscribe(queue_id, clientid, audio_callback) - await reader() + create_task(writer) + + # read bytes from final output + chunksize = 32000 if output_fmt == ContentType.MP3 else 90000 + async for audio_chunk in sox_proc.iterate_chunks(chunksize): + await resp.write(audio_chunk) - finally: - await self.unsubscribe(queue_id, clientid) return resp - async def serve_stream_client_pcm(self, request: web.Request): - """Serve queue audio stream to client in the raw PCM format.""" + async def serve_multi_client_queue_stream(self, request: web.Request): + """Serve queue audio stream to multiple (group)clients in the raw PCM format.""" queue_id = request.match_info["queue_id"] queue = self.mass.players.get_player_queue(queue_id) - clientid = f'{request.remote}_{request.query.get("playerid", str(uuid4()))}' + client_id = str(uuid4()) if queue is None: return web.Response(status=404) # prepare request - pcmargs = await self._get_queue_stream_pcm_args(queue_id, 32) - fmt = f"x-wav;codec=pcm;rate={pcmargs.sample_rate};bitrate={pcmargs.bit_depth};channels={pcmargs.channels}" resp = web.StreamResponse( status=200, reason="OK", - headers={"Content-Type": f"audio/{fmt}"}, + headers={ + "Content-Type": "audio/x-wav;codec=pcm;rate=44100;bitrate=16;channels=2" + }, ) await resp.prepare(request) # write wave header - wav_header = create_wave_header( - pcmargs.sample_rate, - pcmargs.channels, - pcmargs.bit_depth, - ) + # multi subscriber queue is (currently) limited to 44100/16 format + wav_header = create_wave_header(44100, 2, 16) await resp.write(wav_header) # start delivering audio chunks @@ -188,93 +180,92 @@ class StreamController: except BrokenPipeError: pass # race condition - await self.subscribe(queue_id, clientid, audio_callback) + await self.subscribe(queue_id, client_id, audio_callback) await last_chunk_received.wait() finally: - await self.unsubscribe(queue_id, clientid) + await self.unsubscribe(queue_id, client_id) return resp async def subscribe( - self, queue_id: str, clientid: str, callback: Awaitable + self, queue_id: str, client_id: str, callback: Awaitable ) -> None: """Subscribe client to queue stream.""" - self._subscribers.setdefault(queue_id, {}) - if queue_id in self._subscribers[queue_id]: - # client is already subscribed ? - await self.unsubscribe(queue_id, clientid) - self._subscribers[queue_id][clientid] = callback + if queue_id not in self._subscribers: + self._subscribers[queue_id] = {} + self._subscribers[queue_id][client_id] = callback + + queue = self.mass.players.get_player_queue(queue_id) + # calculate number of expected clients + expected_clients = 0 + for child_id in queue.player.group_childs: + if player := self.mass.players.get_player(child_id): + if player.powered: + expected_clients += 1 + assert expected_clients, "No clients expected for this stream" + stream_task = self._stream_tasks.get(queue_id) - if not stream_task or stream_task.cancelled(): - # first connect, start the stream task - task = asyncio.create_task(self.start_queue_stream(queue_id)) + if stream_task is not None: + # a new client connected while we're already streaming, tell the queue to restart + stream_task.cancel() + await queue.resume() + return + # we start the stream as soon as we've reached the expected number of clients + # TODO: add timeout guard just in case we don't reach the number of expected client + if stream_task is None and len(self._subscribers[queue_id]) >= expected_clients: + # start the stream task + self._stream_tasks[queue_id] = task = asyncio.create_task( + self.start_multi_queue_stream(queue_id) + ) + self.logger.debug("Multi client queue stream %s started", queue.queue_id) def task_done_callback(*args, **kwargs): self._stream_tasks.pop(queue_id, None) task.add_done_callback(task_done_callback) - self._stream_tasks[queue_id] = task - self.logger.debug("Subscribed client %s to queue stream %s", clientid, queue_id) + self.logger.debug( + "Subscribed client %s to multi queue stream %s", + client_id, + queue.queue_id, + ) + return client_id async def unsubscribe(self, queue_id: str, clientid: str): """Unsubscribe client from queue stream.""" self._subscribers[queue_id].pop(clientid, None) self.logger.debug( - "Unsubscribed client %s from queue stream %s", clientid, queue_id + "Unsubscribed client %s from multi queue stream %s", clientid, queue_id ) if len(self._subscribers[queue_id]) == 0: # no more clients, cancel stream task - self.logger.debug( - "Aborted queue stream %s due to no more clients", queue_id - ) if task := self._stream_tasks.pop(queue_id, None): + self.logger.debug( + "Aborted multi queue stream %s due to no more clients", queue_id + ) task.cancel() - self._pcmargs.pop(queue_id, None) - async def start_queue_stream(self, queue_id: str) -> None: + async def start_multi_queue_stream(self, queue_id: str) -> None: """Start the Queue stream feeding callbacks of listeners..""" queue = self.mass.players.get_player_queue(queue_id) - pcmargs = await self._get_queue_stream_pcm_args(queue_id) - - self.logger.info( - "Starting Queue stream for Queue %s with args: %s", queue_id, pcmargs - ) - async for chunk in self._get_queue_stream( - queue, - pcmargs.sample_rate, - pcmargs.bit_depth, - pcmargs.channels, - ): + async for chunk in self._get_queue_stream(queue, 44100, 16, 2, resample=True): if len(self._subscribers[queue_id].values()) == 0: - self.logger.info("Queue stream for Queue %s aborted", queue_id) + # just in case of race conditions return await asyncio.gather( *[cb(chunk) for cb in list(self._subscribers[queue_id].values())] ) - self.logger.info("Queue stream for Queue %s finished.", queue_id) # send empty chunk to inform EOF await asyncio.gather( *[cb(b"") for cb in list(self._subscribers[queue_id].values())] ) - async def _get_queue_stream_pcm_args( - self, queue_id: str, forced_bit_depth: int = None - ) -> PCMArgs: - """Return the current/ext PCM args for the queue stream.""" - if queue_id in self._pcmargs: - return self._pcmargs[queue_id] - queue = self.mass.players.get_player_queue(queue_id) - next_streamdetails = await queue.queue_stream_prepare() - pcmargs = PCMArgs( - sample_rate=min(next_streamdetails.sample_rate, queue.max_sample_rate), - bit_depth=forced_bit_depth or next_streamdetails.bit_depth, - channels=2, - ) - self._pcmargs[queue_id] = pcmargs - return pcmargs - async def _get_queue_stream( - self, queue: PlayerQueue, sample_rate: int, bit_depth: int, channels: int = 2 + self, + queue: PlayerQueue, + sample_rate: int, + bit_depth: int, + channels: int = 2, + resample: bool = False, ) -> AsyncGenerator[None, bytes]: """Stream the PlayerQueue's tracks as constant feed of PCM raw audio.""" last_fadeout_data = b"" @@ -282,6 +273,14 @@ class StreamController: track_count = 0 start_timestamp = time() + pcm_fmt = ContentType.from_bit_depth(bit_depth) + self.logger.info( + "Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)", + queue.player.name, + pcm_fmt, + sample_rate, + ) + # stream queue tracks one by one while True: # get the (next) track in queue @@ -308,33 +307,31 @@ class StreamController: self.logger.warning("Skip track due to missing streamdetails") continue - # get the PCM samplerate/bitrate - if streamdetails.bit_depth > bit_depth: + # check the PCM samplerate/bitrate + if not resample and streamdetails.bit_depth > bit_depth: await queue.queue_stream_signal_next() self.logger.debug("Abort queue stream due to bit depth mismatch") await queue.queue_stream_signal_next() break if ( - streamdetails.sample_rate > sample_rate + not resample + and streamdetails.sample_rate > sample_rate and streamdetails.sample_rate <= queue.max_sample_rate ): self.logger.debug("Abort queue stream due to sample rate mismatch") await queue.queue_stream_signal_next() break - pcm_fmt = ContentType.from_bit_depth(bit_depth) sample_size = int(sample_rate * (bit_depth / 8) * channels) # 1 second buffer_size = sample_size * ( queue.crossfade_duration or 1 ) # 1...10 seconds self.logger.debug( - "Start Streaming queue track: %s (%s) for player %s - PCM format: %s - rate: %s", + "Start Streaming queue track: %s (%s) for queue %s", queue_track.item_id, queue_track.name, queue.player.name, - pcm_fmt.value, - sample_rate, ) fade_in_part = b"" cur_chunk = 0 @@ -464,3 +461,4 @@ class StreamController: # end of queue reached, pass last fadeout bits to final output yield last_fadeout_data # END OF QUEUE STREAM + self.logger.info("Queue stream for Queue %s finished.", queue.queue_id) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index fff0e5b8..3da9c274 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -259,11 +259,10 @@ async def get_gain_correct( return (track_loudness, gain_correct) -def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=1800): +def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=None): """Generate a wave header from given params.""" # pylint: disable=no-member file = BytesIO() - numsamples = samplerate * duration # Generate format chunk format_chunk_spec = b"<4sLHHLLHH" @@ -279,8 +278,15 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration= bitspersample, # 16 bits for two byte samples, etc. ) # Generate data chunk + # duration = 3600*6.7 data_chunk_spec = b"<4sL" - datasize = int(numsamples * channels * (bitspersample / 8)) + if duration is None: + # use max value possible + datasize = 4254768000 # = 6,7 hours at 44100/16 + else: + # calculate from duration + numsamples = samplerate * duration + datasize = int(numsamples * channels * (bitspersample / 8)) data_chunk = struct.pack( data_chunk_spec, b"data", # Chunk id diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index f2a84852..2f5b09a3 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -26,6 +26,7 @@ class AsyncProcess: self._proc = None self._args = args self._enable_write = enable_write + self.closed = False async def __aenter__(self) -> "AsyncProcess": """Enter context manager.""" @@ -54,6 +55,7 @@ class AsyncProcess: async def __aexit__(self, exc_type, exc_value, traceback) -> bool: """Exit context manager.""" + self.closed = True if self._proc.returncode is None: # prevent subprocess deadlocking, send terminate and read remaining bytes if self._enable_write: diff --git a/music_assistant/models/media_items.py b/music_assistant/models/media_items.py index 0bd1579a..7c823c83 100755 --- a/music_assistant/models/media_items.py +++ b/music_assistant/models/media_items.py @@ -242,6 +242,7 @@ class ContentType(Enum): MP3 = "mp3" AAC = "aac" MPEG = "mpeg" + WAV = "wav" PCM_S16LE = "s16le" # PCM signed 16-bit little-endian PCM_S24LE = "s24le" # PCM signed 24-bit little-endian PCM_S32LE = "s32le" # PCM signed 32-bit little-endian diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index c00a77c8..6c1fd4ea 100755 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -57,6 +57,7 @@ class Player(ABC): _attr_device_info: DeviceInfo = DeviceInfo() _attr_max_sample_rate: int = 96000 _attr_active_queue_id: str = "" + _attr_use_multi_stream: bool = False # mass object will be set by playermanager at register mass: MusicAssistant = None # type: ignore[assignment] @@ -124,6 +125,20 @@ class Player(ABC): """ return self.mass.players.get_player_queue(self._attr_active_queue_id) + @property + def use_multi_stream(self) -> bool: + """ + Return bool if this player needs multistream approach. + + This is used for groupplayers that do not distribute the audio streams over players. + Instead this can be used as convenience service where each client receives the same audio + at more or less the same time. The player's implementation will be responsible for + synchronization of audio on child players (if possible), Music Assistant will only + coordinate the start and makes sure that every child received the same audio chunk + within the same timespan. Multi stream is currently limited to 44100/16 only. + """ + return self._attr_use_multi_stream + async def play_url(self, url: str) -> None: """Play the specified url on the player.""" raise NotImplementedError diff --git a/music_assistant/models/player_queue.py b/music_assistant/models/player_queue.py index 0baaf2f0..7574219f 100644 --- a/music_assistant/models/player_queue.py +++ b/music_assistant/models/player_queue.py @@ -601,6 +601,9 @@ class PlayerQueue: queue_track = None while len(self._items) > queue_index: queue_track = self._items[queue_index] + if queue_track.duration is None: + # in case of a radio stream + queue_track.duration = 86400 if elapsed_time_queue > (queue_track.duration + total_time): total_time += queue_track.duration queue_index += 1