From: Marcel van der Veldt Date: Thu, 21 Mar 2024 23:43:05 +0000 (+0100) Subject: Flow stream enhancements (#1162) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=8b1468d7087a724acfd0aafe84b8e5faf8a03b58;p=music-assistant-server.git Flow stream enhancements (#1162) --- diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index cde50047..89fefd41 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -13,13 +13,17 @@ import logging import time import urllib.parse from collections.abc import AsyncGenerator -from contextlib import asynccontextmanager from typing import TYPE_CHECKING import shortuuid from aiohttp import web -from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool +from music_assistant.common.helpers.util import ( + empty_queue, + get_ip, + select_free_port, + try_parse_bool, +) from music_assistant.common.models.config_entries import ( ConfigEntry, ConfigValueOption, @@ -85,8 +89,8 @@ class QueueStreamJob: The whole idea here is that the (pcm) audio source can be sent to multiple players at once. For example for (slimproto/airplay) syncgroups and universal group. - All client players receive the exact same audio chunks from the source audio, - then encoded and/or resampled to the player's preferences. + All client players receive the exact same PCM audio chunks from the source audio, + which then can be optionally encoded and/or resampled to the player's preferences. In case a stream is restarted (e.g. when seeking), a new QueueStreamJob will be created. """ @@ -104,43 +108,54 @@ class QueueStreamJob: self.mass = mass self.pcm_audio_source = pcm_audio_source self.pcm_format = pcm_format - self.auto_start = auto_start self.expected_players: set[str] = set() self.job_id = shortuuid.uuid() self.bytes_streamed: int = 0 - self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}") - self._subscribed_players: dict[str, AsyncProcess] = {} + self.logger = self.mass.streams.logger + self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {} self._finished = False - self._running = False - self.allow_start = asyncio.Event() + self.allow_start = auto_start + self._all_clients_connected = asyncio.Event() self._audio_task = asyncio.create_task(self._stream_job_runner()) @property def finished(self) -> bool: """Return if this StreamJob is finished.""" - return self._finished or self._audio_task and self._audio_task.done() + return self._finished or (self._audio_task and self._audio_task.done()) @property def pending(self) -> bool: """Return if this Job is pending start.""" - return not self.finished and not self.running + return not self.finished and not self._all_clients_connected.is_set() @property def running(self) -> bool: """Return if this Job is running.""" - return self._running and self._audio_task and not self._audio_task.done() + return ( + self._all_clients_connected.is_set() + and self._audio_task + and not self._audio_task.done() + ) def start(self) -> None: """Start running (send audio chunks to connected players).""" if self.finished: raise RuntimeError("Task is already finished") - self.allow_start.set() + self.allow_start = True + if self.expected_players and len(self.subscribed_players) >= len(self.expected_players): + self._all_clients_connected.set() def stop(self) -> None: """Stop running this job.""" if self._audio_task and not self._audio_task.done(): self._audio_task.cancel() + if not self._finished: + # we need to make sure that we close the async generator + task = asyncio.create_task(self.pcm_audio_source.__anext__()) + task.cancel() self._finished = True + for sub_queue in self.subscribed_players.values(): + empty_queue(sub_queue) def resolve_stream_url(self, player_id: str, output_codec: ContentType) -> str: """Resolve the childplayer specific stream URL to this streamjob.""" @@ -167,44 +182,27 @@ class QueueStreamJob: self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None ) -> AsyncGenerator[bytes, None]: """Subscribe consumer and iterate player-specific audio.""" - ffmpeg_args = get_ffmpeg_args( + async for chunk in get_ffmpeg_stream( + audio_input=self.subscribe(player_id), input_format=self.pcm_format, output_format=output_format, filter_params=get_player_filter_params(self.mass, player_id), - extra_args=[], - input_path="-", - output_path="-", - loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet", - ) - # launch ffmpeg process with player specific settings - # the stream_job_runner will start pushing pcm chunks to the stdin - # we then read the players-specific (encoded) output chunks - # from ffmpeg stdout and yield them - async with AsyncProcess( - ffmpeg_args, enable_stdin=True, enable_stdout=True, enable_stderr=False - ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc): - # read final chunks from ffmpeg's stdout - iterator = ( - ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any() - ) - async for chunk in iterator: - try: - yield chunk - except (BrokenPipeError, ConnectionResetError): - # race condition? - break + chunk_size=chunk_size, + ): + yield chunk async def stream_to_custom_output_path( - self, player_id: str, output_format: AudioFormat, output_path: str + self, player_id: str, output_format: AudioFormat, output_path: str | int ) -> None: """Subscribe consumer and instruct ffmpeg to send the audio to the given output path.""" + custom_file_pointer = isinstance(output_path, int) ffmpeg_args = get_ffmpeg_args( input_format=self.pcm_format, output_format=output_format, filter_params=get_player_filter_params(self.mass, player_id), extra_args=[], input_path="-", - output_path=output_path, + output_path="-" if custom_file_pointer else output_path, loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet", ) # launch ffmpeg process with player specific settings @@ -213,72 +211,94 @@ class QueueStreamJob: async with AsyncProcess( ffmpeg_args, enable_stdin=True, - enable_stdout=False, + enable_stdout=custom_file_pointer, enable_stderr=False, - ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc): + custom_stdin=self.subscribe(player_id), + custom_stdout=output_path if custom_file_pointer else None, + name="ffmpeg_custom_output_path", + ) as ffmpeg_proc: # we simply wait for the process to exit await ffmpeg_proc.wait() - @asynccontextmanager - async def subscribe( - self, player_id: str, ffmpeg_proc: AsyncProcess - ) -> AsyncGenerator[QueueStreamJob]: - """Subscribe consumer's (output) ffmpeg process.""" - if self.running: - # client subscribes while we're already started - # that will probably cause side effects but let it go - self.logger.warning( - "Player %s is joining while the stream is already started!", player_id - ) + async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]: + """Subscribe consumer and iterate incoming chunks on the queue.""" try: - self._subscribed_players[player_id] = ffmpeg_proc - self.logger.debug("Subscribed player %s", player_id) - if self.auto_start and len(self._subscribed_players) == len(self.expected_players): - self.allow_start.set() - yield self + self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2) + + if self._all_clients_connected.is_set(): + # client subscribes while we're already started + self.logger.warning( + "Client %s is joining while the stream is already started", player_id + ) + + self.logger.debug("Subscribed client %s", player_id) + + if ( + self.expected_players + and self.allow_start + and len(self.subscribed_players) == len(self.expected_players) + ): + # we reached the number of expected subscribers, set event + # so that chunks can be pushed + self._all_clients_connected.set() + + # keep reading audio chunks from the queue until we receive an empty one + while True: + chunk = await sub_queue.get() + if chunk == b"": + # EOF chunk received + break + yield chunk finally: - self._subscribed_players.pop(player_id, None) + self.subscribed_players.pop(player_id, None) self.logger.debug("Unsubscribed client %s", player_id) # check if this was the last subscriber and we should cancel - await asyncio.sleep(5) - if len(self._subscribed_players) == 0 and not self.finished: + await asyncio.sleep(2) + if len(self.subscribed_players) == 0 and self._audio_task and not self.finished: self.logger.debug("Cleaning up, all clients disappeared...") self.stop() + async def _put_chunk(self, chunk: bytes) -> None: + """Put chunk of data to all subscribers.""" + async with asyncio.TaskGroup() as tg: + for sub_queue in list(self.subscribed_players.values()): + # put this chunk on the player's subqueue + tg.create_task(sub_queue.put(chunk)) + self.bytes_streamed += len(chunk) + async def _stream_job_runner(self) -> None: """Feed audio chunks to StreamJob subscribers.""" - await self.allow_start.wait() - retries = 50 - while retries: - retries -= 1 - await asyncio.sleep(0.1) - if len(self._subscribed_players) != len(self.expected_players): - continue - await asyncio.sleep(0.2) - if len(self._subscribed_players) != len(self.expected_players): - continue - break - - self.logger.debug( - "Starting stream job %s with %s out of %s connected clients", - self.job_id, - len(self._subscribed_players), - len(self.expected_players), - ) + chunk_num = 0 async for chunk in self.pcm_audio_source: - num_subscribers = len(self._subscribed_players) - if num_subscribers == 0: - break - async with asyncio.TaskGroup() as tg: - for ffmpeg_proc in list(self._subscribed_players.values()): - tg.create_task(ffmpeg_proc.write(chunk)) + chunk_num += 1 + if chunk_num == 1: + # wait until all expected clients are connected + try: + async with asyncio.timeout(10): + await self._all_clients_connected.wait() + except TimeoutError: + if len(self.subscribed_players) == 0: + self.logger.exception( + "Abort multi client stream job %s: " + "client(s) did not connect within timeout", + self.job_id, + ) + break + # not all clients connected but timeout expired, set flag and move on + # with all clients that did connect + self._all_clients_connected.set() + else: + self.logger.debug( + "Starting queue stream job %s with %s (out of %s) connected clients", + self.job_id, + len(self.subscribed_players), + len(self.expected_players), + ) - # write EOF at end of queue stream - async with asyncio.TaskGroup() as tg: - for ffmpeg_proc in list(self._subscribed_players.values()): - tg.create_task(ffmpeg_proc.write_eof()) - self.logger.debug("Finished stream job %s", self.job_id) - self._finished = True + await self._put_chunk(chunk) + + # mark EOF with empty chunk + await self._put_chunk(b"") def parse_pcm_info(content_type: str) -> tuple[int, int, int]: @@ -585,9 +605,13 @@ class StreamsController(CoreController): default_sample_rate=stream_job.pcm_format.sample_rate, default_bit_depth=stream_job.pcm_format.bit_depth, ) - # prepare request, add some DLNA/UPNP compatible headers - enable_icy = request.headers.get("Icy-MetaData", "") == "1" + # play it safe: only allow icy metadata for mp3 and aac + enable_icy = request.headers.get( + "Icy-MetaData", "" + ) == "1" and output_format.content_type in (ContentType.MP3, ContentType.AAC) icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384 + + # prepare request, add some DLNA/UPNP compatible headers headers = { **DEFAULT_STREAM_HEADERS, "Content-Type": f"audio/{output_format.output_format_str}", @@ -606,6 +630,19 @@ class StreamsController(CoreController): if request.method != "GET": return resp + # some players (e.g. dlna, sonos) misbehave and do multiple GET requests + # to the stream in an attempt to get the audio details such as duration + # which is a bit pointless for our duration-less queue stream + # and it completely messes with the subscription logic + if player_id in stream_job.subscribed_players: + self.logger.warning( + "Player %s is making multiple requests " + "to the same stream, playback may be disturbed!", + player_id, + ) + elif "rincon" in player_id.lower(): + await asyncio.sleep(0.1) + # all checks passed, start streaming! self.logger.debug( "Start serving Queue flow audio stream for queue %s to player %s", @@ -912,7 +949,6 @@ class StreamsController(CoreController): if content_type == ContentType.PCM: # resolve generic pcm type content_type = ContentType.from_bit_depth(output_bit_depth) - else: output_sample_rate = min(default_sample_rate, queue_player.max_sample_rate) player_max_bit_depth = 24 if queue_player.supports_24bit else 16 diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 321b9755..720d3ac5 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -335,7 +335,6 @@ async def get_media_stream( # noqa: PLR0915 Other than stripping silence at end and beginning and optional volume normalization this is the pure, unaltered audio data as PCM chunks. """ - logger = LOGGER.getChild("media_stream") bytes_sent = 0 is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration if is_radio or streamdetails.seek_position: @@ -377,27 +376,28 @@ async def get_media_stream( # noqa: PLR0915 ) finished = False - logger.debug("start media stream for: %s", streamdetails.uri) - writer_task: asyncio.Task | None = None ffmpeg_proc = AsyncProcess( - ffmpeg_args, enable_stdin=streamdetails.direct is None, enable_stderr=True + ffmpeg_args, + enable_stdin=streamdetails.direct is None, + enable_stderr=True, + name="ffmpeg_media_stream", ) await ffmpeg_proc.start() + logger = LOGGER.getChild("media_stream") + logger.debug("start media stream for: %s", streamdetails.uri) async def writer() -> None: """Task that grabs the source audio and feeds it to ffmpeg.""" - logger.log(VERBOSE_LOG_LEVEL, "writer started for %s", streamdetails.uri) music_prov = mass.get_provider(streamdetails.provider) seek_pos = streamdetails.seek_position if streamdetails.can_seek else 0 async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos): await ffmpeg_proc.write(audio_chunk) # write eof when last packet is received await ffmpeg_proc.write_eof() - logger.log(VERBOSE_LOG_LEVEL, "writer finished for %s", streamdetails.uri) if streamdetails.direct is None: - writer_task = asyncio.create_task(writer()) + ffmpeg_proc.attached_tasks.append(asyncio.create_task(writer())) # get pcm chunks from stdout # we always stay one chunk behind to properly detect end of chunks @@ -464,8 +464,7 @@ async def get_media_stream( # noqa: PLR0915 streamdetails.uri, seconds_streamed, ) - if writer_task and not writer_task.done(): - writer_task.cancel() + # use communicate to read stderr and wait for exit # read log for loudness measurement (or errors) _, stderr = await ffmpeg_proc.communicate() @@ -674,7 +673,6 @@ async def get_ffmpeg_stream( Takes care of resampling and/or recoding if needed, according to player preferences. """ - logger = LOGGER.getChild("ffmpeg_stream") use_stdin = not isinstance(audio_input, str) ffmpeg_args = get_ffmpeg_args( input_format=input_format, @@ -683,44 +681,20 @@ async def get_ffmpeg_stream( extra_args=extra_args or [], input_path="-" if use_stdin else audio_input, output_path="-", + loglevel="info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet", ) - - writer_task: asyncio.Task | None = None - ffmpeg_proc = AsyncProcess( - ffmpeg_args, enable_stdin=use_stdin, enable_stdout=True, enable_stderr=True - ) - await ffmpeg_proc.start() - - # feed stdin with pcm audio chunks from origin - async def writer() -> None: - async for chunk in audio_input: - if ffmpeg_proc.closed: - return - await ffmpeg_proc.write(chunk) - await ffmpeg_proc.write_eof() - - try: - if not isinstance(audio_input, str): - writer_task = asyncio.create_task(writer()) - + async with AsyncProcess( + ffmpeg_args, + enable_stdin=use_stdin, + enable_stdout=True, + enable_stderr=False, + custom_stdin=audio_input if use_stdin else None, + name="player_ffmpeg_stream", + ) as ffmpeg_proc: # read final chunks from stdout chunk_size = chunk_size or get_chunksize(output_format, 1) async for chunk in ffmpeg_proc.iter_chunked(chunk_size): - try: - yield chunk - except (BrokenPipeError, ConnectionResetError): - # race condition - break - finally: - if writer_task and not writer_task.done(): - writer_task.cancel() - # use communicate to read stderr and wait for exit - _, stderr = await ffmpeg_proc.communicate() - if ffmpeg_proc.returncode != 0: - # ffmpeg has a non zero returncode meaning trouble - logger.warning("FFMPEG ERROR\n%s", stderr.decode()) - else: - logger.log(VERBOSE_LOG_LEVEL, stderr.decode()) + yield chunk async def check_audio_support() -> tuple[bool, bool, str]: diff --git a/music_assistant/server/helpers/didl_lite.py b/music_assistant/server/helpers/didl_lite.py index c3a27314..69845357 100644 --- a/music_assistant/server/helpers/didl_lite.py +++ b/music_assistant/server/helpers/didl_lite.py @@ -6,7 +6,7 @@ import datetime from typing import TYPE_CHECKING from music_assistant.common.models.enums import MediaType -from music_assistant.constants import MASS_LOGO_ONLINE +from music_assistant.constants import MASS_LOGO_ONLINE, UGP_PREFIX if TYPE_CHECKING: from music_assistant.common.models.queue_item import QueueItem @@ -15,28 +15,27 @@ if TYPE_CHECKING: # ruff: noqa: E501 -def create_didl_metadata( - mass: MusicAssistant, url: str, queue_item: QueueItem | None = None -) -> str: +def create_didl_metadata(mass: MusicAssistant, url: str, queue_item: QueueItem) -> str: """Create DIDL metadata string from url and (optional) QueueItem.""" ext = url.split(".")[-1].split("?")[0] - if queue_item is None: + if "flow" in url or queue_item.queue_id.startswith(UGP_PREFIX): + # flow stream return ( '' f'' - "Music Assistant" + f"Music Assistant" f"{escape_string(MASS_LOGO_ONLINE)}" + f"{queue_item.queue_id}" "object.item.audioItem.audioBroadcast" f"audio/{ext}" f'{escape_string(url)}' "" "" ) - is_radio = queue_item.media_type != MediaType.TRACK or not queue_item.duration image_url = ( mass.metadata.get_image_url(queue_item.image) if queue_item.image else MASS_LOGO_ONLINE ) - if is_radio: + if queue_item.media_type != MediaType.TRACK or not queue_item.duration: # radio or other non-track item return ( '' diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index db40ac74..d4cbc312 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -40,6 +40,9 @@ class AsyncProcess: enable_stdin: bool = False, enable_stdout: bool = True, enable_stderr: bool = False, + custom_stdin: AsyncGenerator[bytes, None] | int | None = None, + custom_stdout: int | None = None, + name: str | None = None, ) -> None: """Initialize AsyncProcess.""" self.proc: asyncio.subprocess.Process | None = None @@ -48,10 +51,14 @@ class AsyncProcess: self._enable_stdout = enable_stdout self._enable_stderr = enable_stderr self._close_called = False - self._stdin_lock = asyncio.Lock() - self._stdout_lock = asyncio.Lock() - self._stderr_lock = asyncio.Lock() self._returncode: bool | None = None + self._name = name or self._args[0].split(os.sep)[-1] + self.attached_tasks: list[asyncio.Task] = [] + self._custom_stdin = custom_stdin + if not isinstance(custom_stdin, int | None): + self._custom_stdin = None + self.attached_tasks.append(asyncio.create_task(self._feed_stdin(custom_stdin))) + self._custom_stdout = custom_stdout @property def closed(self) -> bool: @@ -81,22 +88,20 @@ class AsyncProcess: """Exit context manager.""" await self.close() self._returncode = self.returncode - del self.proc - del self._stdin_lock - del self._stdout_lock - del self._returncode async def start(self) -> None: """Perform Async init of process.""" + stdin = self._custom_stdin if self._custom_stdin is not None else asyncio.subprocess.PIPE + stdout = self._custom_stdout if self._custom_stdout is not None else asyncio.subprocess.PIPE self.proc = await asyncio.create_subprocess_exec( *self._args, - stdin=asyncio.subprocess.PIPE if self._enable_stdin else None, - stdout=asyncio.subprocess.PIPE if self._enable_stdout else None, + stdin=stdin if self._enable_stdin else None, + stdout=stdout if self._enable_stdout else None, stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, - close_fds=True, + limit=4000000, + pipesize=256000, ) - proc_name_simple = self._args[0].split(os.sep)[-1] - LOGGER.debug("Started %s with PID %s", proc_name_simple, self.proc.pid) + LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid) async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks of n size from the process stdout.""" @@ -116,11 +121,8 @@ class AsyncProcess: async def readexactly(self, n: int) -> bytes: """Read exactly n bytes from the process stdout (or less if eof).""" - if self._close_called or self.proc.stdout.at_eof(): - return b"" try: - async with self._stdout_lock: - return await self.proc.stdout.readexactly(n) + return await self.proc.stdout.readexactly(n) except asyncio.IncompleteReadError as err: return err.partial @@ -131,36 +133,21 @@ class AsyncProcess: and may return less or equal bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns empty byte object. """ - if self._close_called or self.proc.stdout.at_eof(): - return b"" - if self.proc.stdout.at_eof(): - return b"" - async with self._stdout_lock: - return await self.proc.stdout.read(n) + return await self.proc.stdout.read(n) async def write(self, data: bytes) -> None: """Write data to process stdin.""" if self._close_called or self.proc.stdin.is_closing(): - return - if not self.proc or self.proc.returncode is not None: - raise RuntimeError("Process not started or already exited") - async with self._stdin_lock: - self.proc.stdin.write(data) - with suppress(BrokenPipeError): - await self.proc.stdin.drain() + raise asyncio.CancelledError("write called while process already done") + self.proc.stdin.write(data) + with suppress(BrokenPipeError, ConnectionResetError): + await self.proc.stdin.drain() async def write_eof(self) -> None: """Write end of file to to process stdin.""" - if not self._enable_stdin: - raise RuntimeError("STDIN is not enabled") - if not self.proc or self.proc.returncode is not None: - raise RuntimeError("Process not started or already exited") - if self._close_called or self.proc.stdin.is_closing(): - return try: - async with self._stdin_lock: - if self.proc.stdin.can_write_eof(): - self.proc.stdin.write_eof() + if self.proc.stdin.can_write_eof(): + self.proc.stdin.write_eof() except ( AttributeError, AssertionError, @@ -174,20 +161,31 @@ class AsyncProcess: async def close(self) -> int: """Close/terminate the process and wait for exit.""" self._close_called = True - if self.proc.returncode is None: + # close any/all attached (writer) tasks + for task in self.attached_tasks: + if not task.done(): + task.cancel() + with suppress(asyncio.CancelledError): + await task + # send communicate until we exited + while self.proc.returncode is None: # make sure the process is cleaned up try: # we need to use communicate to ensure buffers are flushed - await asyncio.wait_for(self.proc.communicate(), 5) + await asyncio.wait_for(self.proc.communicate(), 10) except TimeoutError: LOGGER.debug( - "Process with PID %s did not stop within 5 seconds. Sending terminate...", + "Process %s with PID %s did not stop in time. Sending terminate...", + self._name, self.proc.pid, ) self.proc.terminate() - await self.proc.communicate() + await asyncio.sleep(0.5) LOGGER.debug( - "Process with PID %s stopped with returncode %s", self.proc.pid, self.proc.returncode + "Process %s with PID %s stopped with returncode %s", + self._name, + self.proc.pid, + self.proc.returncode, ) return self.proc.returncode @@ -199,19 +197,27 @@ class AsyncProcess: async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]: """Write bytes to process and read back results.""" - if self.closed: - return (b"", b"") - async with self._stdout_lock, self._stdin_lock, self._stderr_lock: - stdout, stderr = await self.proc.communicate(input_data) + stdout, stderr = await self.proc.communicate(input_data) return (stdout, stderr) async def read_stderr(self) -> AsyncGenerator[bytes, None]: """Read lines from the stderr stream.""" - async with self._stderr_lock: - async for line in self.proc.stderr: - if self.closed: - break - yield line + async for line in self.proc.stderr: + yield line + + async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None: + """Feed stdin with chunks from an AsyncGenerator.""" + try: + async for chunk in custom_stdin: + if self._close_called or self.proc.stdin.is_closing(): + return + await self.write(chunk) + await self.write_eof() + except asyncio.CancelledError: + # make sure the stdin generator is also properly closed + # by propagating a cancellederror within + task = asyncio.create_task(custom_stdin.__anext__()) + task.cancel() async def check_output(shell_cmd: str) -> tuple[int, bytes]: diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 3d9a6404..89e9a5bd 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -236,7 +236,7 @@ class AirplayStream: "-port", str(self.airplay_player.discovery_info.port), "-wait", - str(2000 - sync_adjust), + str(3000 - sync_adjust), "-volume", str(mass_player.volume_level), *extra_args, @@ -252,15 +252,25 @@ class AirplayStream: if platform.system() == "Darwin": os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" + # connect cliraop stdin with ffmpeg stdout using os pipes + read, write = os.pipe() + # launch ffmpeg, feeding (player specific) audio chunks on stdout + self._audio_reader_task = asyncio.create_task( + stream_job.stream_to_custom_output_path( + player_id=player_id, output_format=AIRPLAY_PCM_FORMAT, output_path=write + ) + ) self._cliraop_proc = AsyncProcess( cliraop_args, enable_stdin=True, enable_stdout=False, enable_stderr=True, + custom_stdin=read, ) await self._cliraop_proc.start() self._log_reader_task = asyncio.create_task(self._log_watcher()) - self._audio_reader_task = asyncio.create_task(self._audio_reader()) + + # self._audio_reader_task = asyncio.create_task(self._audio_reader()) async def stop(self, wait: bool = True): """Stop playback and cleanup.""" @@ -303,36 +313,48 @@ class AirplayStream: """Monitor stderr for the running CLIRaop process.""" airplay_player = self.airplay_player mass_player = self.mass.players.get(airplay_player.player_id) + queue = self.mass.player_queues.get_active_queue(mass_player.active_source) logger = airplay_player.logger lost_packets = 0 + prev_metadata_checksum: str = "" + prev_progress_report: float = 0 async for line in self._cliraop_proc.read_stderr(): line = line.decode().strip() # noqa: PLW2901 if not line: continue if "elapsed milliseconds:" in line: - # do not log this line, its too verbose + # this is received more or less every second while playing millis = int(line.split("elapsed milliseconds: ")[1]) mass_player.elapsed_time = millis / 1000 mass_player.elapsed_time_last_updated = time.time() - continue + # send metadata to player(s) if needed + # NOTE: this must all be done in separate tasks to not disturb audio + now = time.time() + if queue and queue.current_item and queue.current_item.streamdetails: + metadata_checksum = ( + queue.current_item.streamdetails.stream_title + or queue.current_item.queue_item_id + ) + if prev_metadata_checksum != metadata_checksum: + prev_metadata_checksum = metadata_checksum + prev_progress_report = now + self.mass.create_task(self._send_metadata(queue)) + # send the progress report every 5 seconds + elif now - prev_progress_report >= 5: + prev_progress_report = now + self.mass.create_task(self._send_progress(queue)) if "set pause" in line or "Pause at" in line: - logger.debug("raop streaming paused") mass_player.state = PlayerState.PAUSED self.mass.players.update(airplay_player.player_id) - continue if "Restarted at" in line or "restarting w/ pause" in line: - logger.debug("raop streaming restarted after pause") mass_player.state = PlayerState.PLAYING self.mass.players.update(airplay_player.player_id) - continue if "restarting w/o pause" in line: # streaming has started - logger.debug("raop streaming started") mass_player.state = PlayerState.PLAYING mass_player.elapsed_time = 0 mass_player.elapsed_time_last_updated = time.time() self.mass.players.update(airplay_player.player_id) - continue if "lost packet out of backlog" in line: lost_packets += 1 if lost_packets == 50: @@ -341,8 +363,7 @@ class AirplayStream: await self.mass.player_queues.stop(queue.queue_id) else: logger.debug(line) - continue - # verbose log everything else + logger.log(VERBOSE_LOG_LEVEL, line) # if we reach this point, the process exited @@ -350,49 +371,6 @@ class AirplayStream: mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) - async def _audio_reader(self) -> None: - """Send audio chunks to the cliraop process.""" - logger = self.airplay_player.logger - mass_player = self.mass.players.get(self.airplay_player.player_id, True) - queue = self.mass.player_queues.get_active_queue(mass_player.active_source) - logger.debug( - "Starting RAOP stream for Queue %s to %s", - queue.display_name, - mass_player.display_name, - ) - prev_metadata_checksum: str = "" - prev_progress_report: float = 0 - - async for chunk in self.stream_job.iter_player_audio( - self.airplay_player.player_id, AIRPLAY_PCM_FORMAT - ): - if self._stop_requested: - return - await self._cliraop_proc.write(chunk) - # send metadata to player(s) if needed - # NOTE: this must all be done in separate tasks to not disturb audio - now = time.time() - if queue and queue.current_item and queue.current_item.streamdetails: - metadata_checksum = ( - queue.current_item.streamdetails.stream_title - or queue.current_item.queue_item_id - ) - if prev_metadata_checksum != metadata_checksum: - prev_metadata_checksum = metadata_checksum - prev_progress_report = now - self.mass.create_task(self._send_metadata(queue)) - # send the progress report every 5 seconds - elif now - prev_progress_report >= 5: - prev_progress_report = now - self.mass.create_task(self._send_progress(queue)) - # send EOF - await self._cliraop_proc.write_eof() - logger.debug( - "Finished RAOP stream for Queue %s to %s", - queue.display_name, - mass_player.display_name, - ) - async def _send_metadata(self, queue: PlayerQueue) -> None: """Send metadata to player (and connected sync childs).""" if not self.running: diff --git a/music_assistant/server/providers/dlna/__init__.py b/music_assistant/server/providers/dlna/__init__.py index 4f7994a1..2181d78d 100644 --- a/music_assistant/server/providers/dlna/__init__.py +++ b/music_assistant/server/providers/dlna/__init__.py @@ -360,9 +360,7 @@ class DLNAPlayerProvider(PlayerProvider): # always clear queue (by sending stop) first if dlna_player.device.can_stop: await self.cmd_stop(player_id) - didl_metadata = create_didl_metadata( - self.mass, url, queue_item if not use_flow_mode else None - ) + didl_metadata = create_didl_metadata(self.mass, url, queue_item) title = queue_item.name if queue_item else "Music Assistant" await dlna_player.device.async_set_transport_uri(url, title, didl_metadata) # Play it diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index 0737d52d..dc52846d 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -36,7 +36,6 @@ from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.constants import CONF_CROSSFADE, VERBOSE_LOG_LEVEL from music_assistant.server.helpers.didl_lite import create_didl_metadata from music_assistant.server.models.player_provider import PlayerProvider -from music_assistant.server.providers.ugp import UGP_PREFIX from .player import SonosPlayer @@ -352,16 +351,13 @@ class SonosPlayerProvider(PlayerProvider): ) raise PlayerCommandFailed(msg) - is_flow_stream = queue_item.queue_item_id == "flow" or queue_item.queue_id.startswith( - UGP_PREFIX - ) url = self.mass.streams.resolve_stream_url( - player_id, queue_item=queue_item, output_codec=ContentType.FLAC + player_id, + queue_item=queue_item, + output_codec=ContentType.FLAC, ) self.mass.create_task( - sonos_player.soco.play_uri, - url, - meta=create_didl_metadata(self.mass, url, None if is_flow_stream else queue_item), + sonos_player.soco.play_uri, url, meta=create_didl_metadata(self.mass, url, queue_item) ) async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index 1f1d2c73..645e3abb 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -317,7 +317,7 @@ class MusicAssistant: if target is None: msg = "Target is missing" raise RuntimeError(msg) - if existing := self._tracked_tasks.get(task_id): + if task_id and (existing := self._tracked_tasks.get(task_id)): # prevent duplicate tasks if task_id is given and already present return existing if asyncio.iscoroutinefunction(target): @@ -328,19 +328,25 @@ class MusicAssistant: task = target else: # assume normal callable (non coroutine or awaitable) + # that needs to be run in the executor task = self.loop.create_task(asyncio.to_thread(target, *args, **kwargs)) def task_done_callback(_task: asyncio.Future | asyncio.Task) -> None: _task_id = task.task_id self._tracked_tasks.pop(_task_id) - # print unhandled exceptions - if LOGGER.isEnabledFor(logging.DEBUG) and not _task.cancelled() and _task.exception(): - task_name = _task.get_name() if hasattr(_task, "get_name") else _task - LOGGER.exception( - "Exception in task %s - target: %s", + # log unhandled exceptions + if ( + LOGGER.isEnabledFor(logging.DEBUG) + and not _task.cancelled() + and (err := _task.exception()) + ): + task_name = _task.get_name() if hasattr(_task, "get_name") else str(_task) + LOGGER.warning( + "Exception in task %s - target: %s: %s", task_name, str(target), - exc_info=task.exception(), + str(err), + exc_info=err if LOGGER.isEnabledFor(logging.DEBUG) else None, ) if task_id is None: