From: Marcel van der Veldt Date: Sat, 23 Mar 2024 00:38:50 +0000 (+0100) Subject: Fix cleanup of (aborted) ffmpeg processes (#1166) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=ad512a2e31d33b4cc7b052f9d6b14c005be83ebe;p=music-assistant-server.git Fix cleanup of (aborted) ffmpeg processes (#1166) --- diff --git a/music_assistant/__main__.py b/music_assistant/__main__.py index 712adc3f..48604d8e 100644 --- a/music_assistant/__main__.py +++ b/music_assistant/__main__.py @@ -50,7 +50,6 @@ def get_arguments(): help="Provide logging level. Example --log-level debug, " "default=info, possible=(critical, error, warning, info, debug)", ) - parser.add_argument("-u", "--enable-uvloop", action="store_true") return parser.parse_args() @@ -179,7 +178,6 @@ def main() -> None: hass_options = {} log_level = hass_options.get("log_level", args.log_level).upper() - enable_uvloop = bool(hass_options.get("enable_uvloop", args.enable_uvloop)) dev_mode = os.environ.get("PYTHONDEVMODE", "0") == "1" # setup logger @@ -203,7 +201,6 @@ def main() -> None: run( start_mass(), - use_uvloop=enable_uvloop, shutdown_callback=on_shutdown, executor_workers=32, ) diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index b8b3d476..28e33a78 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -603,12 +603,6 @@ class PlayerController(CoreController): player = self.get(player_id, True) if player.announcement_in_progress: return - # use stream server to host announcement on local network - # this ensures playback on all players, including ones that do not - # like https hosts and it also offers the pre-announce 'bell' - announcement_url = self.mass.streams.get_announcement_url( - player.player_id, url, use_pre_announce=use_pre_announce - ) try: # mark announcement_in_progress on player player.announcement_in_progress = True @@ -621,10 +615,16 @@ class PlayerController(CoreController): # check for native announce support if PlayerFeature.PLAY_ANNOUNCEMENT in player.supported_features: if prov := self.mass.get_provider(player.provider): + # use stream server to host announcement on local network + # this ensures playback on all players, including ones that do not + # like https hosts and it also offers the pre-announce 'bell' + announcement_url = self.mass.streams.get_announcement_url( + player.player_id, url, use_pre_announce=use_pre_announce + ) await prov.play_announcement(player_id, announcement_url) return # use fallback/default implementation - await self._play_announcement(player, announcement_url) + await self._play_announcement(player, url, use_pre_announce) finally: player.announcement_in_progress = False @@ -1078,11 +1078,7 @@ class PlayerController(CoreController): # if a child player turned ON while the group player is on, we need to resync/resume self.mass.create_task(self._sync_syncgroup(group_player.player_id)) - async def _play_announcement( - self, - player: Player, - announcement_url: str, - ) -> None: + async def _play_announcement(self, player: Player, url: str, use_pre_announce: bool) -> None: """Handle (default/fallback) implementation of the play announcement feature. This default implementation will; @@ -1099,28 +1095,28 @@ class PlayerController(CoreController): """ if player.synced_to: # redirect to sync master if player is group child - self.mass.create_task(self.play_announcement(player.synced_to, announcement_url)) + self.mass.create_task(self.play_announcement(player.synced_to, url)) return if active_group := self._get_active_player_group(player): - # redirect to group player if playergroup is atcive - self.mass.create_task(self.play_announcement(active_group.player_id, announcement_url)) + # redirect to group player if playergroup is active + self.mass.create_task(self.play_announcement(active_group.player_id, url)) return # create a queue item for the announcement so # we can send a regular play-media call downstream queue_item = QueueItem( queue_id=player.player_id, - queue_item_id=announcement_url, + queue_item_id=url, name="Announcement", duration=None, streamdetails=StreamDetails( provider="url", - item_id=announcement_url, + item_id=url, audio_format=AudioFormat( - content_type=ContentType.try_parse(announcement_url), + content_type=ContentType.try_parse(url), ), media_type=MediaType.ANNOUNCEMENT, - direct=announcement_url, - data=announcement_url, + direct=url, + data={"url": url, "use_pre_announce": use_pre_announce}, target_loudness=-10, ), ) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index e773cf27..6dc0f4d3 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -19,12 +19,7 @@ from typing import TYPE_CHECKING import shortuuid from aiohttp import web -from music_assistant.common.helpers.util import ( - empty_queue, - get_ip, - select_free_port, - try_parse_bool, -) +from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool from music_assistant.common.models.config_entries import ( ConfigEntry, ConfigValueOption, @@ -43,18 +38,15 @@ from music_assistant.constants import ( CONF_PUBLISH_IP, SILENCE_FILE, UGP_PREFIX, - VERBOSE_LOG_LEVEL, ) from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.server.helpers.audio import ( check_audio_support, crossfade_pcm_parts, - get_ffmpeg_args, get_ffmpeg_stream, get_media_stream, get_player_filter_params, ) -from music_assistant.server.helpers.process import AsyncProcess from music_assistant.server.helpers.util import get_ips from music_assistant.server.helpers.webserver import Webserver from music_assistant.server.models.core_controller import CoreController @@ -64,7 +56,6 @@ if TYPE_CHECKING: from music_assistant.common.models.player import Player from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.common.models.queue_item import QueueItem - from music_assistant.server import MusicAssistant DEFAULT_STREAM_HEADERS = { @@ -83,46 +74,46 @@ FLOW_DEFAULT_BIT_DEPTH = 24 # pylint:disable=too-many-locals -class QueueStreamJob: +class MultiClientStreamJob: """ - Representation of a (multiclient) Audio stream job/task. - - The whole idea here is that the (pcm) audio source can be sent to multiple - players at once. For example for (slimproto/airplay) syncgroups and universal group. + Representation of a (multiclient) Audio Queue stream job/task. - All client players receive the exact same PCM audio chunks from the source audio, - which then can be optionally encoded and/or resampled to the player's preferences. - In case a stream is restarted (e.g. when seeking), - a new QueueStreamJob will be created. + The whole idea here is that in case of a player (sync)group, + all client players receive the exact same (PCM) audio chunks from the source audio. + A StreamJob is tied to a Queue and streams the queue flow stream, + In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created. """ _audio_task: asyncio.Task | None = None def __init__( self, - mass: MusicAssistant, - pcm_audio_source: AsyncGenerator[bytes, None], + stream_controller: StreamsController, + queue_id: str, pcm_format: AudioFormat, - auto_start: bool = False, + start_queue_item: QueueItem, ) -> None: - """Initialize QueueStreamJob instance.""" - self.mass = mass - self.pcm_audio_source = pcm_audio_source + """Initialize MultiClientStreamJob instance.""" + self.stream_controller = stream_controller + self.queue_id = queue_id + self.queue = self.stream_controller.mass.player_queues.get(queue_id) + assert self.queue # just in case self.pcm_format = pcm_format - self.expected_players: set[str] = set() + self.start_queue_item = start_queue_item self.job_id = shortuuid.uuid() - self.bytes_streamed: int = 0 - self.logger = self.mass.streams.logger + self.expected_players: set[str] = set() self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {} - self._finished = False - self.allow_start = auto_start + self.bytes_streamed: int = 0 self._all_clients_connected = asyncio.Event() + self.logger = stream_controller.logger.getChild("streamjob") + self._finished: bool = False + # start running the audio task in the background self._audio_task = asyncio.create_task(self._stream_job_runner()) @property def finished(self) -> bool: """Return if this StreamJob is finished.""" - return self._finished or (self._audio_task and self._audio_task.done()) + return self._finished or self._audio_task and self._audio_task.done() @property def pending(self) -> bool: @@ -132,116 +123,74 @@ class QueueStreamJob: @property def running(self) -> bool: """Return if this Job is running.""" - return ( - self._all_clients_connected.is_set() - and self._audio_task - and not self._audio_task.done() - ) - - def start(self) -> None: - """Start running (send audio chunks to connected players).""" - if self.finished: - raise RuntimeError("Task is already finished") - self.allow_start = True - if self.expected_players and len(self.subscribed_players) >= len(self.expected_players): - self._all_clients_connected.set() + return not self.finished and not self.pending def stop(self) -> None: """Stop running this job.""" - if self._audio_task and not self._audio_task.done(): - self._audio_task.cancel() - if not self._finished: - # we need to make sure that we close the async generator - with suppress(StopAsyncIteration): - task = asyncio.create_task(self.pcm_audio_source.__anext__()) - task.cancel() self._finished = True + if self._audio_task and self._audio_task.done(): + return + if self._audio_task: + self._audio_task.cancel() for sub_queue in self.subscribed_players.values(): - empty_queue(sub_queue) + with suppress(asyncio.QueueFull): + sub_queue.put_nowait(b"") - def resolve_stream_url(self, player_id: str, output_codec: ContentType) -> str: + def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str: """Resolve the childplayer specific stream URL to this streamjob.""" fmt = output_codec.value # handle raw pcm if output_codec.is_pcm(): - player = self.mass.streams.mass.players.get(player_id) + player = self.stream_controller.mass.players.get(child_player_id) player_max_bit_depth = 24 if player.supports_24bit else 16 output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate) output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth) - output_channels = self.mass.config.get_raw_player_config_value( - player_id, CONF_OUTPUT_CHANNELS, "stereo" + output_channels = self.stream_controller.mass.config.get_raw_player_config_value( + child_player_id, CONF_OUTPUT_CHANNELS, "stereo" ) channels = 1 if output_channels != "stereo" else 2 fmt += ( f";codec=pcm;rate={output_sample_rate};" f"bitrate={output_bit_depth};channels={channels}" ) - url = f"{self.mass.streams._server.base_url}/flow/{self.job_id}/{player_id}.{fmt}" - self.expected_players.add(player_id) + url = f"{self.stream_controller._server.base_url}/multi/{self.queue_id}/{self.job_id}/{child_player_id}/{self.start_queue_item.queue_item_id}.{fmt}" # noqa: E501 + self.expected_players.add(child_player_id) return url - async def iter_player_audio( - self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None - ) -> AsyncGenerator[bytes, None]: - """Subscribe consumer and iterate player-specific audio.""" - async for chunk in get_ffmpeg_stream( - audio_input=self.subscribe(player_id), - input_format=self.pcm_format, - output_format=output_format, - filter_params=get_player_filter_params(self.mass, player_id), - chunk_size=chunk_size, - ): - yield chunk - - async def stream_to_custom_output_path( - self, player_id: str, output_format: AudioFormat, output_path: str | int - ) -> None: - """Subscribe consumer and instruct ffmpeg to send the audio to the given output path.""" - custom_file_pointer = isinstance(output_path, int) - ffmpeg_args = get_ffmpeg_args( - input_format=self.pcm_format, - output_format=output_format, - filter_params=get_player_filter_params(self.mass, player_id), - extra_args=[], - input_path="-", - output_path="-" if custom_file_pointer else output_path, - loglevel="info" if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet", - ) - # launch ffmpeg process with player specific settings - # the stream_job_runner will start pushing pcm chunks to the stdin - # the ffmpeg process will send the output directly to the given path (e.g. tcp socket) - async with AsyncProcess( - ffmpeg_args, - enable_stdin=True, - enable_stdout=custom_file_pointer, - enable_stderr=False, - custom_stdin=self.subscribe(player_id), - custom_stdout=output_path if custom_file_pointer else None, - name="ffmpeg_custom_output_path", - ) as ffmpeg_proc: - # we simply wait for the process to exit - await ffmpeg_proc.wait() - async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]: """Subscribe consumer and iterate incoming chunks on the queue.""" try: - self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2) - - if self._all_clients_connected.is_set(): - # client subscribes while we're already started + # some players (e.g. dlna, sonos) misbehave and do multiple GET requests + # to the stream in an attempt to get the audio details such as duration + # which is a bit pointless for our duration-less queue stream + # and it completely messes with the subscription logic + if player_id in self.subscribed_players: self.logger.warning( - "Client %s is joining while the stream is already started", player_id + "Player %s is making multiple requests " + "to the same stream, playback may be disturbed!", + player_id, + ) + player_id = f"{player_id}_{shortuuid.random(4)}" + elif self._all_clients_connected.is_set(): + # client subscribes while we're already started - that is going to be messy for sure + self.logger.warning( + "Player %s is is joining while the stream is already started, " + "playback may be disturbed!", + player_id, ) + self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2) + + if self._all_clients_connected.is_set(): + # client subscribes while we're already started - we dont support that (for now?) + msg = f"Client {player_id} is joining while the stream is already started" + raise RuntimeError(msg) self.logger.debug("Subscribed client %s", player_id) - if ( - self.expected_players - and self.allow_start - and len(self.subscribed_players) == len(self.expected_players) - ): + if len(self.subscribed_players) == len(self.expected_players): # we reached the number of expected subscribers, set event # so that chunks can be pushed + await asyncio.sleep(0.2) self._all_clients_connected.set() # keep reading audio chunks from the queue until we receive an empty one @@ -258,7 +207,7 @@ class QueueStreamJob: await asyncio.sleep(2) if len(self.subscribed_players) == 0 and self._audio_task and not self.finished: self.logger.debug("Cleaning up, all clients disappeared...") - self.stop() + self._audio_task.cancel() async def _put_chunk(self, chunk: bytes) -> None: """Put chunk of data to all subscribers.""" @@ -271,7 +220,11 @@ class QueueStreamJob: async def _stream_job_runner(self) -> None: """Feed audio chunks to StreamJob subscribers.""" chunk_num = 0 - async for chunk in self.pcm_audio_source: + async for chunk in self.stream_controller.get_flow_stream( + self.queue, + self.start_queue_item, + self.pcm_format, + ): chunk_num += 1 if chunk_num == 1: # wait until all expected clients are connected @@ -280,19 +233,20 @@ class QueueStreamJob: await self._all_clients_connected.wait() except TimeoutError: if len(self.subscribed_players) == 0: - self.logger.exception( - "Abort multi client stream job %s: " - "client(s) did not connect within timeout", - self.job_id, + self.stream_controller.logger.exception( + "Abort multi client stream job for queue %s: " + "clients did not connect within timeout", + self.queue.display_name, ) break # not all clients connected but timeout expired, set flag and move on # with all clients that did connect self._all_clients_connected.set() else: - self.logger.debug( - "Starting queue stream job %s with %s (out of %s) connected clients", - self.job_id, + self.stream_controller.logger.debug( + "Starting multi client stream job for queue %s " + "with %s out of %s connected clients", + self.queue.display_name, len(self.subscribed_players), len(self.expected_players), ) @@ -323,7 +277,7 @@ class StreamsController(CoreController): """Initialize instance.""" super().__init__(*args, **kwargs) self._server = Webserver(self.logger, enable_dynamic_routes=True) - self.stream_jobs: dict[str, QueueStreamJob] = {} + self.multi_client_jobs: dict[str, MultiClientStreamJob] = {} self.register_dynamic_route = self._server.register_dynamic_route self.unregister_dynamic_route = self._server.unregister_dynamic_route self.manifest.name = "Streamserver" @@ -413,7 +367,7 @@ class StreamsController(CoreController): static_routes=[ ( "*", - "/flow/{job_id}/{player_id}.{fmt}", + "/flow/{queue_id}/{queue_item_id}.{fmt}", self.serve_queue_flow_stream, ), ( @@ -421,6 +375,11 @@ class StreamsController(CoreController): "/single/{queue_id}/{queue_item_id}.{fmt}", self.serve_queue_item_stream, ), + ( + "*", + "/multi/{queue_id}/{job_id}/{player_id}/{queue_item_id}.{fmt}", + self.serve_multi_subscriber_stream, + ), ( "*", "/command/{queue_id}/{command}.mp3", @@ -447,43 +406,25 @@ class StreamsController(CoreController): ) -> str: """Resolve the stream URL for the given QueueItem.""" fmt = output_codec.value - # handle announcement item - if queue_item.media_type == MediaType.ANNOUNCEMENT: - return queue_item.queue_item_id - # handle request for (multi client) queue flow stream + # handle special stream created by UGP if queue_item.queue_id.startswith(UGP_PREFIX): - # special case: we got forwarded a request from a Universal Group Player - # use the existing stream job that was already created by UGP - stream_job = self.mass.streams.stream_jobs[queue_item.queue_id] - return stream_job.resolve_stream_url(player_id, output_codec) - - if flow_mode: - # create a new flow mode stream job session - pcm_format = AudioFormat( - content_type=ContentType.from_bit_depth(24), - sample_rate=FLOW_DEFAULT_SAMPLE_RATE, - bit_depth=FLOW_DEFAULT_BIT_DEPTH, + return self.multi_client_jobs[queue_item.queue_id].resolve_stream_url( + player_id, output_codec ) - stream_job = self.create_stream_job( - queue_item.queue_id, - pcm_audio_source=self.get_flow_stream( - self.mass.player_queues.get(queue_item.queue_id), - start_queue_item=queue_item, - pcm_format=pcm_format, - ), - pcm_format=pcm_format, - auto_start=True, + # handle announcement item + if queue_item.media_type == MediaType.ANNOUNCEMENT: + return self.get_announcement_url( + player_id=queue_item.queue_id, + announcement_url=queue_item.streamdetails.data["url"], + use_pre_announce=queue_item.streamdetails.data["use_pre_announce"], + content_type=output_codec, ) - - return stream_job.resolve_stream_url(player_id, output_codec) - # handle raw pcm without exact format specifiers if output_codec.is_pcm() and ";" not in fmt: fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}" query_params = {} - url = ( - f"{self._server.base_url}/single/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" - ) + base_path = "flow" if flow_mode else "single" + url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501 # we add a timestamp as basic checksum # most importantly this is to invalidate any caches # but also to handle edge cases such as single track repeat @@ -491,27 +432,38 @@ class StreamsController(CoreController): url += "?" + urllib.parse.urlencode(query_params) return url - def create_stream_job( + def create_multi_client_stream_job( self, queue_id: str, - pcm_audio_source: AsyncGenerator[bytes, None], - pcm_format: AudioFormat, - auto_start: bool = False, - ) -> QueueStreamJob: - """ - Create a QueueStreamJob for the given queue.. + start_queue_item: QueueItem, + pcm_bit_depth: int = FLOW_DEFAULT_BIT_DEPTH, + pcm_sample_rate: int = FLOW_DEFAULT_SAMPLE_RATE, + ) -> MultiClientStreamJob: + """Create a MultiClientStreamJob for the given queue.. This is called by player/sync group implementations to start streaming the queue audio to multiple players at once. """ - if existing_job := self.stream_jobs.pop(queue_id, None): + if existing_job := self.multi_client_jobs.pop(queue_id, None): + if ( + queue_id.startswith(UGP_PREFIX) + and existing_job.job_id == start_queue_item.queue_item_id + ): + return existing_job # cleanup existing job first - existing_job.stop() - self.stream_jobs[queue_id] = stream_job = QueueStreamJob( - self.mass, - pcm_audio_source=pcm_audio_source, - pcm_format=pcm_format, - auto_start=auto_start, + if not existing_job.finished: + self.logger.warning("Detected existing (running) stream job for queue %s", queue_id) + existing_job.stop() + self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob( + self, + queue_id=queue_id, + pcm_format=AudioFormat( + content_type=ContentType.from_bit_depth(pcm_bit_depth), + sample_rate=pcm_sample_rate, + bit_depth=pcm_bit_depth, + channels=2, + ), + start_queue_item=start_queue_item, ) return stream_job @@ -586,32 +538,28 @@ class StreamsController(CoreController): async def serve_queue_flow_stream(self, request: web.Request) -> web.Response: """Stream Queue Flow audio to player.""" self._log_request(request) - job_id = request.match_info["job_id"] - for queue_id, stream_job in self.stream_jobs.items(): - if stream_job.job_id == job_id: - break - else: - raise web.HTTPNotFound(reason=f"Unknown StreamJob: {job_id}") - if stream_job.finished: - raise web.HTTPNotFound(reason=f"StreamJob {job_id} already finished") - if not (queue := self.mass.player_queues.get(queue_id)): + queue_id = request.match_info["queue_id"] + queue = self.mass.player_queues.get(queue_id) + if not queue: raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}") - player_id = request.match_info["player_id"] - child_player = self.mass.players.get(player_id) - if not child_player: - raise web.HTTPNotFound(reason=f"Unknown player: {player_id}") - # work out (childplayer specific!) output format/details + if not (queue_player := self.mass.players.get(queue_id)): + raise web.HTTPNotFound(reason=f"Unknown Player: {queue_id}") + start_queue_item_id = request.match_info["queue_item_id"] + start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id) + if not start_queue_item: + raise web.HTTPNotFound(reason=f"Unknown Queue item: {start_queue_item_id}") + # work out output format/details output_format = await self._get_output_format( output_format_str=request.match_info["fmt"], - queue_player=child_player, - default_sample_rate=stream_job.pcm_format.sample_rate, - default_bit_depth=stream_job.pcm_format.bit_depth, + queue_player=queue_player, + default_sample_rate=FLOW_DEFAULT_SAMPLE_RATE, + default_bit_depth=FLOW_DEFAULT_BIT_DEPTH, ) # play it safe: only allow icy metadata for mp3 and aac enable_icy = request.headers.get( "Icy-MetaData", "" ) == "1" and output_format.content_type in (ContentType.MP3, ContentType.AAC) - icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384 + icy_meta_interval = 16384 # prepare request, add some DLNA/UPNP compatible headers headers = { @@ -632,32 +580,31 @@ class StreamsController(CoreController): if request.method != "GET": return resp - # some players (e.g. dlna, sonos) misbehave and do multiple GET requests - # to the stream in an attempt to get the audio details such as duration - # which is a bit pointless for our duration-less queue stream - # and it completely messes with the subscription logic - if player_id in stream_job.subscribed_players: - self.logger.warning( - "Player %s is making multiple requests " - "to the same stream, playback may be disturbed!", - player_id, - ) - elif "rincon" in player_id.lower(): - await asyncio.sleep(0.1) - # all checks passed, start streaming! - self.logger.debug( - "Start serving Queue flow audio stream for queue %s to player %s", - queue.display_name, - child_player.display_name, + self.logger.debug("Start serving Queue flow audio stream for %s", queue.display_name) + + # collect player specific ffmpeg args to re-encode the source PCM stream + pcm_format = AudioFormat( + content_type=ContentType.from_bit_depth(output_format.bit_depth), + sample_rate=output_format.sample_rate, + bit_depth=output_format.bit_depth, + channels=2, ) - async for chunk in stream_job.iter_player_audio( - player_id, output_format, chunk_size=icy_meta_interval if enable_icy else None + async for chunk in get_ffmpeg_stream( + audio_input=self.get_flow_stream( + queue=queue, start_queue_item=start_queue_item, pcm_format=pcm_format + ), + input_format=pcm_format, + output_format=output_format, + filter_params=get_player_filter_params(self.mass, queue_player.player_id), + chunk_size=icy_meta_interval if enable_icy else None, ): try: await resp.write(chunk) except (BrokenPipeError, ConnectionResetError): + # race condition break + if not enable_icy: continue @@ -685,6 +632,64 @@ class StreamsController(CoreController): return resp + async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response: + """Stream Queue Flow audio to a child player within a multi subscriber setup.""" + self._log_request(request) + queue_id = request.match_info["queue_id"] + streamjob = self.multi_client_jobs.get(queue_id) + if not streamjob: + raise web.HTTPNotFound(reason=f"Unknown StreamJob for queue: {queue_id}") + job_id = request.match_info["job_id"] + if job_id != streamjob.job_id: + raise web.HTTPNotFound(reason=f"StreamJob ID {job_id} mismatch for queue: {queue_id}") + child_player_id = request.match_info["player_id"] + child_player = self.mass.players.get(child_player_id) + if not child_player: + raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}") + # work out (childplayer specific!) output format/details + output_format = await self._get_output_format( + output_format_str=request.match_info["fmt"], + queue_player=child_player, + default_sample_rate=streamjob.pcm_format.sample_rate, + default_bit_depth=streamjob.pcm_format.bit_depth, + ) + # prepare request, add some DLNA/UPNP compatible headers + headers = { + **DEFAULT_STREAM_HEADERS, + "Content-Type": f"audio/{output_format.output_format_str}", + } + resp = web.StreamResponse( + status=200, + reason="OK", + headers=headers, + ) + await resp.prepare(request) + + # return early if this is not a GET request + if request.method != "GET": + return resp + + # all checks passed, start streaming! + self.logger.debug( + "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s", + streamjob.queue.display_name, + child_player.display_name, + ) + + async for chunk in get_ffmpeg_stream( + audio_input=streamjob.subscribe(child_player_id), + input_format=streamjob.pcm_format, + output_format=output_format, + filter_params=get_player_filter_params(self.mass, child_player_id), + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError): + # race condition + break + + return resp + async def serve_command_request(self, request: web.Request) -> web.Response: """Handle special 'command' request for a player.""" self._log_request(request) @@ -703,11 +708,11 @@ class StreamsController(CoreController): raise web.HTTPNotFound(reason=f"Unknown Player: {player_id}") if player_id not in self.announcements: raise web.HTTPNotFound(reason=f"No pending announcements for Player: {player_id}") - announcement = self.announcements[player_id] + announcement_url = self.announcements[player_id] use_pre_announce = try_parse_bool(request.query.get("pre_announce")) # work out output format/details - fmt = request.match_info.get("fmt", announcement.rsplit(".")[-1]) + fmt = request.match_info.get("fmt", announcement_url.rsplit(".")[-1]) audio_format = AudioFormat(content_type=ContentType.try_parse(fmt)) # prepare request, add some DLNA/UPNP compatible headers headers = { @@ -728,26 +733,13 @@ class StreamsController(CoreController): # all checks passed, start streaming! self.logger.debug( "Start serving audio stream for Announcement %s to %s", - announcement, + announcement_url, player.display_name, ) - extra_args = [] - filter_params = ["loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5"] - if use_pre_announce: - extra_args += [ - "-i", - ANNOUNCE_ALERT_FILE, - "-filter_complex", - "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5", - ] - filter_params = [] - - async for chunk in get_ffmpeg_stream( - audio_input=announcement, - input_format=audio_format, + async for chunk in self.get_announcement_stream( + announcement_url=announcement_url, output_format=audio_format, - extra_args=extra_args, - filter_params=filter_params, + use_pre_announce=use_pre_announce, ): try: await resp.write(chunk) @@ -756,7 +748,7 @@ class StreamsController(CoreController): self.logger.debug( "Finished serving audio stream for Announcement %s to %s", - announcement, + announcement_url, player.display_name, ) @@ -922,6 +914,32 @@ class StreamsController(CoreController): del buffer self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name) + async def get_announcement_stream( + self, announcement_url: str, output_format: AudioFormat, use_pre_announce: bool = False + ) -> AsyncGenerator[bytes, None]: + """Get the special announcement stream.""" + # work out output format/details + fmt = announcement_url.rsplit(".")[-1] + audio_format = AudioFormat(content_type=ContentType.try_parse(fmt)) + extra_args = [] + filter_params = ["loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5"] + if use_pre_announce: + extra_args += [ + "-i", + ANNOUNCE_ALERT_FILE, + "-filter_complex", + "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=7:tp=-2:offset=-0.5", + ] + filter_params = [] + async for chunk in get_ffmpeg_stream( + audio_input=announcement_url, + input_format=audio_format, + output_format=output_format, + extra_args=extra_args, + filter_params=filter_params, + ): + yield chunk + def _log_request(self, request: web.Request) -> None: """Log request.""" if not self.logger.isEnabledFor(logging.DEBUG): diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 720d3ac5..c6515755 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -373,6 +373,7 @@ async def get_media_stream( # noqa: PLR0915 filter_params=filter_params, extra_args=extra_args, input_path=streamdetails.direct or "-", + loglevel="info", # needed for loudness measurement ) finished = False @@ -381,24 +382,18 @@ async def get_media_stream( # noqa: PLR0915 ffmpeg_args, enable_stdin=streamdetails.direct is None, enable_stderr=True, + custom_stdin=mass.get_provider(streamdetails.provider).get_audio_stream( + streamdetails, + seek_position=streamdetails.seek_position if streamdetails.can_seek else 0, + ) + if not streamdetails.direct + else None, name="ffmpeg_media_stream", ) await ffmpeg_proc.start() logger = LOGGER.getChild("media_stream") logger.debug("start media stream for: %s", streamdetails.uri) - async def writer() -> None: - """Task that grabs the source audio and feeds it to ffmpeg.""" - music_prov = mass.get_provider(streamdetails.provider) - seek_pos = streamdetails.seek_position if streamdetails.can_seek else 0 - async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos): - await ffmpeg_proc.write(audio_chunk) - # write eof when last packet is received - await ffmpeg_proc.write_eof() - - if streamdetails.direct is None: - ffmpeg_proc.attached_tasks.append(asyncio.create_task(writer())) - # get pcm chunks from stdout # we always stay one chunk behind to properly detect end of chunks # so we can strip silence at the beginning and end of a track @@ -450,31 +445,28 @@ async def get_media_stream( # noqa: PLR0915 finally: seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0 streamdetails.seconds_streamed = seconds_streamed + state_str = "finished" if finished else "aborted" + logger.debug( + "stream %s for: %s (%s seconds streamed)", + state_str, + streamdetails.uri, + seconds_streamed, + ) + # store accurate duration if finished: - logger.debug( - "finished stream for: %s (%s seconds streamed)", - streamdetails.uri, - seconds_streamed, - ) - # store accurate duration streamdetails.duration = streamdetails.seek_position + seconds_streamed - else: - logger.debug( - "stream aborted for %s (%s seconds streamed)", - streamdetails.uri, - seconds_streamed, - ) # use communicate to read stderr and wait for exit # read log for loudness measurement (or errors) - _, stderr = await ffmpeg_proc.communicate() - if ffmpeg_proc.returncode != 0: - # ffmpeg has a non zero returncode meaning trouble + try: + _, stderr = await asyncio.wait_for(ffmpeg_proc.communicate(), 5) + except TimeoutError: + stderr = b"" + # ensure to send close here so we terminate and cleanup the process + await ffmpeg_proc.close() + if ffmpeg_proc.returncode != 0 and not bytes_sent: logger.warning("stream error on %s", streamdetails.uri) - logger.warning(stderr.decode()) - finished = False - elif loudness_details := _parse_loudnorm(stderr): - logger.log(VERBOSE_LOG_LEVEL, stderr.decode()) + elif stderr and (loudness_details := _parse_loudnorm(stderr)): required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120 if finished or (seconds_streamed >= required_seconds): LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details) @@ -482,7 +474,7 @@ async def get_media_stream( # noqa: PLR0915 await mass.music.set_track_loudness( streamdetails.item_id, streamdetails.provider, loudness_details ) - else: + elif stderr: logger.log(VERBOSE_LOG_LEVEL, stderr.decode()) # report playback @@ -859,14 +851,18 @@ def get_ffmpeg_args( input_format: AudioFormat, output_format: AudioFormat, filter_params: list[str], - extra_args: list[str], + extra_args: list[str] | None = None, input_path: str = "-", output_path: str = "-", - loglevel: str = "info", + loglevel: str | None = None, ) -> list[str]: """Collect all args to send to the ffmpeg process.""" + if loglevel is None: + loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet" + if extra_args is None: + extra_args = [] + extra_args += ["-bufsize", "32M"] ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support") - if not ffmpeg_present: msg = ( "FFmpeg binary is missing from system." diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index d4cbc312..c529bc12 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -12,6 +12,7 @@ import asyncio import logging import os from contextlib import suppress +from signal import SIGINT from types import TracebackType from typing import TYPE_CHECKING @@ -98,8 +99,6 @@ class AsyncProcess: stdin=stdin if self._enable_stdin else None, stdout=stdout if self._enable_stdout else None, stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, - limit=4000000, - pipesize=256000, ) LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid) @@ -167,12 +166,21 @@ class AsyncProcess: task.cancel() with suppress(asyncio.CancelledError): await task + if self.proc.returncode is None: + # always first try to send sigint signal to try clean shutdown + # for example ffmpeg needs this to cleanly shutdown and not lock on pipes + self.proc.send_signal(SIGINT) + # allow the process a little bit of time to respond to the signal + await asyncio.sleep(0.1) + # send communicate until we exited while self.proc.returncode is None: - # make sure the process is cleaned up + # make sure the process is really cleaned up. + # especially with pipes this can cause deadlocks if not properly guarded + # we need to use communicate to ensure buffers are flushed + # we do that with sending communicate try: - # we need to use communicate to ensure buffers are flushed - await asyncio.wait_for(self.proc.communicate(), 10) + await asyncio.wait_for(self.proc.communicate(), 2) except TimeoutError: LOGGER.debug( "Process %s with PID %s did not stop in time. Sending terminate...", @@ -180,7 +188,6 @@ class AsyncProcess: self.proc.pid, ) self.proc.terminate() - await asyncio.sleep(0.5) LOGGER.debug( "Process %s with PID %s stopped with returncode %s", self._name, diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 8faafab8..4cdee394 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -8,6 +8,7 @@ import os import platform import socket import time +from collections.abc import AsyncGenerator from contextlib import suppress from dataclasses import dataclass from random import randint, randrange @@ -42,7 +43,7 @@ from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.constants import CONF_SYNC_ADJUST, UGP_PREFIX, VERBOSE_LOG_LEVEL -from music_assistant.server.helpers.audio import get_media_stream +from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.models.player_provider import PlayerProvider @@ -51,7 +52,6 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import QueueStreamJob from music_assistant.server.models import ProviderInstanceType DOMAIN = "airplay" @@ -181,20 +181,23 @@ def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None: class AirplayStream: """Object that holds the details of a stream job.""" - def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None: + def __init__( + self, prov: AirplayProvider, airplay_player: AirPlayPlayer, input_format: AudioFormat + ) -> None: """Initialize AirplayStream.""" self.prov = prov self.mass = prov.mass self.airplay_player = airplay_player + self.input_format = input_format # always generate a new active remote id to prevent race conditions # with the named pipe used to send audio self.active_remote_id: str = str(randint(1000, 8000)) self.start_ntp: int | None = None # use as checksum self.prevent_playback: bool = False - self.stream_job: QueueStreamJob | None = None self._log_reader_task: asyncio.Task | None = None self._audio_reader_task: asyncio.Task | None = None self._cliraop_proc: AsyncProcess | None = None + self._ffmpeg_proc: AsyncProcess | None = None self._stop_requested = False @property @@ -206,10 +209,9 @@ class AirplayStream: and self._cliraop_proc.returncode is None ) - async def start(self, start_ntp: int, stream_job: QueueStreamJob) -> None: + async def start(self, start_ntp: int) -> None: """Initialize CLIRaop process for a player.""" self.start_ntp = start_ntp - self.stream_job = stream_job extra_args = [] player_id = self.airplay_player.player_id mass_player = self.mass.players.get(player_id) @@ -237,7 +239,7 @@ class AirplayStream: "-port", str(self.airplay_player.discovery_info.port), "-wait", - str(3000 - sync_adjust), + str(2000 - sync_adjust), "-volume", str(mass_player.volume_level), *extra_args, @@ -255,12 +257,28 @@ class AirplayStream: # connect cliraop stdin with ffmpeg stdout using os pipes read, write = os.pipe() + # launch ffmpeg, feeding (player specific) audio chunks on stdout - self._audio_reader_task = asyncio.create_task( - stream_job.stream_to_custom_output_path( - player_id=player_id, output_format=AIRPLAY_PCM_FORMAT, output_path=write - ) + # one could argue that the intermediate ffmpeg towards cliraop is not needed + # when there are no player specific filters or extras but in this case + # ffmpeg serves as a small buffer towards the realtime cliraop streamer + ffmpeg_args = get_ffmpeg_args( + input_format=self.input_format, + output_format=AIRPLAY_PCM_FORMAT, + filter_params=get_player_filter_params(self.mass, player_id), + ) + # launch ffmpeg process with player specific settings + # the stream_job_runner will start pushing pcm chunks to the stdin + # the ffmpeg process will send the output directly to the given path (e.g. tcp socket) + self._ffmpeg_proc = AsyncProcess( + ffmpeg_args, + enable_stdin=True, + enable_stdout=True, + enable_stderr=False, + custom_stdout=write, + name="cliraop_ffmpeg", ) + await self._ffmpeg_proc.start() self._cliraop_proc = AsyncProcess( cliraop_args, enable_stdin=True, @@ -271,24 +289,31 @@ class AirplayStream: await self._cliraop_proc.start() self._log_reader_task = asyncio.create_task(self._log_watcher()) - # self._audio_reader_task = asyncio.create_task(self._audio_reader()) + async def write_chunk(self, chunk: bytes) -> None: + """Write a (pcm) audio chunk to the player.""" + await self._ffmpeg_proc.write(chunk) + + async def write_eof(self) -> None: + """Write EOF to the ffmpeg stdin.""" + await self._ffmpeg_proc.write_eof() + await self._ffmpeg_proc.wait() + await self.stop() async def stop(self, wait: bool = True): """Stop playback and cleanup.""" - if not self.running: + if self._cliraop_proc.closed and self._ffmpeg_proc.closed: return self._stop_requested = True - # send stop with cli command - await self.send_cli_command("ACTION=STOP") async def _stop() -> None: - # always stop the audio feeder - if self._audio_reader_task and not self._audio_reader_task.done(): - with suppress(asyncio.CancelledError): - self._audio_reader_task.cancel() - await self._cliraop_proc.write_eof() - # the process should exit gracefully after the stop request was processed - await asyncio.wait_for(self._cliraop_proc.wait(), 30) + # ffmpeg MUST be stopped before cliraop due to the chained pipes + await self._ffmpeg_proc.close() + # allow the cliraop process to stop gracefully first + await self.send_cli_command("ACTION=STOP") + with suppress(TimeoutError): + await asyncio.wait_for(self._cliraop_proc.wait(), 5) + # send regular close anyway (which also logs the returncode) + await self._cliraop_proc.close() task = self.mass.create_task(_stop()) if wait: @@ -358,12 +383,9 @@ class AirplayStream: self.mass.players.update(airplay_player.player_id) if "lost packet out of backlog" in line: lost_packets += 1 - if lost_packets == 50: + if lost_packets == 100: logger.warning("High packet loss detected, stopping playback...") - queue = self.mass.player_queues.get_active_queue(mass_player.player_id) - await self.mass.player_queues.stop(queue.queue_id) - else: - logger.debug(line) + await self.stop(False) logger.log(VERBOSE_LOG_LEVEL, line) @@ -371,6 +393,8 @@ class AirplayStream: if airplay_player.active_stream == self: mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) + # ensure we're cleaned up afterwards + await self.stop() async def _send_metadata(self, queue: PlayerQueue) -> None: """Send metadata to player (and connected sync childs).""" @@ -589,26 +613,34 @@ class AirplayProvider(PlayerProvider): if queue_item.queue_id.startswith(UGP_PREFIX): # special case: we got forwarded a request from the UGP # use the existing stream job that was already created by UGP - stream_job = self.mass.streams.stream_jobs[queue_item.queue_id] + stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id] + stream_job.expected_players.add(player_id) + input_format = stream_job.pcm_format + audio_source = stream_job.subscribe(player_id) + elif queue_item.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + input_format = AIRPLAY_PCM_FORMAT + audio_source = self.mass.streams.get_announcement_stream( + queue_item.streamdetails.data["url"], + pcm_format=AIRPLAY_PCM_FORMAT, + use_pre_announce=queue_item.streamdetails.data["use_pre_announce"], + ) else: - if queue_item.media_type == MediaType.ANNOUNCEMENT: - # stream announcement url directly - audio_source = get_media_stream( - self.mass, queue_item.streamdetails, pcm_format=AIRPLAY_PCM_FORMAT - ) - else: - queue = self.mass.player_queues.get(queue_item.queue_id) - audio_source = self.mass.streams.get_flow_stream( - queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT - ) - stream_job = self.mass.streams.create_stream_job( - queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=AIRPLAY_PCM_FORMAT + queue = self.mass.player_queues.get(queue_item.queue_id) + input_format = AIRPLAY_PCM_FORMAT + audio_source = self.mass.streams.get_flow_stream( + queue, start_queue_item=queue_item, pcm_format=AIRPLAY_PCM_FORMAT ) + self.mass.create_task(self._handle_stream_audio, player_id, audio_source, input_format) + async def _handle_stream_audio( + self, player_id: str, audio_source: AsyncGenerator[bytes, None], input_format: AudioFormat + ) -> None: + """Handle streaming of audio to one or more airplay players.""" # Python is not suitable for realtime audio streaming so we do the actual streaming # of (RAOP) audio using a small executable written in C based on libraop to do the actual - # timestamped playback. The raw pcm audio is fed to a named pipe, read by the executable - # and we can send some ingteractie commands to the process stdin. + # timestamped playback, whicj reads pcm audio from stdin + # and we can send some interactive commands using a named pipe. # get current ntp before we start _, stdout = await check_output(f"{self.cliraop_bin} -ntp") @@ -617,11 +649,35 @@ class AirplayProvider(PlayerProvider): # setup Raop process for player and its sync childs async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): - stream_job.expected_players.add(airplay_player.player_id) - airplay_player.active_stream = AirplayStream(self, airplay_player) - tg.create_task(airplay_player.active_stream.start(start_ntp, stream_job)) - if not queue_item.queue_id.startswith(UGP_PREFIX): - stream_job.start() + airplay_player.active_stream = AirplayStream( + self, airplay_player, input_format=input_format + ) + tg.create_task(airplay_player.active_stream.start(start_ntp)) + + async for chunk in audio_source: + active_clients = 0 + async with asyncio.TaskGroup() as tg: + for airplay_player in self._get_sync_clients(player_id): + if not (airplay_player.active_stream and airplay_player.active_stream.running): + # player stopped or switched to a new stream + continue + if airplay_player.active_stream.start_ntp != start_ntp: + # checksum mismatch + continue + tg.create_task(airplay_player.active_stream.write_chunk(chunk)) + active_clients += 1 + if active_clients == 0: + # no more clients + return + # entire stream consumed: send EOF + async with asyncio.TaskGroup() as tg: + for airplay_player in self._get_sync_clients(player_id): + if ( + not airplay_player.active_stream + or airplay_player.active_stream.start_ntp != start_ntp + ): + continue + tg.create_task(airplay_player.active_stream.write_eof()) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index a0c4d87b..5111b49d 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -420,8 +420,8 @@ class ChromecastProvider(PlayerProvider): # originally/officially cast supports 96k sample rate # but it seems a (recent?) update broke this # for now use 48k as max sample rate to play safe - max_sample_rate=48000, - supports_24bit=True, + max_sample_rate=44100 if cast_info.is_audio_group else 48000, + supports_24bit=not cast_info.is_audio_group, enabled_by_default=enabled_by_default, ), logger=self.logger.getChild(cast_info.friendly_name), diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index c66d73c8..d89f76c3 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -44,7 +44,6 @@ from music_assistant.common.models.enums import ( RepeatMode, ) from music_assistant.common.models.errors import MusicAssistantError, SetupFailedError -from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.constants import ( CONF_CROSSFADE, @@ -53,7 +52,6 @@ from music_assistant.constants import ( CONF_PORT, CONF_SYNC_ADJUST, MASS_LOGO_ONLINE, - UGP_PREFIX, VERBOSE_LOG_LEVEL, ) from music_assistant.server.models.player_provider import PlayerProvider @@ -343,23 +341,13 @@ class SlimprotoProvider(PlayerProvider): if player.group_childs: # player has sync members, we need to start a (multi-player) stream job # to make sure that all clients receive the exact same audio - pcm_format = AudioFormat( - content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24 - ) - queue = self.mass.player_queues.get(queue_item.queue_id) - stream_job = self.mass.streams.create_stream_job( + stream_job = self.mass.streams.create_multi_client_stream_job( queue_id=queue_item.queue_id, - pcm_audio_source=self.mass.streams.get_flow_stream( - queue, - start_queue_item=queue_item, - pcm_format=pcm_format, - ), - pcm_format=pcm_format, + start_queue_item=queue_item, ) # forward command to player and any connected sync members - sync_clients = self._get_sync_clients(player_id) async with asyncio.TaskGroup() as tg: - for slimplayer in sync_clients: + for slimplayer in self._get_sync_clients(player_id): enforce_mp3 = await self.mass.config.get_player_config_value( slimplayer.player_id, CONF_ENFORCE_MP3 ) @@ -375,8 +363,6 @@ class SlimprotoProvider(PlayerProvider): auto_play=False, ) ) - if not queue_item.queue_id.startswith(UGP_PREFIX): - stream_job.start() else: # regular, single player playback slimplayer = self.slimproto.get_player(player_id) @@ -733,7 +719,7 @@ class SlimprotoProvider(PlayerProvider): sync_playpoints = self._sync_playpoints[slimplayer.player_id] active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id) - stream_job = self.mass.streams.stream_jobs.get(active_queue.queue_id) + stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id) if not stream_job: # should not happen, but just in case return diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 7633f2e4..5cdf4b88 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -35,7 +35,7 @@ from music_assistant.common.models.errors import SetupFailedError from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.constants import UGP_PREFIX -from music_assistant.server.helpers.audio import get_media_stream +from music_assistant.server.helpers.audio import get_ffmpeg_args, get_player_filter_params from music_assistant.server.helpers.process import AsyncProcess, check_output from music_assistant.server.models.player_provider import PlayerProvider @@ -64,6 +64,15 @@ DEFAULT_SNAPSERVER_PORT = 1705 SNAPWEB_DIR: Final[pathlib.Path] = pathlib.Path(__file__).parent.resolve().joinpath("snapweb") +DEFAULT_SNAPCAST_FORMAT = AudioFormat( + content_type=ContentType.PCM_S16LE, + sample_rate=48000, + # TODO: can we handle 24 bits bit depth ? + bit_depth=16, + channels=2, +) + + async def setup( mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig ) -> ProviderInstanceType: @@ -309,33 +318,27 @@ class SnapCastProvider(PlayerProvider): snap_group = self._get_snapgroup(player_id) await snap_group.set_stream(stream.identifier) - # TODO: can we handle 24 bits bit depth ? - pcm_format = AudioFormat( - content_type=ContentType.PCM_S16LE, - sample_rate=48000, - bit_depth=16, - channels=2, - ) - if queue_item.queue_id.startswith(UGP_PREFIX): # special case: we got forwarded a request from the UGP # use the existing stream job that was already created by UGP - stream_job = self.mass.streams.stream_jobs[queue_item.queue_id] + stream_job = self.mass.streams.multi_client_jobs[queue_item.queue_id] + stream_job.expected_players.add(player_id) + input_format = stream_job.pcm_format + audio_source = stream_job.subscribe(player_id) + elif queue_item.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + input_format = DEFAULT_SNAPCAST_FORMAT + audio_source = self.mass.streams.get_announcement_stream( + queue_item.streamdetails.data["url"], + pcm_format=DEFAULT_SNAPCAST_FORMAT, + use_pre_announce=queue_item.streamdetails.data["use_pre_announce"], + ) else: - if queue_item.media_type == MediaType.ANNOUNCEMENT: - # stream announcement url directly - audio_source = get_media_stream( - self.mass, queue_item.streamdetails, pcm_format=pcm_format - ) - else: - queue = self.mass.player_queues.get(queue_item.queue_id) - audio_source = self.mass.streams.get_flow_stream( - queue, start_queue_item=queue_item, pcm_format=pcm_format - ) - stream_job = self.mass.streams.create_stream_job( - queue_item.queue_id, pcm_audio_source=audio_source, pcm_format=pcm_format + queue = self.mass.player_queues.get(queue_item.queue_id) + input_format = DEFAULT_SNAPCAST_FORMAT + audio_source = self.mass.streams.get_flow_stream( + queue, start_queue_item=queue_item, pcm_format=DEFAULT_SNAPCAST_FORMAT ) - stream_job.expected_players.add(player_id) async def _streamer() -> None: host = self._snapcast_server_host @@ -352,23 +355,35 @@ class SnapCastProvider(PlayerProvider): stream.set_callback(stream_callback) stream_path = f"tcp://{host}:{port}" self.logger.debug("Start streaming to %s", stream_path) + ffmpeg_args = get_ffmpeg_args( + input_format=input_format, + output_format=DEFAULT_SNAPCAST_FORMAT, + filter_params=get_player_filter_params(self.mass, player_id), + output_path=f"tcp://{host}:{port}", + ) try: - await stream_job.stream_to_custom_output_path( - player_id, pcm_format, f"tcp://{host}:{port}" - ) - # we need to wait a bit for the stream status to become idle - # to ensure that all snapclients have consumed the audio - await self.mass.players.wait_for_state(player, PlayerState.IDLE) + async with AsyncProcess( + ffmpeg_args, + enable_stdin=True, + enable_stdout=False, + enable_stderr=False, + name="snapcast_ffmpeg", + ) as ffmpeg_proc: + async for chunk in audio_source: + await ffmpeg_proc.write(chunk) + await ffmpeg_proc.write_eof() + # we need to wait a bit for the stream status to become idle + # to ensure that all snapclients have consumed the audio + await self.mass.players.wait_for_state(player, PlayerState.IDLE) finally: self.logger.debug("Finished streaming to %s", stream_path) # there is no way to unsub the callback to we do this nasty stream._callback_func = None - await self._snapserver.stream_remove_stream(stream.identifier) + with suppress(TypeError, KeyError, AttributeError): + await self._snapserver.stream_remove_stream(stream.identifier) # start streaming the queue (pcm) audio in a background task self._stream_tasks[player_id] = asyncio.create_task(_streamer()) - if not queue_item.queue_id.startswith(UGP_PREFIX): - stream_job.start() def _get_snapgroup(self, player_id: str) -> Snapgroup: """Get snapcast group for given player_id.""" diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index 1d0c5303..611efd94 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -177,7 +177,7 @@ class SpotifyProvider(MusicProvider): result.artists += [ await self._parse_artist(item) for item in searchresult["artists"]["items"] - if (item and item["id"]) + if (item and item["id"] and item["name"]) ] if "albums" in searchresult: result.albums += [ @@ -411,7 +411,7 @@ class SpotifyProvider(MusicProvider): artist = Artist( item_id=artist_obj["id"], provider=self.domain, - name=artist_obj["name"], + name=artist_obj["name"] or artist_obj["id"], provider_mappings={ ProviderMapping( item_id=artist_obj["id"], @@ -455,6 +455,8 @@ class SpotifyProvider(MusicProvider): album.external_ids.add((ExternalID.BARCODE, album_obj["external_ids"]["ean"])) for artist_obj in album_obj["artists"]: + if not artist_obj.get("name") or not artist_obj.get("id"): + continue album.artists.append(await self._parse_artist(artist_obj)) with contextlib.suppress(ValueError): @@ -523,6 +525,8 @@ class SpotifyProvider(MusicProvider): if artist: track.artists.append(artist) for track_artist in track_obj.get("artists", []): + if not track_artist.get("name") or not track_artist.get("id"): + continue artist = await self._parse_artist(track_artist) if artist and artist.item_id not in {x.item_id for x in track.artists}: track.artists.append(artist) diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py index 43cf0a88..08d37700 100644 --- a/music_assistant/server/providers/ugp/__init__.py +++ b/music_assistant/server/providers/ugp/__init__.py @@ -20,13 +20,11 @@ from music_assistant.common.models.config_entries import ( ) from music_assistant.common.models.enums import ( ConfigEntryType, - ContentType, PlayerFeature, PlayerState, PlayerType, ProviderFeature, ) -from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.queue_item import QueueItem from music_assistant.constants import ( @@ -35,10 +33,6 @@ from music_assistant.constants import ( SYNCGROUP_PREFIX, UGP_PREFIX, ) -from music_assistant.server.controllers.streams import ( - FLOW_DEFAULT_BIT_DEPTH, - FLOW_DEFAULT_SAMPLE_RATE, -) from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: @@ -147,7 +141,7 @@ class UniversalGroupProvider(PlayerProvider): if member.state == PlayerState.IDLE: continue tg.create_task(self.mass.players.cmd_stop(member.player_id)) - if existing := self.mass.streams.stream_jobs.pop(player_id, None): + if existing := self.mass.streams.multi_client_jobs.pop(player_id, None): existing.stop() async def cmd_play(self, player_id: str) -> None: @@ -179,22 +173,17 @@ class UniversalGroupProvider(PlayerProvider): # create a multi-client stream job - all (direct) child's of this UGP group # will subscribe to this multi client queue stream - pcm_format = AudioFormat( - content_type=ContentType.from_bit_depth(FLOW_DEFAULT_BIT_DEPTH), - sample_rate=FLOW_DEFAULT_SAMPLE_RATE, - bit_depth=FLOW_DEFAULT_BIT_DEPTH, - ) queue = self.mass.player_queues.get(player_id) - stream_job = self.mass.streams.create_stream_job( + stream_job = self.mass.streams.create_multi_client_stream_job( queue.queue_id, - pcm_audio_source=self.mass.streams.get_flow_stream( - queue=queue, start_queue_item=queue_item, pcm_format=pcm_format - ), - pcm_format=pcm_format, + start_queue_item=queue_item, ) # create a fake queue item to forward to downstream play_media commands ugp_queue_item = QueueItem( - player_id, queue_item_id="flow", name=group_player.display_name, duration=None + player_id, + queue_item_id=stream_job.job_id, + name="Music Assistant", + duration=None, ) # forward the stream job to all group members @@ -206,7 +195,6 @@ class UniversalGroupProvider(PlayerProvider): if member is None: continue tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item)) - stream_job.start() async def poll_player(self, player_id: str) -> None: """Poll player for state updates.""" diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index d087b22e..78398cf9 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -124,10 +124,10 @@ class MusicAssistant: self.config = ConfigController(self) await self.config.setup() LOGGER.info( - "Starting Music Assistant Server (%s) version %s - uvloop: %s", + "Starting Music Assistant Server (%s) version %s - HA add-on: %s", self.server_id, self.version, - "uvloop" in str(self.loop), + self.running_as_hass_addon, ) # setup other core controllers self.cache = CacheController(self) diff --git a/pyproject.toml b/pyproject.toml index bef03bf8..45f3bcdc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,6 @@ server = [ "zeroconf==0.131.0", "cryptography==42.0.5", "ifaddr==0.2.0", - "uvloop==0.19.0", ] test = [ "black==24.2.0", diff --git a/requirements_all.txt b/requirements_all.txt index 3e1ea6a1..bcd67adb 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -37,7 +37,6 @@ soco==0.30.2 sonos-websocket==0.1.3 tidalapi==0.7.4 unidecode==1.3.8 -uvloop==0.19.0 xmltodict==0.13.0 ytmusicapi==1.6.0 zeroconf==0.131.0