From 21ed1acacaf84915647354dcfb392f6ebcd27c75 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 21 Mar 2024 01:49:27 +0100 Subject: [PATCH] Performance and stability improvements to streaming (#1156) --- music_assistant/__main__.py | 9 - music_assistant/client/client.py | 15 +- music_assistant/server/controllers/streams.py | 372 ++++++++---------- .../server/controllers/webserver.py | 5 +- music_assistant/server/helpers/audio.py | 34 +- music_assistant/server/helpers/auth.py | 8 +- music_assistant/server/helpers/process.py | 138 +++++-- music_assistant/server/helpers/tags.py | 2 +- .../server/providers/airplay/__init__.py | 174 +++----- .../airplay/bin/cliraop-linux-aarch64 | Bin 2894032 -> 2894032 bytes .../airplay/bin/cliraop-linux-x86_64 | Bin 2968832 -> 2968832 bytes .../providers/airplay/bin/cliraop-macos-arm64 | Bin 189488 -> 189488 bytes .../server/providers/chromecast/__init__.py | 4 +- .../server/providers/dlna/__init__.py | 4 +- .../server/providers/fully_kiosk/__init__.py | 2 +- .../server/providers/hass_players/__init__.py | 4 +- .../server/providers/slimproto/__init__.py | 66 ++-- .../server/providers/snapcast/__init__.py | 134 ++++--- .../server/providers/snapcast/manifest.json | 8 +- .../server/providers/sonos/__init__.py | 19 +- .../server/providers/spotify/__init__.py | 2 +- .../server/providers/ugp/__init__.py | 25 +- music_assistant/server/server.py | 12 +- requirements_all.txt | 2 +- 24 files changed, 547 insertions(+), 492 deletions(-) diff --git a/music_assistant/__main__.py b/music_assistant/__main__.py index 7d1cf1f1..712adc3f 100644 --- a/music_assistant/__main__.py +++ b/music_assistant/__main__.py @@ -29,14 +29,6 @@ MAX_LOG_FILESIZE = 1000000 * 10 # 10 MB ALPINE_RELEASE_FILE = "/etc/alpine-release" -class VerboseLogger(logging.Logger): - """Custom python logger with included verbose log level.""" - - def verbose(self, msg, *args, **kwargs): - """Log a verbose message.""" - self.log(VERBOSE_LOG_LEVEL, msg, *args, **kwargs) - - def get_arguments(): """Arguments handling.""" parser = argparse.ArgumentParser(description="MusicAssistant") @@ -103,7 +95,6 @@ def setup_logger(data_path: str, level: str = "DEBUG"): logger = logging.getLogger() logger.addHandler(file_handler) logging.addLevelName(VERBOSE_LOG_LEVEL, "VERBOSE") - logging.setLoggerClass(VerboseLogger) # apply the configured global log level to the (root) music assistant logger logging.getLogger(ROOT_LOGGER_NAME).setLevel(level) diff --git a/music_assistant/client/client.py b/music_assistant/client/client.py index d3daa74d..d8e70e31 100644 --- a/music_assistant/client/client.py +++ b/music_assistant/client/client.py @@ -9,11 +9,7 @@ import uuid from collections.abc import Callable from typing import TYPE_CHECKING, Any -from music_assistant.client.exceptions import ( - ConnectionClosed, - InvalidServerVersion, - InvalidState, -) +from music_assistant.client.exceptions import ConnectionClosed, InvalidServerVersion, InvalidState from music_assistant.common.models.api import ( ChunkedResultMessage, CommandMessage, @@ -299,9 +295,12 @@ class MusicAssistantClient: return self async def __aexit__( - self, exc_type: Exception, exc_value: str, traceback: TracebackType - ) -> None: - """Disconnect from the server and exit.""" + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool | None: + """Exit context manager.""" await self.disconnect() def __repr__(self) -> str: diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index f0f706ff..1b9e2fd9 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -13,17 +13,13 @@ import logging import time import urllib.parse from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager from typing import TYPE_CHECKING import shortuuid from aiohttp import web -from music_assistant.common.helpers.util import ( - empty_queue, - get_ip, - select_free_port, - try_parse_bool, -) +from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool from music_assistant.common.models.config_entries import ( ConfigEntry, ConfigValueOption, @@ -41,15 +37,18 @@ from music_assistant.constants import ( CONF_OUTPUT_CHANNELS, CONF_PUBLISH_IP, SILENCE_FILE, + VERBOSE_LOG_LEVEL, ) from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.server.helpers.audio import ( check_audio_support, crossfade_pcm_parts, + get_ffmpeg_args, get_ffmpeg_stream, get_media_stream, get_player_filter_params, ) +from music_assistant.server.helpers.process import AsyncProcess from music_assistant.server.helpers.util import get_ips from music_assistant.server.helpers.webserver import Webserver from music_assistant.server.models.core_controller import CoreController @@ -65,28 +64,30 @@ if TYPE_CHECKING: DEFAULT_STREAM_HEADERS = { "transferMode.dlna.org": "Streaming", "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000", # noqa: E501 - "Cache-Control": "no-cache", + "Cache-Control": "no-cache,must-revalidate", + "Pragma": "no-cache", "Connection": "close", "Accept-Ranges": "none", - "icy-name": "Music Assistant", - "icy-pub": "0", + "Icy-Name": "Music Assistant", + "Icy-Url": "https://music-assistant.io", } -FLOW_MAX_SAMPLE_RATE = 96000 -FLOW_MAX_BIT_DEPTH = 24 +FLOW_DEFAULT_SAMPLE_RATE = 48000 +FLOW_DEFAULT_BIT_DEPTH = 24 # pylint:disable=too-many-locals -class MultiClientQueueStreamJob: - """Representation of a (multiclient) Audio Queue stream job/task. +class QueueStreamJob: + """ + Representation of a (multiclient) Audio stream job/task. - The whole idea here is that the queue stream audio can be sent to multiple + The whole idea here is that the (pcm) audio source can be sent to multiple players at once. For example for (slimproto/airplay) syncgroups and universal group. - all client players receive the exact same audio chunks from the source audio, - encoded and/or resampled to the player's preferences. - A StreamJob is tied to a Queue and streams the queue flow stream, + + All client players receive the exact same audio chunks from the source audio, + then encoded and/or resampled to the player's preferences. In case a stream is restarted (e.g. when seeking), - a new MultiClientQueueStreamJob will be created. + a new QueueStreamJob will be created. """ _audio_task: asyncio.Task | None = None @@ -96,21 +97,23 @@ class MultiClientQueueStreamJob: mass: MusicAssistant, pcm_audio_source: AsyncGenerator[bytes, None], pcm_format: AudioFormat, - expected_players: set[str], + auto_start: bool = False, ) -> None: - """Initialize MultiClientQueueStreamJob instance.""" + """Initialize QueueStreamJob instance.""" self.mass = mass self.pcm_audio_source = pcm_audio_source self.pcm_format = pcm_format - self.expected_players = expected_players + self.expected_players: set[str] = set() self.job_id = shortuuid.uuid() self.bytes_streamed: int = 0 self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}") - self._subscribed_players: dict[str, asyncio.Queue] = {} + self._subscribed_players: dict[str, AsyncProcess] = {} self._finished = False self._running = False - self._allow_start = asyncio.Event() + self.allow_start = asyncio.Event() self._audio_task = asyncio.create_task(self._stream_job_runner()) + if auto_start: + self.allow_start.set() @property def finished(self) -> bool: @@ -120,7 +123,7 @@ class MultiClientQueueStreamJob: @property def pending(self) -> bool: """Return if this Job is pending start.""" - return not self.finished and not self._audio_task + return not self.finished and not self.running @property def running(self) -> bool: @@ -131,84 +134,122 @@ class MultiClientQueueStreamJob: """Start running (send audio chunks to connected players).""" if self.finished: raise RuntimeError("Task is already finished") - self._allow_start.set() + self.allow_start.set() def stop(self) -> None: """Stop running this job.""" - if self._audio_task and self._audio_task.done(): - return - if self._audio_task: + if self._audio_task and not self._audio_task.done(): self._audio_task.cancel() self._finished = True - def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str: + def resolve_stream_url(self, player_id: str, output_codec: ContentType) -> str: """Resolve the childplayer specific stream URL to this streamjob.""" fmt = output_codec.value # handle raw pcm if output_codec.is_pcm(): - player = self.mass.streams.mass.players.get(child_player_id) + player = self.mass.streams.mass.players.get(player_id) player_max_bit_depth = 24 if player.supports_24bit else 16 output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate) output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth) output_channels = self.mass.config.get_raw_player_config_value( - child_player_id, CONF_OUTPUT_CHANNELS, "stereo" + player_id, CONF_OUTPUT_CHANNELS, "stereo" ) channels = 1 if output_channels != "stereo" else 2 fmt += ( f";codec=pcm;rate={output_sample_rate};" f"bitrate={output_bit_depth};channels={channels}" ) - url = f"{self.mass.streams._server.base_url}/multi/{self.job_id}/{child_player_id}.{fmt}" - self.expected_players.add(child_player_id) + url = f"{self.mass.streams._server.base_url}/flow/{self.job_id}/{player_id}.{fmt}" + self.expected_players.add(player_id) return url - async def subscribe( + async def iter_player_audio( self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None ) -> AsyncGenerator[bytes, None]: - """Subscribe consumer and iterate chunks on the queue encoded to given output format.""" - async for chunk in get_ffmpeg_stream( - audio_input=self._subscribe_pcm(player_id), + """Subscribe consumer and iterate player-specific audio.""" + ffmpeg_args = get_ffmpeg_args( input_format=self.pcm_format, output_format=output_format, filter_params=get_player_filter_params(self.mass, player_id), - chunk_size=chunk_size, - ): - yield chunk + extra_args=[], + input_path="-", + output_path="-", + loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet", + ) + # launch ffmpeg process with player specific settings + # the stream_job_runner will start pushing pcm chunks to the stdin + # we then read the players-specific (encoded) output chunks + # from ffmpeg stdout and yield them + async with AsyncProcess( + ffmpeg_args, enable_stdin=True, enable_stdout=True, enable_stderr=False + ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc): + # read final chunks from ffmpeg's stdout + iterator = ( + ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any() + ) + async for chunk in iterator: + try: + yield chunk + except (BrokenPipeError, ConnectionResetError): + # race condition? + break - async def _subscribe_pcm(self, player_id: str) -> AsyncGenerator[bytes, None]: - """Subscribe consumer and iterate incoming (raw pcm) chunks on the queue.""" + async def stream_to_custom_output_path( + self, player_id: str, output_format: AudioFormat, output_path: str + ) -> None: + """Subscribe consumer and instruct ffmpeg to send the audio to the given output path.""" + ffmpeg_args = get_ffmpeg_args( + input_format=self.pcm_format, + output_format=output_format, + filter_params=get_player_filter_params(self.mass, player_id), + extra_args=[], + input_path="-", + output_path=output_path, + loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet", + ) + # launch ffmpeg process with player specific settings + # the stream_job_runner will start pushing pcm chunks to the stdin + # the ffmpeg process will send the output directly to the given path (e.g. tcp socket) + async with AsyncProcess( + ffmpeg_args, + enable_stdin=True, + enable_stdout=False, + enable_stderr=False, + ) as ffmpeg_proc, self.subscribe(player_id, ffmpeg_proc): + # we simply wait for the process to exit + await ffmpeg_proc.wait() + + @asynccontextmanager + async def subscribe( + self, player_id: str, ffmpeg_proc: AsyncProcess + ) -> AsyncGenerator[QueueStreamJob]: + """Subscribe consumer's (output) ffmpeg process.""" + if self.running: + # client subscribes while we're already started + # that will probably cause side effects but let it go + self.logger.warning( + "Player %s is joining while the stream is already started!", player_id + ) try: - self._subscribed_players[player_id] = queue = asyncio.Queue(2) - - if self.running: - # client subscribes while we're already started - # that will probably cause side effects but let it go - self.logger.warning( - "Player %s is joining while the stream is already started!", player_id - ) - else: - self.logger.debug("Subscribed player %s", player_id) - - # yield from queue until finished - while not self._finished: - yield await queue.get() + self._subscribed_players[player_id] = ffmpeg_proc + self.logger.debug("Subscribed player %s", player_id) + yield self finally: - if sub_queue := self._subscribed_players.pop(player_id, None): - empty_queue(sub_queue) + self._subscribed_players.pop(player_id, None) self.logger.debug("Unsubscribed client %s", player_id) # check if this was the last subscriber and we should cancel - await asyncio.sleep(2) + await asyncio.sleep(5) if len(self._subscribed_players) == 0 and not self.finished: self.logger.debug("Cleaning up, all clients disappeared...") self.stop() async def _stream_job_runner(self) -> None: """Feed audio chunks to StreamJob subscribers.""" - await self._allow_start.wait() + await self.allow_start.wait() retries = 50 while retries: retries -= 1 - await asyncio.sleep(0.2) + await asyncio.sleep(0.1) if len(self._subscribed_players) != len(self.expected_players): continue await asyncio.sleep(0.2) @@ -217,15 +258,24 @@ class MultiClientQueueStreamJob: break self.logger.debug( - "Starting multi client stream job %s with %s out of %s connected clients", + "Starting stream job %s with %s out of %s connected clients", self.job_id, len(self._subscribed_players), len(self.expected_players), ) async for chunk in self.pcm_audio_source: + num_subscribers = len(self._subscribed_players) + if num_subscribers == 0: + break async with asyncio.TaskGroup() as tg: - for listener_queue in list(self._subscribed_players.values()): - tg.create_task(listener_queue.put(chunk)) + for ffmpeg_proc in list(self._subscribed_players.values()): + tg.create_task(ffmpeg_proc.write(chunk)) + + # write EOF at end of queue stream + async with asyncio.TaskGroup() as tg: + for ffmpeg_proc in list(self._subscribed_players.values()): + tg.create_task(ffmpeg_proc.write_eof()) + self.logger.debug("Finished stream job %s", self.job_id) self._finished = True @@ -249,12 +299,12 @@ class StreamsController(CoreController): """Initialize instance.""" super().__init__(*args, **kwargs) self._server = Webserver(self.logger, enable_dynamic_routes=True) - self.multi_client_jobs: dict[str, MultiClientQueueStreamJob] = {} + self.stream_jobs: dict[str, QueueStreamJob] = {} self.register_dynamic_route = self._server.register_dynamic_route self.unregister_dynamic_route = self._server.unregister_dynamic_route self.manifest.name = "Streamserver" self.manifest.description = ( - "Music Assistant's core server that is responsible for " + "Music Assistant's core controller that is responsible for " "streaming audio to players on the local network as well as " "some player specific local control callbacks." ) @@ -339,12 +389,7 @@ class StreamsController(CoreController): static_routes=[ ( "*", - "/multi/{job_id}/{player_id}.{fmt}", - self.serve_multi_subscriber_stream, - ), - ( - "*", - "/flow/{queue_id}/{queue_item_id}.{fmt}", + "/flow/{job_id}/{player_id}.{fmt}", self.serve_queue_flow_stream, ), ( @@ -369,7 +414,7 @@ class StreamsController(CoreController): """Cleanup on exit.""" await self._server.close() - async def resolve_stream_url( + def resolve_stream_url( self, player_id: str, queue_item: QueueItem, @@ -381,16 +426,35 @@ class StreamsController(CoreController): # handle announcement item if queue_item.media_type == MediaType.ANNOUNCEMENT: return queue_item.queue_item_id - # handle request for multi client queue stream - stream_job = self.multi_client_jobs.get(queue_item.queue_id) - if queue_item.queue_item_id == "flow" or stream_job and stream_job.pending: + # handle request for (multi client) queue flow stream + if queue_item.queue_item_id in ("flow", queue_item.queue_id) or flow_mode: + # note: this will return an existing streamjonb if that was already created + # e.g. in case of universal group player + pcm_format = AudioFormat( + content_type=ContentType.from_bit_depth(24), + sample_rate=FLOW_DEFAULT_SAMPLE_RATE, + bit_depth=FLOW_DEFAULT_BIT_DEPTH, + ) + stream_job = self.create_stream_job( + queue_item.queue_id, + pcm_audio_source=self.get_flow_stream( + self.mass.player_queues.get(queue_item.queue_id), + start_queue_item=queue_item, + pcm_format=pcm_format, + ), + pcm_format=pcm_format, + auto_start=True, + ) + return stream_job.resolve_stream_url(player_id, output_codec) + # handle raw pcm without exact format specifiers if output_codec.is_pcm() and ";" not in fmt: fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}" query_params = {} - base_path = "flow" if flow_mode else "single" - url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501 + url = ( + f"{self._server.base_url}/single/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" + ) # we add a timestamp as basic checksum # most importantly this is to invalidate any caches # but also to handle edge cases such as single track repeat @@ -398,41 +462,29 @@ class StreamsController(CoreController): url += "?" + urllib.parse.urlencode(query_params) return url - async def create_multi_client_stream_job( + def create_stream_job( self, queue_id: str, - start_queue_item: QueueItem, - pcm_bit_depth: int = 24, - pcm_sample_rate: int = 48000, - expected_players: set[str] | None = None, - ) -> MultiClientQueueStreamJob: + pcm_audio_source: AsyncGenerator[bytes, None], + pcm_format: AudioFormat, + auto_start: bool = False, + ) -> QueueStreamJob: """ - Create a MultiClientQueueStreamJob for the given queue.. + Create a QueueStreamJob for the given queue.. This is called by player/sync group implementations to start streaming the queue audio to multiple players at once. """ - if existing_job := self.multi_client_jobs.get(queue_id, None): + if existing_job := self.stream_jobs.get(queue_id, None): if existing_job.pending: return existing_job # cleanup existing job first existing_job.stop() - queue = self.mass.player_queues.get(queue_id) - pcm_format = AudioFormat( - content_type=ContentType.from_bit_depth(pcm_bit_depth), - sample_rate=pcm_sample_rate, - bit_depth=pcm_bit_depth, - channels=2, - ) - self.multi_client_jobs[queue_id] = stream_job = MultiClientQueueStreamJob( + self.stream_jobs[queue_id] = stream_job = QueueStreamJob( self.mass, - pcm_audio_source=self.get_flow_stream( - queue=queue, - start_queue_item=start_queue_item, - pcm_format=pcm_format, - ), + pcm_audio_source=pcm_audio_source, pcm_format=pcm_format, - expected_players=expected_players or set(), + auto_start=auto_start, ) return stream_job @@ -507,99 +559,8 @@ class StreamsController(CoreController): async def serve_queue_flow_stream(self, request: web.Request) -> web.Response: """Stream Queue Flow audio to player.""" self._log_request(request) - queue_id = request.match_info["queue_id"] - if not (queue := self.mass.player_queues.get(queue_id)): - raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}") - start_queue_item_id = request.match_info["queue_item_id"] - start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id) - if not start_queue_item: - raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}") - queue_player = self.mass.players.get(queue_id) - # work out output format/details - output_format = await self._get_output_format( - output_format_str=request.match_info["fmt"], - queue_player=queue_player, - default_sample_rate=FLOW_MAX_SAMPLE_RATE, - default_bit_depth=FLOW_MAX_BIT_DEPTH, - ) - # prepare request, add some DLNA/UPNP compatible headers - enable_icy = request.headers.get("Icy-MetaData", "") == "1" - icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384 - headers = { - **DEFAULT_STREAM_HEADERS, - "Content-Type": f"audio/{output_format.output_format_str}", - } - if enable_icy: - headers["icy-metaint"] = str(icy_meta_interval) - - resp = web.StreamResponse( - status=200, - reason="OK", - headers=headers, - ) - await resp.prepare(request) - - # return early if this is not a GET request - if request.method != "GET": - return resp - - # all checks passed, start streaming! - self.logger.debug("Start serving Queue flow audio stream for %s", queue_player.name) - - pcm_format = AudioFormat( - content_type=ContentType.from_bit_depth(output_format.bit_depth), - sample_rate=output_format.sample_rate, - bit_depth=output_format.bit_depth, - channels=2, - ) - async for chunk in get_ffmpeg_stream( - audio_input=self.get_flow_stream( - queue=queue, - start_queue_item=start_queue_item, - pcm_format=pcm_format, - ), - input_format=pcm_format, - output_format=output_format, - filter_params=get_player_filter_params(self.mass, queue_player.player_id), - chunk_size=icy_meta_interval if enable_icy else None, - ): - try: - await resp.write(chunk) - except (BrokenPipeError, ConnectionResetError): - break - - if not enable_icy: - continue - - # if icy metadata is enabled, send the icy metadata after the chunk - if ( - # use current item here and not buffered item, otherwise - # the icy metadata will be too much ahead - (current_item := queue.current_item) - and current_item.streamdetails - and current_item.streamdetails.stream_title - ): - title = current_item.streamdetails.stream_title - elif queue and current_item and current_item.name: - title = current_item.name - else: - title = "Music Assistant" - metadata = f"StreamTitle='{title}';".encode() - if current_item and current_item.image: - metadata += f"StreamURL='{current_item.image.path}'".encode() - while len(metadata) % 16 != 0: - metadata += b"\x00" - length = len(metadata) - length_b = chr(int(length / 16)).encode() - await resp.write(length_b + metadata) - - return resp - - async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response: - """Stream Queue Flow audio to a child player within a multi subscriber setup.""" - self._log_request(request) job_id = request.match_info["job_id"] - for queue_id, stream_job in self.multi_client_jobs.items(): + for queue_id, stream_job in self.stream_jobs.items(): if stream_job.job_id == job_id: break else: @@ -608,7 +569,6 @@ class StreamsController(CoreController): raise web.HTTPNotFound(reason=f"StreamJob {job_id} already finished") if not (queue := self.mass.player_queues.get(queue_id)): raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}") - player_id = request.match_info["player_id"] child_player = self.mass.players.get(player_id) if not child_player: @@ -643,11 +603,11 @@ class StreamsController(CoreController): # all checks passed, start streaming! self.logger.debug( - "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s", + "Start serving Queue flow audio stream for queue %s to player %s", queue.display_name, child_player.display_name, ) - async for chunk in stream_job.subscribe( + async for chunk in stream_job.iter_player_audio( player_id, output_format, chunk_size=icy_meta_interval if enable_icy else None ): try: @@ -784,7 +744,6 @@ class StreamsController(CoreController): assert pcm_format.content_type.is_pcm() queue_track = None last_fadeout_part = b"" - total_bytes_written = 0 queue.flow_mode = True use_crossfade = self.mass.config.get_raw_player_config_value( queue.queue_id, CONF_CROSSFADE, False @@ -845,6 +804,7 @@ class StreamsController(CoreController): ): # ALWAYS APPEND CHUNK TO BUFFER buffer += chunk + del chunk if len(buffer) < buffer_size: # buffer is not full enough, move on continue @@ -874,10 +834,9 @@ class StreamsController(CoreController): #### OTHER: enough data in buffer, feed to output else: - chunk_size = len(chunk) - yield buffer[:chunk_size] - bytes_written += chunk_size - buffer = buffer[chunk_size:] + yield buffer[:pcm_sample_size] + bytes_written += pcm_sample_size + buffer = buffer[pcm_sample_size:] #### HANDLE END OF TRACK if last_fadeout_part: @@ -896,28 +855,27 @@ class StreamsController(CoreController): # no crossfade enabled, just yield the (entire) buffer last part yield buffer bytes_written += len(buffer) - # clear vars - buffer = b"" # update duration details based on the actual pcm data we sent # this also accounts for crossfade and silence stripping - queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size + seconds_streamed = (bytes_written + len(last_fadeout_part)) / pcm_sample_size + queue_track.streamdetails.seconds_streamed = seconds_streamed queue_track.streamdetails.duration = ( - queue_track.streamdetails.seconds_skipped - or 0 + queue_track.streamdetails.seconds_streamed + queue_track.streamdetails.seek_position + seconds_streamed ) - total_bytes_written += bytes_written self.logger.debug( "Finished Streaming queue track: %s (%s) on queue %s - seconds streamed: %s", queue_track.streamdetails.uri, queue_track.name, queue.display_name, - queue_track.streamdetails.seconds_streamed, + seconds_streamed, ) # end of queue flow: make sure we yield the last_fadeout_part if last_fadeout_part: yield last_fadeout_part + del last_fadeout_part + del buffer self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name) def _log_request(self, request: web.Request) -> None: diff --git a/music_assistant/server/controllers/webserver.py b/music_assistant/server/controllers/webserver.py index 6140b035..12a0987c 100644 --- a/music_assistant/server/controllers/webserver.py +++ b/music_assistant/server/controllers/webserver.py @@ -163,13 +163,13 @@ class WebserverController(CoreController): # also host the audio preview service routes.append(("GET", "/preview", self.serve_preview_stream)) # start the webserver + default_publish_ip = await get_ip() if self.mass.running_as_hass_addon: # if we're running on the HA supervisor the webserver is secured by HA ingress # we only start the webserver on the internal docker network and ingress connects # to that internally and exposes the webUI securely # if a user also wants to expose a the webserver non securely on his internal # network he/she should explicitly do so (and know the risks) - default_publish_ip = await get_ip() self.publish_port = DEFAULT_SERVER_PORT if config.get_value(CONF_EXPOSE_SERVER): bind_ip = "0.0.0.0" @@ -183,7 +183,8 @@ class WebserverController(CoreController): else: base_url = config.get_value(CONF_BASE_URL) self.publish_port = config.get_value(CONF_BIND_PORT) - self.publish_ip = bind_ip = config.get_value(CONF_BIND_IP) + self.publish_ip = default_publish_ip + bind_ip = config.get_value(CONF_BIND_IP) await self._server.setup( bind_ip=bind_ip, bind_port=self.publish_port, diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 32f21912..cb2c9857 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -192,7 +192,7 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) - item_name = f"{streamdetails.provider}/{streamdetails.item_id}" LOGGER.debug("Start analyzing EBU R128 loudness for %s", item_name) # calculate EBU R128 integrated loudness with ffmpeg - ffmpeg_args = _get_ffmpeg_args( + ffmpeg_args = get_ffmpeg_args( input_format=streamdetails.audio_format, output_format=streamdetails.audio_format, filter_params=["loudnorm=print_format=json"], @@ -215,7 +215,7 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) - if chunk_count == 600: # safety guard: max (more or less) 10 minutes of audio may be analyzed! break - ffmpeg_proc.write_eof() + await ffmpeg_proc.write_eof() _, stderr = await ffmpeg_proc.communicate() if loudness_details := _parse_loudnorm(stderr): @@ -392,7 +392,6 @@ async def get_media_stream( # noqa: PLR0915 """ logger = LOGGER.getChild("media_stream") bytes_sent = 0 - streamdetails.seconds_skipped = streamdetails.seek_position is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration if is_radio or streamdetails.seek_position: strip_silence_begin = False @@ -400,7 +399,7 @@ async def get_media_stream( # noqa: PLR0915 pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2) chunk_size = pcm_sample_size * (1 if is_radio else 2) expected_chunks = int((streamdetails.duration or 0) / 2) - if expected_chunks < 60: + if expected_chunks < 10: strip_silence_end = False # collect all arguments for ffmpeg @@ -424,7 +423,7 @@ async def get_media_stream( # noqa: PLR0915 filter_params.append(filter_rule) if streamdetails.fade_in: filter_params.append("afade=type=in:start_time=0:duration=3") - ffmpeg_args = _get_ffmpeg_args( + ffmpeg_args = get_ffmpeg_args( input_format=streamdetails.audio_format, output_format=pcm_format, filter_params=filter_params, @@ -449,7 +448,7 @@ async def get_media_stream( # noqa: PLR0915 async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos): await ffmpeg_proc.write(audio_chunk) # write eof when last packet is received - ffmpeg_proc.write_eof() + await ffmpeg_proc.write_eof() logger.log(VERBOSE_LOG_LEVEL, "writer finished for %s", streamdetails.uri) if streamdetails.direct is None: @@ -485,11 +484,9 @@ async def get_media_stream( # noqa: PLR0915 if prev_chunk: yield prev_chunk bytes_sent += len(prev_chunk) - prev_chunk = chunk - # all chunks received, strip silence of last part - + # all chunks received, strip silence of last part if needed and yield remaining bytes if strip_silence_end and prev_chunk: final_chunk = await strip_silence( mass, @@ -500,9 +497,6 @@ async def get_media_stream( # noqa: PLR0915 ) else: final_chunk = prev_chunk - - # ensure the final chunk is sent - # its important this is done here at the end so we can catch errors first yield final_chunk bytes_sent += len(final_chunk) del final_chunk @@ -727,9 +721,9 @@ async def get_ffmpeg_stream( Takes care of resampling and/or recoding if needed, according to player preferences. """ - logger = LOGGER.getChild("media_stream") + logger = LOGGER.getChild("ffmpeg_stream") use_stdin = not isinstance(audio_input, str) - ffmpeg_args = _get_ffmpeg_args( + ffmpeg_args = get_ffmpeg_args( input_format=input_format, output_format=output_format, filter_params=filter_params or [], @@ -750,7 +744,7 @@ async def get_ffmpeg_stream( if ffmpeg_proc.closed: return await ffmpeg_proc.write(chunk) - ffmpeg_proc.write_eof() + await ffmpeg_proc.write_eof() try: if not isinstance(audio_input, str): @@ -768,7 +762,6 @@ async def get_ffmpeg_stream( if writer_task and not writer_task.done(): writer_task.cancel() # use communicate to read stderr and wait for exit - # read log for loudness measurement (or errors) _, stderr = await ffmpeg_proc.communicate() if ffmpeg_proc.returncode != 0: # ffmpeg has a non zero returncode meaning trouble @@ -787,7 +780,7 @@ async def check_audio_support() -> tuple[bool, bool, str]: version = output.decode().split("ffmpeg version ")[1].split(" ")[0].split("-")[0] libsoxr_support = "enable-libsoxr" in output.decode() result = (ffmpeg_present, libsoxr_support, version) - # store in global cache for easy access by '_get_ffmpeg_args' + # store in global cache for easy access by 'get_ffmpeg_args' await set_global_cache_values({"ffmpeg_support": result}) return result @@ -830,7 +823,7 @@ async def get_preview_stream( async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30): await ffmpeg_proc.write(audio_chunk) # write eof when last packet is received - ffmpeg_proc.write_eof() + await ffmpeg_proc.write_eof() if not streamdetails.direct: writer_task = asyncio.create_task(writer()) @@ -935,13 +928,14 @@ def get_player_filter_params( return filter_params -def _get_ffmpeg_args( +def get_ffmpeg_args( input_format: AudioFormat, output_format: AudioFormat, filter_params: list[str], extra_args: list[str], input_path: str = "-", output_path: str = "-", + loglevel: str = "info", ) -> list[str]: """Collect all args to send to the ffmpeg process.""" ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support") @@ -962,7 +956,7 @@ def _get_ffmpeg_args( "ffmpeg", "-hide_banner", "-loglevel", - "info", + loglevel, "-ignore_unknown", "-protocol_whitelist", "file,http,https,tcp,tls,crypto,pipe,data,fd", diff --git a/music_assistant/server/helpers/auth.py b/music_assistant/server/helpers/auth.py index a057abc2..2700aaf6 100644 --- a/music_assistant/server/helpers/auth.py +++ b/music_assistant/server/helpers/auth.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +from types import TracebackType from typing import TYPE_CHECKING from aiohttp.web import Request, Response @@ -40,7 +41,12 @@ class AuthenticationHelper: ) return self - async def __aexit__(self, exc_type, exc_value, traceback) -> bool: + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool | None: """Exit context manager.""" self.mass.streams.unregister_dynamic_route(f"/callback/{self.session_id}", "GET") diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 1e9c5272..db40ac74 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -1,14 +1,18 @@ -"""Implementation of a (truly) non blocking subprocess. +""" +AsyncProcess. -The subprocess implementation in asyncio can (still) sometimes cause deadlocks, -even when properly handling reading/writes from different tasks. +Wrapper around asyncio subprocess to help with using pipe streams and +taking care of properly closing the process in case of exit (on both success and failures), +without deadlocking. """ from __future__ import annotations import asyncio import logging +import os from contextlib import suppress +from types import TracebackType from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -22,73 +26,101 @@ DEFAULT_CHUNKSIZE = 128000 class AsyncProcess: - """Implementation of a (truly) non blocking subprocess.""" + """ + AsyncProcess. + + Wrapper around asyncio subprocess to help with using pipe streams and + taking care of properly closing the process in case of exit (on both success and failures), + without deadlocking. + """ def __init__( self, - args: list, + args: list[str], enable_stdin: bool = False, enable_stdout: bool = True, enable_stderr: bool = False, ) -> None: - """Initialize.""" - self._proc = None + """Initialize AsyncProcess.""" + self.proc: asyncio.subprocess.Process | None = None self._args = args self._enable_stdin = enable_stdin self._enable_stdout = enable_stdout self._enable_stderr = enable_stderr + self._close_called = False + self._stdin_lock = asyncio.Lock() + self._stdout_lock = asyncio.Lock() + self._stderr_lock = asyncio.Lock() + self._returncode: bool | None = None @property def closed(self) -> bool: """Return if the process was closed.""" - return self.returncode is not None + return self._close_called or self.returncode is not None @property def returncode(self) -> int | None: """Return the erturncode of the process.""" - if self._proc is None: + if self._returncode is not None: + return self._returncode + if self.proc is None: return None - return self._proc.returncode + return self.proc.returncode async def __aenter__(self) -> AsyncProcess: """Enter context manager.""" await self.start() return self - async def __aexit__(self, exc_type, exc_value, traceback) -> bool: + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> bool | None: """Exit context manager.""" await self.close() + self._returncode = self.returncode + del self.proc + del self._stdin_lock + del self._stdout_lock + del self._returncode async def start(self) -> None: """Perform Async init of process.""" - self._proc = await asyncio.create_subprocess_exec( + self.proc = await asyncio.create_subprocess_exec( *self._args, stdin=asyncio.subprocess.PIPE if self._enable_stdin else None, stdout=asyncio.subprocess.PIPE if self._enable_stdout else None, stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, close_fds=True, ) + proc_name_simple = self._args[0].split(os.sep)[-1] + LOGGER.debug("Started %s with PID %s", proc_name_simple, self.proc.pid) async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks of n size from the process stdout.""" while True: chunk = await self.readexactly(n) - yield chunk - if len(chunk) < n: + if len(chunk) == 0: break + yield chunk async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks as they come in from process stdout.""" while True: chunk = await self.read(n) - if chunk == b"": + if len(chunk) == 0: break yield chunk async def readexactly(self, n: int) -> bytes: """Read exactly n bytes from the process stdout (or less if eof).""" + if self._close_called or self.proc.stdout.at_eof(): + return b"" try: - return await self._proc.stdout.readexactly(n) + async with self._stdout_lock: + return await self.proc.stdout.readexactly(n) except asyncio.IncompleteReadError as err: return err.partial @@ -99,25 +131,36 @@ class AsyncProcess: and may return less or equal bytes than requested, but at least one byte. If EOF was received before any byte is read, this function returns empty byte object. """ - return await self._proc.stdout.read(n) + if self._close_called or self.proc.stdout.at_eof(): + return b"" + if self.proc.stdout.at_eof(): + return b"" + async with self._stdout_lock: + return await self.proc.stdout.read(n) async def write(self, data: bytes) -> None: """Write data to process stdin.""" - if self.closed or self._proc.stdin.is_closing(): + if self._close_called or self.proc.stdin.is_closing(): return - self._proc.stdin.write(data) - with suppress(BrokenPipeError): - await self._proc.stdin.drain() - - def write_eof(self) -> None: + if not self.proc or self.proc.returncode is not None: + raise RuntimeError("Process not started or already exited") + async with self._stdin_lock: + self.proc.stdin.write(data) + with suppress(BrokenPipeError): + await self.proc.stdin.drain() + + async def write_eof(self) -> None: """Write end of file to to process stdin.""" if not self._enable_stdin: - return - if self.closed or self._proc.stdin.is_closing(): + raise RuntimeError("STDIN is not enabled") + if not self.proc or self.proc.returncode is not None: + raise RuntimeError("Process not started or already exited") + if self._close_called or self.proc.stdin.is_closing(): return try: - if self._proc.stdin.can_write_eof(): - self._proc.stdin.write_eof() + async with self._stdin_lock: + if self.proc.stdin.can_write_eof(): + self.proc.stdin.write_eof() except ( AttributeError, AssertionError, @@ -130,32 +173,45 @@ class AsyncProcess: async def close(self) -> int: """Close/terminate the process and wait for exit.""" - if self.returncode is not None: - return self.returncode - # make sure the process is cleaned up - self._proc.terminate() - try: - async with asyncio.timeout(10): - await self.communicate() - except (TimeoutError, asyncio.CancelledError): - self._proc.kill() - return await self.wait() + self._close_called = True + if self.proc.returncode is None: + # make sure the process is cleaned up + try: + # we need to use communicate to ensure buffers are flushed + await asyncio.wait_for(self.proc.communicate(), 5) + except TimeoutError: + LOGGER.debug( + "Process with PID %s did not stop within 5 seconds. Sending terminate...", + self.proc.pid, + ) + self.proc.terminate() + await self.proc.communicate() + LOGGER.debug( + "Process with PID %s stopped with returncode %s", self.proc.pid, self.proc.returncode + ) + return self.proc.returncode async def wait(self) -> int: """Wait for the process and return the returncode.""" if self.returncode is not None: return self.returncode - return await self._proc.wait() + return await self.proc.wait() async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]: """Write bytes to process and read back results.""" - stdout, stderr = await self._proc.communicate(input_data) + if self.closed: + return (b"", b"") + async with self._stdout_lock, self._stdin_lock, self._stderr_lock: + stdout, stderr = await self.proc.communicate(input_data) return (stdout, stderr) async def read_stderr(self) -> AsyncGenerator[bytes, None]: """Read lines from the stderr stream.""" - async for line in self._proc.stderr: - yield line + async with self._stderr_lock: + async for line in self.proc.stderr: + if self.closed: + break + yield line async def check_output(shell_cmd: str) -> tuple[int, bytes]: diff --git a/music_assistant/server/helpers/tags.py b/music_assistant/server/helpers/tags.py index d917ef30..73cfeb12 100644 --- a/music_assistant/server/helpers/tags.py +++ b/music_assistant/server/helpers/tags.py @@ -451,7 +451,7 @@ async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> b if ffmpeg_proc.closed: break await ffmpeg_proc.write(chunk) - ffmpeg_proc.write_eof() + await ffmpeg_proc.write_eof() # feed the file contents to the process stdin if file_path == "-": diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 58faa52d..0daee42e 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -7,7 +7,6 @@ import os import platform import socket import time -from collections.abc import AsyncGenerator from contextlib import suppress from dataclasses import dataclass from random import randint, randrange @@ -42,12 +41,8 @@ from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL -from music_assistant.server.helpers.audio import ( - get_ffmpeg_stream, - get_media_stream, - get_player_filter_params, -) -from music_assistant.server.helpers.process import check_output +from music_assistant.server.helpers.audio import get_media_stream +from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.models.player_provider import PlayerProvider from music_assistant.server.providers.ugp import UGP_PREFIX @@ -56,6 +51,7 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant + from music_assistant.server.controllers.streams import QueueStreamJob from music_assistant.server.models import ProviderInstanceType DOMAIN = "airplay" @@ -109,6 +105,10 @@ CONF_CREDENTIALS = "credentials" CACHE_KEY_PREV_VOLUME = "airplay_prev_volume" FALLBACK_VOLUME = 20 +AIRPLAY_PCM_FORMAT = AudioFormat( + content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16 +) + async def setup( mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig @@ -178,23 +178,23 @@ def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None: return None -class AirplayStreamJob: +class AirplayStream: """Object that holds the details of a stream job.""" def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None: - """Initialize AirplayStreamJob.""" + """Initialize AirplayStream.""" self.prov = prov self.mass = prov.mass self.airplay_player = airplay_player # always generate a new active remote id to prevent race conditions - # with the named pipe used to send commands + # with the named pipe used to send audio self.active_remote_id: str = str(randint(1000, 8000)) self.start_ntp: int | None = None # use as checksum self.prevent_playback: bool = False - self._audio_iterator: AsyncGenerator[bytes, None] | None = None + self.stream_job: QueueStreamJob | None = None self._log_reader_task: asyncio.Task | None = None self._audio_reader_task: asyncio.Task | None = None - self._cliraop_proc: asyncio.subprocess.Process | None = None + self._cliraop_proc: AsyncProcess | None = None self._stop_requested = False @property @@ -206,10 +206,10 @@ class AirplayStreamJob: and self._cliraop_proc.returncode is None ) - async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None]) -> None: + async def start(self, start_ntp: int, stream_job: QueueStreamJob) -> None: """Initialize CLIRaop process for a player.""" self.start_ntp = start_ntp - self._audio_iterator = audio_iterator + self.stream_job = stream_job extra_args = [] player_id = self.airplay_player.player_id mass_player = self.mass.players.get(player_id) @@ -220,7 +220,6 @@ class AirplayStreamJob: for prop in ("et", "md", "am", "pk", "pw"): if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop): extra_args += [f"-{prop}", prop_value] - sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0) if device_password := self.mass.config.get_raw_player_config_value( player_id, CONF_PASSWORD, None @@ -231,7 +230,7 @@ class AirplayStreamJob: elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): extra_args += ["-debug", "10"] - args = [ + cliraop_args = [ self.prov.cliraop_bin, "-ntpstart", str(start_ntp), @@ -247,18 +246,20 @@ class AirplayStreamJob: "-activeremote", self.active_remote_id, "-udn", - str(self.airplay_player.discovery_info.name), + self.airplay_player.discovery_info.name, self.airplay_player.address, "-", ] if platform.system() == "Darwin": os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" - self._cliraop_proc = await asyncio.create_subprocess_exec( - *args, - stdin=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - close_fds=True, + + self._cliraop_proc = AsyncProcess( + cliraop_args, + enable_stdin=True, + enable_stdout=False, + enable_stderr=True, ) + await self._cliraop_proc.start() self._log_reader_task = asyncio.create_task(self._log_watcher()) self._audio_reader_task = asyncio.create_task(self._audio_reader()) @@ -270,26 +271,22 @@ class AirplayStreamJob: # send stop with cli command await self.send_cli_command("ACTION=STOP") - async def wait_for_stop() -> None: + async def _stop() -> None: # always stop the audio feeder if self._audio_reader_task and not self._audio_reader_task.done(): with suppress(asyncio.CancelledError): self._audio_reader_task.cancel() - # make sure stdin is drained (otherwise we'll deadlock) - if self._cliraop_proc and self._cliraop_proc.returncode is None: - if self._cliraop_proc.stdin.can_write_eof(): - self._cliraop_proc.stdin.write_eof() - with suppress(BrokenPipeError): - await self._cliraop_proc.stdin.drain() + await self._cliraop_proc.write_eof() + # the process should exit gracefully after the stop request was processed await asyncio.wait_for(self._cliraop_proc.wait(), 30) - task = self.mass.create_task(wait_for_stop()) + task = self.mass.create_task(_stop()) if wait: await task async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" - if not (self._cliraop_proc and self._cliraop_proc.returncode is None): + if not self._cliraop_proc or self._cliraop_proc.closed: return named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108 @@ -309,7 +306,7 @@ class AirplayStreamJob: mass_player = self.mass.players.get(airplay_player.player_id) logger = airplay_player.logger lost_packets = 0 - async for line in self._cliraop_proc.stderr: + async for line in self._cliraop_proc.read_stderr(): line = line.decode().strip() # noqa: PLW2901 if not line: continue @@ -350,16 +347,12 @@ class AirplayStreamJob: logger.log(VERBOSE_LOG_LEVEL, line) # if we reach this point, the process exited - logger.debug( - "CLIRaop process stopped with errorcode %s", - self._cliraop_proc.returncode, - ) if airplay_player.active_stream == self: mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) async def _audio_reader(self) -> None: - """Read audio chunks and send them to the cliraop process.""" + """Send audio chunks to the cliraop process.""" logger = self.airplay_player.logger mass_player = self.mass.players.get(self.airplay_player.player_id, True) queue = self.mass.player_queues.get_active_queue(mass_player.active_source) @@ -370,16 +363,13 @@ class AirplayStreamJob: ) prev_metadata_checksum: str = "" prev_progress_report: float = 0 - async for chunk in self._audio_iterator: - if not self.running: - return - self._cliraop_proc.stdin.write(chunk) - try: - await self._cliraop_proc.stdin.drain() - except (BrokenPipeError, ConnectionResetError): - break - if not self.running: + + async for chunk in self.stream_job.iter_player_audio( + self.airplay_player.player_id, AIRPLAY_PCM_FORMAT + ): + if self._stop_requested: return + await self._cliraop_proc.write(chunk) # send metadata to player(s) if needed # NOTE: this must all be done in separate tasks to not disturb audio now = time.time() @@ -397,10 +387,7 @@ class AirplayStreamJob: prev_progress_report = now self.mass.create_task(self._send_progress(queue)) # send EOF - if self._cliraop_proc.returncode is None and not self._cliraop_proc.stdin.is_closing(): - self._cliraop_proc.stdin.write_eof() - with suppress(BrokenPipeError, ConnectionResetError): - await self._cliraop_proc.stdin.drain() + await self._cliraop_proc.write_eof() logger.debug( "Finished RAOP stream for Queue %s to %s", queue.display_name, @@ -465,7 +452,7 @@ class AirPlayPlayer: discovery_info: AsyncServiceInfo address: str logger: logging.Logger - active_stream: AirplayStreamJob | None = None + active_stream: AirplayStream | None = None class AirplayProvider(PlayerProvider): @@ -620,39 +607,30 @@ class AirplayProvider(PlayerProvider): for airplay_player in self._get_sync_clients(player_id): if airplay_player.active_stream and airplay_player.active_stream.running: await airplay_player.active_stream.stop(wait=False) - pcm_format = AudioFormat( - content_type=ContentType.PCM_S16LE, - sample_rate=44100, - bit_depth=16, - channels=2, - ) - if queue_item.media_type == MediaType.ANNOUNCEMENT: - # stream announcement url directly - stream_job = None - elif ( - queue_item.queue_id.startswith(UGP_PREFIX) - and (stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id)) - and stream_job.pending - ): - # handle special case for UGP multi client stream - pass - elif player.group_childs: - # create a new multi client flow stream - stream_job = await self.mass.streams.create_multi_client_stream_job( - queue_item.queue_id, - queue_item, - pcm_bit_depth=16, - pcm_sample_rate=44100, - ) + + if queue_item.queue_id.startswith(UGP_PREFIX): + # special case: we got forwarded a request from the UGP + # use the existing stream job that was already created by UGP + stream_job = self.mass.streams.stream_jobs[queue_item.queue_id] else: - # for a single player we just consume the flow stream directly - stream_job = None + if queue_item.media_type == MediaType.ANNOUNCEMENT: + # stream announcement url directly + audio_source = get_media_stream( + self.mass, queue_item.streamdetails, pcm_format=AIRPLAY_PCM_FORMAT + ) + else: + queue = self.mass.player_queues.get(queue_item.queue_id) + audio_source = self.mass.streams.get_flow_stream( + queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT + ) + stream_job = self.mass.streams.create_stream_job( + queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=AIRPLAY_PCM_FORMAT + ) - # Python is not suitable for realtime audio streaming. - # So, I've decided to go the fancy route here. I've created a small binary - # written in C based on libraop to do the actual timestamped playback. - # the raw pcm audio is fed to the stdin of this cliraop binary and we can - # send some commands over a named pipe. + # Python is not suitable for realtime audio streaming so we do the actual streaming + # of (RAOP) audio using a small executable written in C based on libraop to do the actual + # timestamped playback. The raw pcm audio is fed to a named pipe, read by the executable + # and we can send some ingteractie commands to the process stdin. # get current ntp before we start _, stdout = await check_output(f"{self.cliraop_bin} -ntp") @@ -661,32 +639,10 @@ class AirplayProvider(PlayerProvider): # setup Raop process for player and its sync childs async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): - if stream_job: - stream_job.expected_players.add(airplay_player.player_id) - audio_iterator = stream_job.subscribe( - player_id=airplay_player.player_id, - output_format=pcm_format, - ) - elif queue_item.media_type == MediaType.ANNOUNCEMENT: - # stream announcement url directly - audio_iterator = get_media_stream( - self.mass, queue_item.streamdetails, pcm_format=pcm_format - ) - else: - queue = self.mass.player_queues.get_active_queue(queue_item.queue_id) - audio_iterator = get_ffmpeg_stream( - self.mass.streams.get_flow_stream( - queue, - start_queue_item=queue_item, - pcm_format=pcm_format, - ), - input_format=pcm_format, - output_format=pcm_format, - filter_params=get_player_filter_params(self.mass, airplay_player.player_id), - ) - airplay_player.active_stream = AirplayStreamJob(self, airplay_player) - tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator)) - if stream_job and queue_item.queue_item_id != "flow": + stream_job.expected_players.add(airplay_player.player_id) + airplay_player.active_stream = AirplayStream(self, airplay_player) + tg.create_task(airplay_player.active_stream.start(start_ntp, stream_job)) + if not queue_item.queue_id.startswith(UGP_PREFIX): stream_job.start() async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: @@ -929,7 +885,7 @@ class AirplayProvider(PlayerProvider): self.mass.create_task(self.mass.players.cmd_volume_down(player_id)) elif path == "/ctrl-int/1/shuffle_songs": queue = self.mass.player_queues.get(player_id) - self.mass.create_task( + self.mass.loop.call_soon( self.mass.player_queues.set_shuffle( active_queue.queue_id, not queue.shuffle_enabled ) diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 index 4507cb42bf99075522770c042ddea70c9f9d81e5..2e0a8e326566f53a3c36f345f2dd50135c24f218 100755 GIT binary patch delta 229 zcmWN=yAHu%0D#f>>sC~?>aKOa6fa>HgT!Joup4WTNDLl9;uS2mt_BeiL(O(?AYu{U zNsjk8`aB+I!uXk|gT1^B&dTtmjTfK!_pWXnv2}!x!gYjXUc_qc?yl0L@7MRE>Gm!A z8DNkhhAA_`C}UI@r^*C1>P*sLifLw;WsZ3kSfojdC6-xXl{MDcV3RGjX|uzw`<8o8 F>knAuU6cR- delta 229 zcmWN=xemc_0Knn&-#Y3(>L^;bIy#w55|6+rv6(E^?uNdAguye2LDC11GzJ5+#RC|1 z_xqCX`MO9?Ue-c;%>BvloUym7v1U!Z@9B2ccaCr!AyIgakPTmKj^5!eFK0?;zxcR; zsxZVbBaBjIjBzHIWQu8Km}QQ6YAmqG63f(CVU;x+G+AeZO}5x(hh6q)vCjdA9C?9i Gzs*13R9zbY diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 index 8661219a46a91138450853158af63773d39049b5..ed7ddfbc1eb964bd81ab4702aa31bd8dc561a0cb 100755 GIT binary patch delta 232 zcmWl|HwwZ47=TgYZ|^1c-s>H_fUANxP;hq25OByO1aWfl1cHMS2RC=oONcnSWcBmG z(|2b@=W!@2#1KaUNhnAmjSN&|kwYE@6j4GM6;x3}9Stc%=cDPnA7HXTc_8fjDR^(VAk&drnBrI*7Va6On;e|X8qoNI+7`t ziP`PqukG3~OnW()^?-)FPGRC@1G9c-G5IoqHF)PT?FE~`oyYVREVTW10aIzN!g|iz z?_(R68s7iCb@tI{)vj704Wl{CJUWbZ`8zommu{av;T}_i!|L$oVm>qUzb-EbbZNO7 z8{*RBuHNJ!Rl&=5H_7~B^^cprKbMxW?1*$Icb}EGy^`su`PrpEuQSEzM&A6KB&6fD zJ>Wi50<*%MM~+>6i8gLw(fto^@H~&)e(T}7XRiDT)0V0^uYBeFXZnRlObYcaAFF-V z|LH9By_~Rx`Pz(0Kh`HHEeZT4(c#Q{M}_N5%!HY%tS*S}IWR5!nc_jcHOIav3viyD zb$5|}|C~cM*~P5iX03Qt(|_;2r1m7?rs9|rZyRi#Qmw=P7x|Pe<^PBPV^n%As2LLj9(&YdE delta 453 zcmdmRf_noHtzZ>czJ3=M!%?fL3qGnHYW{4Ozj+61WH3|KjLpK4QQRylQ-U5e7p814 zOkqqv%KT39=Jwl~OudX?R<<_NUMVn3+l}cg7xR0d?C$`kv+N+&^wSYcf0=`(f7pIH zk|~yn`7KaVJBDd52eZEMhwZOZn0VR1T7PFT`7(i6)4g+<_JW1D^O)X(g|`1LU@Fa3 zxR-wJU3$?b>C{b!JG*@A4~WTGPmMV|D^v86#Ph7^ymiwj++%8RIHo23xvzPq#MOE( z-O!_xXSGk9#4nVzrK~Xifw|T0#3e_RZVFw9pRpu9<-o+CSfw7rKDLkytw86pTP!>i z3wJne54g{iz^u@$zQ1}^g5j0U-2t3>CJ}E>3++0%OLF;1m#h3Q+fp5-PQUPoNumCl zoLx{`5_vrk3>jQozw`YU#aQGnqa)9B?Q o9vzhpy{uj None: """Handle enqueuing of the next queue item on the player.""" dlna_player = self.dlnaplayers[player_id] - url = await self.mass.streams.resolve_stream_url( + url = self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.FLAC, diff --git a/music_assistant/server/providers/fully_kiosk/__init__.py b/music_assistant/server/providers/fully_kiosk/__init__.py index d51e6589..2c296b2a 100644 --- a/music_assistant/server/providers/fully_kiosk/__init__.py +++ b/music_assistant/server/providers/fully_kiosk/__init__.py @@ -186,7 +186,7 @@ class FullyKioskProvider(PlayerProvider): """Handle PLAY MEDIA on given player.""" player = self.mass.players.get(player_id) enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - url = await self.mass.streams.resolve_stream_url( + url = self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, diff --git a/music_assistant/server/providers/hass_players/__init__.py b/music_assistant/server/providers/hass_players/__init__.py index 10a275d6..dd726857 100644 --- a/music_assistant/server/providers/hass_players/__init__.py +++ b/music_assistant/server/providers/hass_players/__init__.py @@ -253,7 +253,7 @@ class HomeAssistantPlayers(PlayerProvider): """Handle PLAY MEDIA on given player.""" use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE) enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - url = await self.mass.streams.resolve_stream_url( + url = self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, @@ -288,7 +288,7 @@ class HomeAssistantPlayers(PlayerProvider): This will NOT be called if the end of the queue is reached (and repeat disabled). This will NOT be called if the player is using flow mode to playback the queue. """ - url = await self.mass.streams.resolve_stream_url( + url = self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.FLAC, diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index 6e20cf3a..08caea85 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -44,6 +44,7 @@ from music_assistant.common.models.enums import ( RepeatMode, ) from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError +from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.constants import ( CONF_CROSSFADE, @@ -55,6 +56,7 @@ from music_assistant.constants import ( VERBOSE_LOG_LEVEL, ) from music_assistant.server.models.player_provider import PlayerProvider +from music_assistant.server.providers.ugp import UGP_PREFIX if TYPE_CHECKING: from aioslimproto.models import SlimEvent @@ -79,10 +81,10 @@ STATE_MAP = { REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2} # sync constants -MIN_DEVIATION_ADJUST = 6 # 6 milliseconds +MIN_DEVIATION_ADJUST = 8 # 5 milliseconds MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements -DEVIATION_JUMP_IGNORE = 2000 # ignore a sudden unrealistic jump -MAX_SKIP_AHEAD_MS = 500 # 0.5 seconds +DEVIATION_JUMP_IGNORE = 5000 # ignore a sudden unrealistic jump +MAX_SKIP_AHEAD_MS = 800 # 0.8 seconds @dataclass @@ -108,10 +110,10 @@ DEFAULT_VISUALIZATION = SlimVisualisationType.SPECTRUM_ANALYZER.value CONF_ENTRY_DISPLAY = ConfigEntry( key=CONF_DISPLAY, type=ConfigEntryType.BOOLEAN, - default_value=True, + default_value=False, required=False, label="Enable display support", - description="Enable/disable native display support on " "squeezebox or squeezelite32 hardware.", + description="Enable/disable native display support on squeezebox or squeezelite32 hardware.", advanced=True, ) CONF_ENTRY_VISUALIZATION = ConfigEntry( @@ -216,6 +218,7 @@ class SlimprotoProvider(PlayerProvider): slimproto: SlimServer _sync_playpoints: dict[str, deque[SyncPlayPoint]] + _do_not_resync_before: dict[str, float] @property def supported_features(self) -> tuple[ProviderFeature, ...]: @@ -225,6 +228,7 @@ class SlimprotoProvider(PlayerProvider): async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" self._sync_playpoints = {} + self._do_not_resync_before = {} self._resync_handle: asyncio.TimerHandle | None = None control_port = self.config.get_value(CONF_PORT) telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT) @@ -335,17 +339,30 @@ class SlimprotoProvider(PlayerProvider): if player.synced_to: msg = "A synced player cannot receive play commands directly" raise RuntimeError(msg) - enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) + if player.group_childs: - # player has sync members, we need to start a multi slimplayer stream job - stream_job = await self.mass.streams.create_multi_client_stream_job( + # player has sync members, we need to start a (multi-player) stream job + # to make sure that all clients receive the exact same audio + pcm_format = AudioFormat( + content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24 + ) + queue = self.mass.player_queues.get(queue_item.queue_id) + stream_job = self.mass.streams.create_stream_job( queue_id=queue_item.queue_id, - start_queue_item=queue_item, + pcm_audio_source=self.mass.streams.get_flow_stream( + queue, + start_queue_item=queue_item, + pcm_format=pcm_format, + ), + pcm_format=pcm_format, ) # forward command to player and any connected sync members sync_clients = self._get_sync_clients(player_id) async with asyncio.TaskGroup() as tg: for slimplayer in sync_clients: + enforce_mp3 = await self.mass.config.get_player_config_value( + slimplayer.player_id, CONF_ENFORCE_MP3 + ) tg.create_task( self._handle_play_url( slimplayer, @@ -358,18 +375,19 @@ class SlimprotoProvider(PlayerProvider): auto_play=False, ) ) - if queue_item.queue_item_id != "flow": + if not queue_item.queue_id.startswith(UGP_PREFIX): stream_job.start() else: # regular, single player playback slimplayer = self.slimproto.get_player(player_id) if not slimplayer: return - url = await self.mass.streams.resolve_stream_url( + enforce_mp3 = await self.mass.config.get_player_config_value( + player_id, CONF_ENFORCE_MP3 + ) + url = self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, - # for now just hardcode flac as we assume that every (modern) - # slimproto based player can handle that just fine output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, flow_mode=False, ) @@ -386,7 +404,7 @@ class SlimprotoProvider(PlayerProvider): if not (slimplayer := self.slimproto.get_player(player_id)): return enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - url = await self.mass.streams.resolve_stream_url( + url = self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, @@ -682,9 +700,9 @@ class SlimprotoProvider(PlayerProvider): self.mass.player_queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled) slimplayer.extra_data["playlist shuffle"] = int(queue.shuffle_enabled) slimplayer.signal_update() - elif event.data == "button jump_fwd": + elif event.data in ("button jump_fwd", "button fwd"): await self.mass.player_queues.next(queue.queue_id) - elif event.data == "button jump_rew": + elif event.data in ("button jump_rew", "button rew"): await self.mass.player_queues.previous(queue.queue_id) elif event.data.startswith("time "): # seek request @@ -715,12 +733,15 @@ class SlimprotoProvider(PlayerProvider): sync_playpoints = self._sync_playpoints[slimplayer.player_id] active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id) - stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id) + stream_job = self.mass.streams.stream_jobs.get(active_queue.queue_id) if not stream_job: # should not happen, but just in case return now = time.time() + if now < self._do_not_resync_before[slimplayer.player_id]: + return + last_playpoint = sync_playpoints[-1] if sync_playpoints else None if last_playpoint and (now - last_playpoint.timestamp) > 10: # last playpoint is too old, invalidate @@ -741,7 +762,8 @@ class SlimprotoProvider(PlayerProvider): # we can now append the current playpoint to our list sync_playpoints.append(SyncPlayPoint(now, stream_job.job_id, diff)) - if len(sync_playpoints) < MIN_REQ_PLAYPOINTS: + min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS + if len(sync_playpoints) < min_req_playpoints: return # get the average diff @@ -753,22 +775,20 @@ class SlimprotoProvider(PlayerProvider): # resync the player by skipping ahead or pause for x amount of (milli)seconds sync_playpoints.clear() + self._do_not_resync_before[player.player_id] = now + 5 if avg_diff > MAX_SKIP_AHEAD_MS: # player lagging behind more than MAX_SKIP_AHEAD_MS, # we need to correct the sync_master self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta) self.mass.create_task(sync_master.pause_for(delta)) - sync_master._elapsed_milliseconds -= delta elif avg_diff > 0: # handle player lagging behind, fix with skip_ahead self.logger.debug("%s resync: skipAhead %sms", player.display_name, delta) self.mass.create_task(slimplayer.skip_over(delta)) - sync_master._elapsed_milliseconds += delta else: # handle player is drifting too far ahead, use pause_for to adjust self.logger.debug("%s resync: pauseFor %sms", player.display_name, delta) self.mass.create_task(slimplayer.pause_for(delta)) - sync_master._elapsed_milliseconds -= delta async def _handle_buffer_ready(self, slimplayer: SlimClient) -> None: """Handle buffer ready event, player has buffered a (new) track. @@ -799,12 +819,13 @@ class SlimprotoProvider(PlayerProvider): async with asyncio.TaskGroup() as tg: for _client in self._get_sync_clients(player.player_id): self._sync_playpoints.setdefault( - _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS * 2) + _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS) ).clear() # NOTE: Officially you should do an unpause_at based on the player timestamp # but I did not have any good results with that. # Instead just start playback on all players and let the sync logic work out # the delays etc. + self._do_not_resync_before[_client.player_id] = time.time() + 1 tg.create_task(_client.unpause_at(0)) async def _handle_connected(self, slimplayer: SlimClient) -> None: @@ -830,6 +851,7 @@ class SlimprotoProvider(PlayerProvider): init_volume = DEFAULT_PLAYER_VOLUME init_power = False await slimplayer.power(init_power) + await slimplayer.stop() await slimplayer.volume_set(init_volume) def _get_sync_clients(self, player_id: str) -> Iterator[SlimClient]: diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index b6becf6a..0fa541b3 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -4,13 +4,17 @@ from __future__ import annotations import asyncio import random +import socket import time from contextlib import suppress from typing import TYPE_CHECKING, cast from snapcast.control import create_server from snapcast.control.client import Snapclient +from zeroconf import NonUniqueNameException +from zeroconf.asyncio import AsyncServiceInfo +from music_assistant.common.helpers.util import get_ip_pton from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION, @@ -136,6 +140,7 @@ class SnapCastProvider(PlayerProvider): self._snapcast_server_control_port = self.config.get_value(CONF_SERVER_CONTROL_PORT) self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER) self._stream_tasks = {} + if self._use_builtin_server: # start our own builtin snapserver self._snapserver_started = asyncio.Event() @@ -169,11 +174,17 @@ class SnapCastProvider(PlayerProvider): """Handle close/cleanup of the provider.""" for client in self._snapserver.clients: await self.cmd_stop(client.identifier) - await self._snapserver.stop() - self._snapserver_started.clear() if self._snapserver_runner and not self._snapserver_runner.done(): self._snapserver_runner.cancel() - await asyncio.sleep(2) # prevent race conditions when reloading + await asyncio.sleep(6) # prevent race conditions when reloading + await self._snapserver.stop() + self._snapserver_started.clear() + + def on_player_config_removed(self, player_id: str) -> None: + """Call (by config manager) when the configuration of a player is removed.""" + super().on_player_config_removed(player_id) + if self._use_builtin_server: + self.mass.create_task(self._snapserver.delete_client(player_id)) def _handle_update(self) -> None: """Process Snapcast init Player/Group and set callback .""" @@ -297,60 +308,59 @@ class SnapCastProvider(PlayerProvider): bit_depth=16, channels=2, ) - if queue_item.media_type == MediaType.ANNOUNCEMENT: - # stream announcement url directly - audio_iterator = get_media_stream( - self.mass, queue_item.streamdetails, pcm_format=pcm_format - ) - elif ( - queue_item.queue_id.startswith(UGP_PREFIX) - and (stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id)) - and stream_job.pending - ): - # handle special case for UGP multi client stream - stream_job = self.mass.streams.multi_client_jobs.get(queue_item.queue_id) - stream_job.expected_players.add(player_id) - audio_iterator = stream_job.subscribe( - player_id=player_id, - output_format=pcm_format, - ) + + if queue_item.queue_id.startswith(UGP_PREFIX): + # special case: we got forwarded a request from the UGP + # use the existing stream job that was already created by UGP + stream_job = self.mass.streams.stream_jobs[queue_item.queue_id] else: - audio_iterator = self.mass.streams.get_flow_stream( - queue, - start_queue_item=queue_item, - pcm_format=pcm_format, + if queue_item.media_type == MediaType.ANNOUNCEMENT: + # stream announcement url directly + audio_source = get_media_stream( + self.mass, queue_item.streamdetails, pcm_format=pcm_format + ) + else: + queue = self.mass.player_queues.get(queue_item.queue_id) + audio_source = self.mass.streams.get_flow_stream( + queue, start_queue_item=queue_item, pcm_format=pcm_format + ) + stream_job = self.mass.streams.create_stream_job( + queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=pcm_format ) + stream_job.expected_players.add(player_id) async def _streamer() -> None: host = self._snapcast_server_host - _, writer = await asyncio.open_connection(host, port) - self.logger.debug("Opened connection to %s:%s", host, port) - player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}" - player.elapsed_time = 0 - player.elapsed_time_last_updated = time.time() - player.state = PlayerState.PLAYING - self._set_childs_state(player_id, PlayerState.PLAYING) - self.mass.players.register_or_update(player) + self.mass.players.update(player_id) + + def stream_callback(_stream) -> None: + player.state = PlayerState(stream.status) + if player.state == PlayerState.PLAYING: + player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}" + player.elapsed_time = 0 + player.elapsed_time_last_updated = time.time() + self._set_childs_state(player_id, player.state) + + stream.set_callback(stream_callback) + stream_path = f"tcp://{host}:{port}" + self.logger.debug("Start streaming to %s", stream_path) try: - async for pcm_chunk in audio_iterator: - writer.write(pcm_chunk) - await writer.drain() - # end of the stream reached - if writer.can_write_eof(): - writer.write_eof() - await writer.drain() - # we need to wait a bit before removing the stream to ensure - # that all snapclients have consumed the audio - # https://github.com/music-assistant/hass-music-assistant/issues/1962 - await asyncio.sleep(30) + await stream_job.stream_to_custom_output_path( + player_id, pcm_format, f"tcp://{host}:{port}" + ) + # we need to wait a bit for the stream status to become idle + # to ensure that all snapclients have consumed the audio + await self.mass.players.wait_for_state(player, PlayerState.IDLE) finally: - if not writer.is_closing(): - writer.close() + self.logger.debug("Finished streaming to %s", stream_path) + # there is no way to unsub the callback to we do this nasty + stream._callback_func = None await self._snapserver.stream_remove_stream(stream.identifier) - self.logger.debug("Closed connection to %s:%s", host, port) # start streaming the queue (pcm) audio in a background task self._stream_tasks[player_id] = asyncio.create_task(_streamer()) + if not queue_item.queue_id.startswith(UGP_PREFIX): + stream_job.start() def _get_snapgroup(self, player_id: str) -> Snapgroup: """Get snapcast group for given player_id.""" @@ -416,6 +426,38 @@ class SnapCastProvider(PlayerProvider): raise RuntimeError("Snapserver is already started!") logger = self.logger.getChild("snapserver") logger.info("Starting builtin Snapserver...") + # register the snapcast mdns services + for name, port in ( + ("-http", 1780), + ("-jsonrpc", 1705), + ("-stream", 1704), + ("-tcp", 1705), + ("", 1704), + ): + zeroconf_type = f"_snapcast{name}._tcp.local." + try: + info = AsyncServiceInfo( + zeroconf_type, + name=f"Snapcast.{zeroconf_type}", + properties={"is_mass": "true"}, + addresses=[await get_ip_pton(self.mass.webserver.publish_ip)], + port=port, + server=f"{socket.gethostname()}", + ) + attr_name = f"zc_service_set{name}" + if getattr(self, attr_name, None): + await self.mass.aiozc.async_update_service(info) + else: + await self.mass.aiozc.async_register_service(info, strict=False) + setattr(self, attr_name, True) + except NonUniqueNameException: + self.logger.debug( + "Could not register mdns record for %s as its already in use", zeroconf_type + ) + except Exception as err: + self.logger.exception( + "Could not register mdns record for %s: %s", zeroconf_type, str(err) + ) async with AsyncProcess( ["snapserver"], enable_stdin=False, enable_stdout=True, enable_stderr=False ) as snapserver_proc: diff --git a/music_assistant/server/providers/snapcast/manifest.json b/music_assistant/server/providers/snapcast/manifest.json index a31199f7..d5938754 100644 --- a/music_assistant/server/providers/snapcast/manifest.json +++ b/music_assistant/server/providers/snapcast/manifest.json @@ -3,8 +3,12 @@ "domain": "snapcast", "name": "Snapcast", "description": "Support for snapcast server and clients.", - "codeowners": ["@SantigoSotoC"], - "requirements": ["snapcast-mod==2.4.3"], + "codeowners": [ + "@SantigoSotoC" + ], + "requirements": [ + "snapcast==2.3.6" + ], "documentation": "https://music-assistant.io/player-support/snapcast/", "multi_instance": false, "builtin": false, diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index 0abe5b53..e69482b1 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -36,6 +36,7 @@ from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.constants import CONF_CROSSFADE, VERBOSE_LOG_LEVEL from music_assistant.server.helpers.didl_lite import create_didl_metadata from music_assistant.server.models.player_provider import PlayerProvider +from music_assistant.server.providers.ugp import UGP_PREFIX from .player import SonosPlayer @@ -341,11 +342,6 @@ class SonosPlayerProvider(PlayerProvider): queue_item: QueueItem, ) -> None: """Handle PLAY MEDIA on given player.""" - url = await self.mass.streams.resolve_stream_url( - player_id, - queue_item=queue_item, - output_codec=ContentType.FLAC, - ) sonos_player = self.sonosplayers[player_id] mass_player = self.mass.players.get(player_id) if sonos_player.sync_coordinator: @@ -355,10 +351,17 @@ class SonosPlayerProvider(PlayerProvider): "accept play_media command, it is synced to another player." ) raise PlayerCommandFailed(msg) - await self.mass.create_task( + + is_flow_stream = queue_item.queue_item_id == "flow" or queue_item.queue_id.startswith( + UGP_PREFIX + ) + url = self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.FLAC + ) + self.mass.create_task( sonos_player.soco.play_uri, url, - meta=create_didl_metadata(self.mass, url, queue_item), + meta=create_didl_metadata(self.mass, url, None if is_flow_stream else queue_item), ) async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: @@ -377,7 +380,7 @@ class SonosPlayerProvider(PlayerProvider): This will NOT be called if flow mode is enabled on the queue. """ sonos_player = self.sonosplayers[player_id] - url = await self.mass.streams.resolve_stream_url( + url = self.mass.streams.resolve_stream_url( player_id, queue_item=queue_item, output_codec=ContentType.FLAC, diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 3a64e30d..b99f700f 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -402,7 +402,7 @@ class SpotifyProvider(MusicProvider): # retry with ap-port set to invalid value, which will force fallback args += ["--ap-port", "12345"] async with AsyncProcess(args) as librespot_proc: - async for chunk in librespot_proc.iter_any(64000): + async for chunk in librespot_proc.iter_any(): yield chunk self._ap_workaround = True diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py index f11eb925..8998adf1 100644 --- a/music_assistant/server/providers/ugp/__init__.py +++ b/music_assistant/server/providers/ugp/__init__.py @@ -20,14 +20,20 @@ from music_assistant.common.models.config_entries import ( ) from music_assistant.common.models.enums import ( ConfigEntryType, + ContentType, PlayerFeature, PlayerState, PlayerType, ProviderFeature, ) +from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.queue_item import QueueItem from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX +from music_assistant.server.controllers.streams import ( + FLOW_DEFAULT_BIT_DEPTH, + FLOW_DEFAULT_SAMPLE_RATE, +) from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: @@ -138,6 +144,8 @@ class UniversalGroupProvider(PlayerProvider): if member.state == PlayerState.IDLE: continue tg.create_task(self.mass.players.cmd_stop(member.player_id)) + if existing := self.mass.streams.stream_jobs.pop(player_id, None): + existing.stop() async def cmd_play(self, player_id: str) -> None: """Send PLAY command to given player.""" @@ -166,11 +174,22 @@ class UniversalGroupProvider(PlayerProvider): await self.cmd_power(player_id, True) group_player = self.mass.players.get(player_id) + await self.cmd_stop(player_id) + # create a multi-client stream job - all (direct) child's of this UGP group # will subscribe to this multi client queue stream - stream_job = await self.mass.streams.create_multi_client_stream_job( - player_id, - start_queue_item=queue_item, + pcm_format = AudioFormat( + content_type=ContentType.from_bit_depth(FLOW_DEFAULT_BIT_DEPTH), + sample_rate=FLOW_DEFAULT_SAMPLE_RATE, + bit_depth=FLOW_DEFAULT_BIT_DEPTH, + ) + queue = self.mass.player_queues.get(player_id) + stream_job = self.mass.streams.create_stream_job( + queue.queue_id, + pcm_audio_source=self.mass.streams.get_flow_stream( + queue=queue, start_queue_item=queue_item, pcm_format=pcm_format + ), + pcm_format=pcm_format, ) # create a fake queue item to forward to downstream play_media commands ugp_queue_item = QueueItem( diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index 461f01b0..1f1d2c73 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -29,6 +29,7 @@ from music_assistant.constants import ( CONFIGURABLE_CORE_CONTROLLERS, MIN_SCHEMA_VERSION, ROOT_LOGGER_NAME, + VERBOSE_LOG_LEVEL, ) from music_assistant.server.controllers.cache import CacheController from music_assistant.server.controllers.config import ConfigController @@ -627,7 +628,13 @@ class MusicAssistant: await info.async_request(zeroconf, 3000) await prov.on_mdns_service_state_change(name, state_change, info) - LOGGER.debug(f"Service {name} of type {service_type} state changed: {state_change}") + LOGGER.log( + VERBOSE_LOG_LEVEL, + "Service %s of type %s state changed: %s", + name, + service_type, + state_change, + ) for prov in self._providers.values(): if not prov.manifest.mdns_discovery: continue @@ -647,9 +654,6 @@ class MusicAssistant: ) -> bool | None: """Exit context manager.""" await self.stop() - if exc_val: - raise exc_val - return exc_type async def _update_available_providers_cache(self) -> None: """Update the global cache variable of loaded/available providers.""" diff --git a/requirements_all.txt b/requirements_all.txt index 3d0f3245..c85e39a2 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -32,7 +32,7 @@ python-fullykiosk==0.0.12 python-slugify==8.0.4 radios==0.3.0 shortuuid==1.0.12 -snapcast-mod==2.4.3 +snapcast==2.3.6 soco==0.30.2 sonos-websocket==0.1.3 tidalapi==0.7.4 -- 2.34.1