From 19d39562fc0854fce61974f61e025d4c9ce43036 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 15 Mar 2024 01:29:23 +0100 Subject: [PATCH] Group playback fixes (#1144) --- music_assistant/common/models/media_items.py | 6 + .../server/controllers/player_queues.py | 4 +- music_assistant/server/controllers/players.py | 10 +- music_assistant/server/controllers/streams.py | 659 +++++++----------- .../server/controllers/webserver.py | 2 +- music_assistant/server/helpers/audio.py | 483 ++++++++----- music_assistant/server/helpers/didl_lite.py | 6 +- music_assistant/server/helpers/process.py | 54 +- music_assistant/server/helpers/tags.py | 131 ++-- .../server/models/player_provider.py | 8 - .../server/providers/airplay/__init__.py | 363 ++++------ .../server/providers/chromecast/__init__.py | 18 +- .../server/providers/dlna/__init__.py | 32 +- .../server/providers/fully_kiosk/__init__.py | 18 +- .../server/providers/hass_players/__init__.py | 22 +- .../server/providers/slimproto/__init__.py | 95 +-- .../server/providers/snapcast/__init__.py | 87 +-- .../server/providers/sonos/__init__.py | 35 +- .../server/providers/spotify/__init__.py | 2 +- .../server/providers/ugp/__init__.py | 13 +- 20 files changed, 906 insertions(+), 1142 deletions(-) diff --git a/music_assistant/common/models/media_items.py b/music_assistant/common/models/media_items.py index 5c5e1b63..2727d7b2 100644 --- a/music_assistant/common/models/media_items.py +++ b/music_assistant/common/models/media_items.py @@ -60,6 +60,12 @@ class AudioFormat(DataClassDictMixin): """Return the PCM sample size.""" return int(self.sample_rate * (self.bit_depth / 8) * self.channels) + def __eq__(self, other: AudioFormat) -> bool: + """Check equality of two items.""" + if not other: + return False + return self.output_format_str == other.output_format_str + @dataclass(frozen=True, kw_only=True) class ProviderMapping(DataClassDictMixin): diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index d49bebc9..50fe0c6d 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -821,7 +821,9 @@ class PlayerQueuesController(CoreController): try: # Check if the QueueItem is playable. For example, YT Music returns Radio Items # that are not playable which will stop playback. - await get_stream_details(mass=self.mass, queue_item=next_item) + next_item.streamdetails = await get_stream_details( + mass=self.mass, queue_item=next_item + ) # Lazy load the full MediaItem for the QueueItem, making sure to get the # maximum quality of thumbs next_item.media_item = await self.mass.music.get_item_by_uri(next_item.uri) diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 3702c377..812c27d9 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -525,24 +525,26 @@ class PlayerController(CoreController): if group_player.powered == power: return # nothing to do + # make sure to update the group power state + group_player.powered = power # always stop (group/master)player at power off if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED): await self.cmd_stop(player_id) async with asyncio.TaskGroup() as tg: - members_powered = False + any_member_powered = False for member in self.iter_group_members(group_player, only_powered=True): - members_powered = True + any_member_powered = True if power: # set active source to group player if the group (is going to be) powered member.active_source = group_player.player_id - elif member.active_source == group_player.player_id: + else: # turn off child player when group turns off tg.create_task(self.cmd_power(member.player_id, False)) member.active_source = None # edge case: group turned on but no members are powered, power them all! - if not members_powered and power: + if not any_member_powered and power: for member in self.iter_group_members(group_player, only_powered=False): tg.create_task(self.cmd_power(member.player_id, True)) member.active_source = group_player.player_id diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 514e44e8..d5dfaf97 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -13,13 +13,12 @@ import logging import time import urllib.parse from collections.abc import AsyncGenerator -from contextlib import suppress from typing import TYPE_CHECKING import shortuuid from aiohttp import web -from music_assistant.common.helpers.util import get_ip, select_free_port +from music_assistant.common.helpers.util import empty_queue, get_ip, select_free_port from music_assistant.common.models.config_entries import ( ConfigEntry, ConfigValueOption, @@ -33,9 +32,6 @@ from music_assistant.constants import ( CONF_BIND_PORT, CONF_CROSSFADE, CONF_CROSSFADE_DURATION, - CONF_EQ_BASS, - CONF_EQ_MID, - CONF_EQ_TREBLE, CONF_OUTPUT_CHANNELS, CONF_PUBLISH_IP, SILENCE_FILE, @@ -43,10 +39,11 @@ from music_assistant.constants import ( from music_assistant.server.helpers.audio import ( check_audio_support, crossfade_pcm_parts, + get_ffmpeg_stream, get_media_stream, + get_player_filter_params, get_stream_details, ) -from music_assistant.server.helpers.process import AsyncProcess from music_assistant.server.helpers.util import get_ips from music_assistant.server.helpers.webserver import Webserver from music_assistant.server.models.core_controller import CoreController @@ -56,6 +53,7 @@ if TYPE_CHECKING: from music_assistant.common.models.player import Player from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.common.models.queue_item import QueueItem + from music_assistant.server import MusicAssistant DEFAULT_STREAM_HEADERS = { @@ -67,88 +65,94 @@ DEFAULT_STREAM_HEADERS = { "icy-name": "Music Assistant", "icy-pub": "0", } -FLOW_MAX_SAMPLE_RATE = 192000 +FLOW_MAX_SAMPLE_RATE = 96000 FLOW_MAX_BIT_DEPTH = 24 # pylint:disable=too-many-locals -class MultiClientStreamJob: +class MultiClientQueueStreamJob: """Representation of a (multiclient) Audio Queue stream job/task. - The whole idea here is that in case of a player (sync)group, - all client players receive the exact same (PCM) audio chunks from the source audio. + The whole idea here is that the queue stream audio can be sent to multiple + players at once. For example for (slimproto/airplay) syncgroups and universal group. + all client players receive the exact same audio chunks from the source audio, + encoded and/or resampled to the player's preferences. A StreamJob is tied to a Queue and streams the queue flow stream, - In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created. + In case a stream is restarted (e.g. when seeking), + a new MultiClientQueueStreamJob will be created. """ _audio_task: asyncio.Task | None = None def __init__( self, - stream_controller: StreamsController, - queue_id: str, + mass: MusicAssistant, + pcm_audio_source: AsyncGenerator[bytes, None], pcm_format: AudioFormat, - start_queue_item: QueueItem, - seek_position: int = 0, - fade_in: bool = False, + expected_players: set[str], + auto_start: bool = True, ) -> None: - """Initialize MultiClientStreamJob instance.""" - self.stream_controller = stream_controller - self.queue_id = queue_id - self.queue = self.stream_controller.mass.player_queues.get(queue_id) - assert self.queue # just in case + """Initialize MultiClientQueueStreamJob instance.""" + self.mass = mass + self.pcm_audio_source = pcm_audio_source self.pcm_format = pcm_format - self.start_queue_item = start_queue_item - self.seek_position = seek_position - self.fade_in = fade_in + self.expected_players = expected_players self.job_id = shortuuid.uuid() - self.expected_players: set[str] = set() - self.subscribed_players: dict[str, asyncio.Queue[bytes]] = {} + self.auto_start = auto_start self.bytes_streamed: int = 0 - self._all_clients_connected = asyncio.Event() - self.logger = stream_controller.logger.getChild(f"streamjob_{self.job_id}") - self._finished: bool = False - self.workaround_players_seen: set[str] = set() - # start running the audio task in the background - self._audio_task = asyncio.create_task(self._stream_job_runner()) + self.logger = self.mass.streams.logger.getChild(f"stream_job.{self.job_id}") + self._subscribed_players: dict[str, asyncio.Queue] = {} + self._finished = asyncio.Event() + self._audio_task: asyncio.Task | None = None @property def finished(self) -> bool: """Return if this StreamJob is finished.""" - return self._finished or self._audio_task and self._audio_task.done() + return self._finished.is_set() or self._audio_task and self._audio_task.done() @property def pending(self) -> bool: """Return if this Job is pending start.""" - return not self.finished and not self._all_clients_connected.is_set() + return not self.finished and not self._audio_task @property def running(self) -> bool: """Return if this Job is running.""" - return not self.finished and not self.pending + return self._audio_task and not self._audio_task.done() + + def start(self) -> None: + """Start running (send audio chunks to connected players).""" + if self.running: + return + if self.finished: + raise RuntimeError("Task is already finished") + self.logger.debug( + "Starting multi client stream job %s with %s out of %s connected clients", + self.job_id, + len(self._subscribed_players), + len(self.expected_players), + ) + self._audio_task = asyncio.create_task(self._stream_job_runner()) def stop(self) -> None: """Stop running this job.""" - self._finished = True if self._audio_task and self._audio_task.done(): return if self._audio_task: self._audio_task.cancel() - for sub_queue in self.subscribed_players.values(): - with suppress(asyncio.QueueFull): - sub_queue.put_nowait(b"EOF") + self._finished.set() def resolve_stream_url(self, child_player_id: str, output_codec: ContentType) -> str: """Resolve the childplayer specific stream URL to this streamjob.""" fmt = output_codec.value # handle raw pcm if output_codec.is_pcm(): - player = self.stream_controller.mass.players.get(child_player_id) + player = self.mass.streams.mass.players.get(child_player_id) player_max_bit_depth = 24 if player.supports_24bit else 16 output_sample_rate = min(self.pcm_format.sample_rate, player.max_sample_rate) output_bit_depth = min(self.pcm_format.bit_depth, player_max_bit_depth) - output_channels = self.stream_controller.mass.config.get_raw_player_config_value( + output_channels = self.mass.config.get_raw_player_config_value( child_player_id, CONF_OUTPUT_CHANNELS, "stereo" ) channels = 1 if output_channels != "stereo" else 2 @@ -156,98 +160,66 @@ class MultiClientStreamJob: f";codec=pcm;rate={output_sample_rate};" f"bitrate={output_bit_depth};channels={channels}" ) - url = f"{self.stream_controller._server.base_url}/{self.queue_id}/multi/{self.job_id}/{child_player_id}/{self.start_queue_item.queue_item_id}.{fmt}" # noqa: E501 + url = f"{self.mass.streams._server.base_url}/multi/{self.job_id}/{child_player_id}.{fmt}" self.expected_players.add(child_player_id) return url - async def subscribe(self, player_id: str) -> AsyncGenerator[bytes, None]: - """Subscribe consumer and iterate incoming chunks on the queue.""" - if ( - player_id in self.stream_controller.workaround_players - and player_id not in self.workaround_players_seen + async def subscribe( + self, player_id: str, output_format: AudioFormat, chunk_size: int | None = None + ) -> AsyncGenerator[bytes, None]: + """Subscribe consumer and iterate chunks on the queue encoded to given output format.""" + async for chunk in get_ffmpeg_stream( + audio_input=self._subscribe_pcm(player_id), + input_format=self.pcm_format, + output_format=output_format, + filter_params=get_player_filter_params(self.mass, player_id), + chunk_size=chunk_size, ): - self.workaround_players_seen.add(player_id) - yield b"EOF" - return + yield chunk + async def _subscribe_pcm(self, player_id: str) -> AsyncGenerator[bytes, None]: + """Subscribe consumer and iterate incoming (raw pcm) chunks on the queue.""" try: - self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2) - - if self._all_clients_connected.is_set(): - # client subscribes while we're already started - we dont support that (for now?) - msg = f"Client {player_id} is joining while the stream is already started" - raise RuntimeError(msg) - self.logger.debug("Subscribed client %s", player_id) + self._subscribed_players[player_id] = queue = asyncio.Queue(1) + + if self.running: + # client subscribes while we're already started + # that will probably cause side effects but let it go + self.logger.warning( + "Player %s is joining while the stream is already started!", player_id + ) + else: + self.logger.debug("Subscribed player %s", player_id) - if len(self.subscribed_players) == len(self.expected_players): + await asyncio.sleep(0.2) # debounce + if ( + self.auto_start + and not self.running + and len(self._subscribed_players) == len(self.expected_players) + ): # we reached the number of expected subscribers, set event # so that chunks can be pushed - self._all_clients_connected.set() - - # keep reading audio chunks from the queue until we receive an EOF chunk - while True: - chunk = await sub_queue.get() - if chunk == b"EOF": - # EOF chunk received - break - yield chunk + self.start() + # yield from queue until finished + while not self._finished.is_set(): + yield await queue.get() finally: - self.subscribed_players.pop(player_id, None) + if sub_queue := self._subscribed_players.pop(player_id, None): + empty_queue(sub_queue) self.logger.debug("Unsubscribed client %s", player_id) # check if this was the last subscriber and we should cancel await asyncio.sleep(2) - if len(self.subscribed_players) == 0 and self._audio_task and not self.finished: + if len(self._subscribed_players) == 0 and not self.finished: self.logger.debug("Cleaning up, all clients disappeared...") - self._audio_task.cancel() - - async def _put_chunk(self, chunk: bytes) -> None: - """Put chunk of data to all subscribers.""" - async with asyncio.TaskGroup() as tg: - for sub_queue in list(self.subscribed_players.values()): - # put this chunk on the player's subqueue - tg.create_task(sub_queue.put(chunk)) - self.bytes_streamed += len(chunk) + self.stop() async def _stream_job_runner(self) -> None: """Feed audio chunks to StreamJob subscribers.""" - chunk_num = 0 - async for chunk in self.stream_controller.get_flow_stream( - self.queue, - self.start_queue_item, - self.pcm_format, - self.seek_position, - self.fade_in, - ): - chunk_num += 1 - if chunk_num == 1: - # wait until all expected clients are connected - try: - async with asyncio.timeout(10): - await self._all_clients_connected.wait() - except TimeoutError: - if len(self.subscribed_players) == 0: - self.stream_controller.logger.exception( - "Abort multi client stream job for queue %s: " - "clients did not connect within timeout", - self.queue.display_name, - ) - break - # not all clients connected but timeout expired, set flag and move on - # with all clients that did connect - self._all_clients_connected.set() - else: - self.stream_controller.logger.debug( - "Starting multi client stream job for queue %s " - "with %s out of %s connected clients", - self.queue.display_name, - len(self.subscribed_players), - len(self.expected_players), - ) - - await self._put_chunk(chunk) - - # mark EOF with EOF chunk - await self._put_chunk(b"EOF") + async for chunk in self.pcm_audio_source: + async with asyncio.TaskGroup() as tg: + for listener_queue in list(self._subscribed_players.values()): + tg.create_task(listener_queue.put(chunk)) + self._finished.set() def parse_pcm_info(content_type: str) -> tuple[int, int, int]: @@ -270,7 +242,7 @@ class StreamsController(CoreController): """Initialize instance.""" super().__init__(*args, **kwargs) self._server = Webserver(self.logger, enable_dynamic_routes=True) - self.multi_client_jobs: dict[str, MultiClientStreamJob] = {} + self.multi_client_jobs: dict[str, MultiClientQueueStreamJob] = {} self.register_dynamic_route = self._server.register_dynamic_route self.unregister_dynamic_route = self._server.unregister_dynamic_route self.manifest.name = "Streamserver" @@ -280,7 +252,6 @@ class StreamsController(CoreController): "some player specific local control callbacks." ) self.manifest.icon = "cast-audio" - self.workaround_players: set[str] = set() @property def base_url(self) -> str: @@ -358,22 +329,22 @@ class StreamsController(CoreController): static_routes=[ ( "*", - "/{queue_id}/multi/{job_id}/{player_id}/{queue_item_id}.{fmt}", + "/multi/{job_id}/{player_id}.{fmt}", self.serve_multi_subscriber_stream, ), ( "*", - "/{queue_id}/flow/{queue_item_id}.{fmt}", + "/flow/{queue_id}/{queue_item_id}.{fmt}", self.serve_queue_flow_stream, ), ( "*", - "/{queue_id}/single/{queue_item_id}.{fmt}", + "/single/{queue_id}/{queue_item_id}.{fmt}", self.serve_queue_item_stream, ), ( "*", - "/{queue_id}/command/{command}.mp3", + "/command/{queue_id}/{command}.mp3", self.serve_command_request, ), ], @@ -385,6 +356,7 @@ class StreamsController(CoreController): async def resolve_stream_url( self, + player_id: str, queue_item: QueueItem, output_codec: ContentType, seek_position: int = 0, @@ -393,12 +365,16 @@ class StreamsController(CoreController): ) -> str: """Resolve the stream URL for the given QueueItem.""" fmt = output_codec.value + # handle request for multi client queue stream + stream_job = self.multi_client_jobs.get(queue_item.queue_id) + if queue_item.queue_item_id == "flow" or stream_job and stream_job.pending: + return stream_job.resolve_stream_url(player_id, output_codec) # handle raw pcm without exact format specifiers if output_codec.is_pcm() and ";" not in fmt: fmt += f";codec=pcm;rate={44100};bitrate={16};channels={2}" query_params = {} base_path = "flow" if flow_mode else "single" - url = f"{self._server.base_url}/{queue_item.queue_id}/{base_path}/{queue_item.queue_item_id}.{fmt}" # noqa: E501 + url = f"{self._server.base_url}/{base_path}/{queue_item.queue_id}/{queue_item.queue_item_id}.{fmt}" # noqa: E501 if seek_position: query_params["seek_position"] = str(seek_position) if fade_in: @@ -418,29 +394,39 @@ class StreamsController(CoreController): fade_in: bool = False, pcm_bit_depth: int = 24, pcm_sample_rate: int = 48000, - ) -> MultiClientStreamJob: - """Create a MultiClientStreamJob for the given queue.. + expected_players: set[str] | None = None, + auto_start: bool = True, + ) -> MultiClientQueueStreamJob: + """ + Create a MultiClientQueueStreamJob for the given queue.. This is called by player/sync group implementations to start streaming the queue audio to multiple players at once. """ - if existing_job := self.multi_client_jobs.pop(queue_id, None): + if existing_job := self.multi_client_jobs.get(queue_id, None): + if existing_job.pending: + return existing_job # cleanup existing job first - if not existing_job.finished: - self.logger.warning("Detected existing (running) stream job for queue %s", queue_id) - existing_job.stop() - self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob( - self, - queue_id=queue_id, - pcm_format=AudioFormat( - content_type=ContentType.from_bit_depth(pcm_bit_depth), - sample_rate=pcm_sample_rate, - bit_depth=pcm_bit_depth, - channels=2, + existing_job.stop() + queue = self.mass.player_queues.get(queue_id) + pcm_format = AudioFormat( + content_type=ContentType.from_bit_depth(pcm_bit_depth), + sample_rate=pcm_sample_rate, + bit_depth=pcm_bit_depth, + channels=2, + ) + self.multi_client_jobs[queue_id] = stream_job = MultiClientQueueStreamJob( + self.mass, + pcm_audio_source=self.get_flow_stream( + queue=queue, + start_queue_item=start_queue_item, + pcm_format=pcm_format, + seek_position=seek_position, + fade_in=fade_in, ), - start_queue_item=start_queue_item, - seek_position=seek_position, - fade_in=fade_in, + pcm_format=pcm_format, + expected_players=expected_players or set(), + auto_start=auto_start, ) return stream_job @@ -496,7 +482,6 @@ class StreamsController(CoreController): queue.display_name, ) queue.index_in_buffer = self.mass.player_queues.index_by_id(queue_id, queue_item_id) - # collect player specific ffmpeg args to re-encode the source PCM stream pcm_format = AudioFormat( content_type=ContentType.from_bit_depth( queue_item.streamdetails.audio_format.bit_depth @@ -504,39 +489,22 @@ class StreamsController(CoreController): sample_rate=queue_item.streamdetails.audio_format.sample_rate, bit_depth=queue_item.streamdetails.audio_format.bit_depth, ) - ffmpeg_args = await self._get_player_ffmpeg_args( - queue_player, + async for chunk in get_ffmpeg_stream( + audio_input=get_media_stream( + self.mass, + streamdetails=queue_item.streamdetails, + pcm_format=pcm_format, + seek_position=seek_position, + fade_in=fade_in, + ), input_format=pcm_format, output_format=output_format, - ) - - async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc: - # feed stdin with pcm audio chunks from origin - async def read_audio() -> None: - try: - async for _, chunk in get_media_stream( - self.mass, - streamdetails=queue_item.streamdetails, - pcm_format=pcm_format, - seek_position=seek_position, - fade_in=fade_in, - ): - try: - await ffmpeg_proc.write(chunk) - except BrokenPipeError: - break - finally: - ffmpeg_proc.write_eof() - - ffmpeg_proc.attach_task(read_audio()) - - # read final chunks from stdout - async for chunk in ffmpeg_proc.iter_any(768000): - try: - await resp.write(chunk) - except (BrokenPipeError, ConnectionResetError): - # race condition - break + filter_params=get_player_filter_params(self.mass, queue_player.player_id), + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError): + break return resp @@ -544,8 +512,7 @@ class StreamsController(CoreController): """Stream Queue Flow audio to player.""" self._log_request(request) queue_id = request.match_info["queue_id"] - queue = self.mass.player_queues.get(queue_id) - if not queue: + if not (queue := self.mass.player_queues.get(queue_id)): raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}") start_queue_item_id = request.match_info["queue_item_id"] start_queue_item = self.mass.player_queues.get_item(queue_id, start_queue_item_id) @@ -585,105 +552,92 @@ class StreamsController(CoreController): # all checks passed, start streaming! self.logger.debug("Start serving Queue flow audio stream for %s", queue_player.name) - # collect player specific ffmpeg args to re-encode the source PCM stream pcm_format = AudioFormat( content_type=ContentType.from_bit_depth(output_format.bit_depth), sample_rate=output_format.sample_rate, bit_depth=output_format.bit_depth, channels=2, ) - ffmpeg_args = await self._get_player_ffmpeg_args( - queue_player, + async for chunk in get_ffmpeg_stream( + audio_input=self.get_flow_stream( + queue=queue, + start_queue_item=start_queue_item, + pcm_format=pcm_format, + seek_position=seek_position, + fade_in=fade_in, + ), input_format=pcm_format, output_format=output_format, - ) - - async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc: - # feed stdin with pcm audio chunks from origin - async def read_audio() -> None: - try: - async for chunk in self.get_flow_stream( - queue=queue, - start_queue_item=start_queue_item, - pcm_format=pcm_format, - seek_position=seek_position, - fade_in=fade_in, - ): - try: - await ffmpeg_proc.write(chunk) - except BrokenPipeError: - break - finally: - ffmpeg_proc.write_eof() - - ffmpeg_proc.attach_task(read_audio()) - - # read final chunks from stdout - iterator = ( - ffmpeg_proc.iter_chunked(icy_meta_interval) - if enable_icy - else ffmpeg_proc.iter_any(768000) - ) - async for chunk in iterator: - try: - await resp.write(chunk) - except (BrokenPipeError, ConnectionResetError): - # race condition - break - - if not enable_icy: - continue - - # if icy metadata is enabled, send the icy metadata after the chunk - if ( - # use current item here and not buffered item, otherwise - # the icy metadata will be too much ahead - (current_item := queue.current_item) - and current_item.streamdetails - and current_item.streamdetails.stream_title - ): - title = current_item.streamdetails.stream_title - elif queue and current_item and current_item.name: - title = current_item.name - else: - title = "Music Assistant" - metadata = f"StreamTitle='{title}';".encode() - if current_item and current_item.image: - metadata += f"StreamURL='{current_item.image.path}'".encode() - while len(metadata) % 16 != 0: - metadata += b"\x00" - length = len(metadata) - length_b = chr(int(length / 16)).encode() - await resp.write(length_b + metadata) + filter_params=get_player_filter_params(self.mass, queue_player.player_id), + chunk_size=icy_meta_interval if enable_icy else None, + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError): + break + + if not enable_icy: + continue + + # if icy metadata is enabled, send the icy metadata after the chunk + if ( + # use current item here and not buffered item, otherwise + # the icy metadata will be too much ahead + (current_item := queue.current_item) + and current_item.streamdetails + and current_item.streamdetails.stream_title + ): + title = current_item.streamdetails.stream_title + elif queue and current_item and current_item.name: + title = current_item.name + else: + title = "Music Assistant" + metadata = f"StreamTitle='{title}';".encode() + if current_item and current_item.image: + metadata += f"StreamURL='{current_item.image.path}'".encode() + while len(metadata) % 16 != 0: + metadata += b"\x00" + length = len(metadata) + length_b = chr(int(length / 16)).encode() + await resp.write(length_b + metadata) return resp async def serve_multi_subscriber_stream(self, request: web.Request) -> web.Response: """Stream Queue Flow audio to a child player within a multi subscriber setup.""" self._log_request(request) - queue_id = request.match_info["queue_id"] - streamjob = self.multi_client_jobs.get(queue_id) - if not streamjob: - raise web.HTTPNotFound(reason=f"Unknown StreamJob for queue: {queue_id}") job_id = request.match_info["job_id"] - if job_id != streamjob.job_id: - raise web.HTTPNotFound(reason=f"StreamJob ID {job_id} mismatch for queue: {queue_id}") - child_player_id = request.match_info["player_id"] - child_player = self.mass.players.get(child_player_id) + for queue_id, stream_job in self.multi_client_jobs.items(): + if stream_job.job_id == job_id: + break + else: + raise web.HTTPNotFound(reason=f"Unknown StreamJob: {job_id}") + if stream_job.finished: + raise web.HTTPNotFound(reason=f"StreamJob {job_id} already finished") + if not (queue := self.mass.player_queues.get(queue_id)): + raise web.HTTPNotFound(reason=f"Unknown Queue: {queue_id}") + + player_id = request.match_info["player_id"] + child_player = self.mass.players.get(player_id) if not child_player: - raise web.HTTPNotFound(reason=f"Unknown player: {child_player_id}") + raise web.HTTPNotFound(reason=f"Unknown player: {player_id}") # work out (childplayer specific!) output format/details output_format = await self._get_output_format( output_format_str=request.match_info["fmt"], queue_player=child_player, - default_sample_rate=streamjob.pcm_format.sample_rate, - default_bit_depth=streamjob.pcm_format.bit_depth, + default_sample_rate=stream_job.pcm_format.sample_rate, + default_bit_depth=stream_job.pcm_format.bit_depth, ) # prepare request, add some DLNA/UPNP compatible headers + enable_icy = request.headers.get("Icy-MetaData", "") == "1" + icy_meta_interval = 16384 * 4 if output_format.content_type.is_lossless() else 16384 headers = { **DEFAULT_STREAM_HEADERS, "Content-Type": f"audio/{output_format.output_format_str}", } + if enable_icy: + headers["icy-metaint"] = str(icy_meta_interval) + resp = web.StreamResponse( status=200, reason="OK", @@ -695,53 +649,43 @@ class StreamsController(CoreController): if request.method != "GET": return resp - # some players (e.g. dlna, sonos) misbehave and do multiple GET requests - # to the stream in an attempt to get the audio details such as duration - # which is a bit pointless for our duration-less queue stream - # and it completely messes with the subscription logic - if child_player_id in streamjob.subscribed_players: - self.logger.warning( - "Player %s is making multiple requests " - "to the same stream, playback may be disturbed!", - child_player_id, - ) - self.workaround_players.add(child_player_id) - # all checks passed, start streaming! self.logger.debug( "Start serving multi-subscriber Queue flow audio stream for queue %s to player %s", - streamjob.queue.display_name, + queue.display_name, child_player.display_name, ) - - # collect player specific ffmpeg args to re-encode the source PCM stream - ffmpeg_args = await self._get_player_ffmpeg_args( - child_player, - input_format=streamjob.pcm_format, - output_format=output_format, - ) - - async with AsyncProcess(ffmpeg_args, True) as ffmpeg_proc: - # feed stdin with pcm audio chunks from origin - async def read_audio() -> None: - try: - async for chunk in streamjob.subscribe(child_player_id): - try: - await ffmpeg_proc.write(chunk) - except BrokenPipeError: - break - finally: - ffmpeg_proc.write_eof() - - ffmpeg_proc.attach_task(read_audio()) - - # read final chunks from stdout - async for chunk in ffmpeg_proc.iter_any(768000): - try: - await resp.write(chunk) - except (BrokenPipeError, ConnectionResetError): - # race condition - break + async for chunk in stream_job.subscribe( + player_id, output_format, chunk_size=icy_meta_interval if enable_icy else None + ): + try: + await resp.write(chunk) + except (BrokenPipeError, ConnectionResetError): + break + if not enable_icy: + continue + + # if icy metadata is enabled, send the icy metadata after the chunk + if ( + # use current item here and not buffered item, otherwise + # the icy metadata will be too much ahead + (current_item := queue.current_item) + and current_item.streamdetails + and current_item.streamdetails.stream_title + ): + title = current_item.streamdetails.stream_title + elif queue and current_item and current_item.name: + title = current_item.name + else: + title = "Music Assistant" + metadata = f"StreamTitle='{title}';".encode() + if current_item and current_item.image: + metadata += f"StreamURL='{current_item.image.path}'".encode() + while len(metadata) % 16 != 0: + metadata += b"\x00" + length = len(metadata) + length_b = chr(int(length / 16)).encode() + await resp.write(length_b + metadata) return resp @@ -756,7 +700,7 @@ class StreamsController(CoreController): def get_command_url(self, player_or_queue_id: str, command: str) -> str: """Get the url for the special command stream.""" - return f"{self.base_url}/{player_or_queue_id}/command/{command}.mp3" + return f"{self.base_url}/command/{player_or_queue_id}/{command}.mp3" async def get_flow_stream( self, @@ -821,7 +765,7 @@ class StreamsController(CoreController): bytes_written = 0 buffer = b"" # handle incoming audio chunks - async for is_last_chunk, chunk in get_media_stream( + async for chunk in get_media_stream( self.mass, queue_track.streamdetails, pcm_format=pcm_format, @@ -831,21 +775,14 @@ class StreamsController(CoreController): strip_silence_begin=use_crossfade, strip_silence_end=use_crossfade, ): - # throttle buffer, do not allow more than 30 seconds in player's own buffer - seconds_buffered = (total_bytes_written + bytes_written) / pcm_sample_size - player = self.mass.players.get(queue.queue_id) - if seconds_buffered > 60 and player.corrected_elapsed_time > 30: - while (seconds_buffered - player.corrected_elapsed_time) > 30: - await asyncio.sleep(1) - # ALWAYS APPEND CHUNK TO BUFFER buffer += chunk - if not is_last_chunk and len(buffer) < buffer_size: + if len(buffer) < buffer_size: # buffer is not full enough, move on continue #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK - if not is_last_chunk and last_fadeout_part: + if last_fadeout_part: # perform crossfade fadein_part = buffer[:crossfade_size] remaining_bytes = buffer[crossfade_size:] @@ -867,22 +804,6 @@ class StreamsController(CoreController): last_fadeout_part = b"" buffer = b"" - #### HANDLE END OF TRACK - elif is_last_chunk: - if use_crossfade: - # if crossfade is enabled, save fadeout part to pickup for next track - last_fadeout_part = buffer[-crossfade_size:] - remaining_bytes = buffer[:-crossfade_size] - yield remaining_bytes - bytes_written += len(remaining_bytes) - del remaining_bytes - else: - # no crossfade enabled, just yield the (entire) buffer last part - yield buffer - bytes_written += len(buffer) - # clear vars - buffer = b"" - #### OTHER: enough data in buffer, feed to output else: chunk_size = len(chunk) @@ -890,6 +811,26 @@ class StreamsController(CoreController): bytes_written += chunk_size buffer = buffer[chunk_size:] + #### HANDLE END OF TRACK + if last_fadeout_part: + # edge case: we did not get enough data to make the crossfade + yield last_fadeout_part + bytes_written += len(last_fadeout_part) + last_fadeout_part = b"" + if use_crossfade: + # if crossfade is enabled, save fadeout part to pickup for next track + last_fadeout_part = buffer[-crossfade_size:] + remaining_bytes = buffer[:-crossfade_size] + yield remaining_bytes + bytes_written += len(remaining_bytes) + del remaining_bytes + else: + # no crossfade enabled, just yield the (entire) buffer last part + yield buffer + bytes_written += len(buffer) + # clear vars + buffer = b"" + # update duration details based on the actual pcm data we sent # this also accounts for crossfade and silence stripping queue_track.streamdetails.seconds_streamed = bytes_written / pcm_sample_size @@ -909,92 +850,6 @@ class StreamsController(CoreController): yield last_fadeout_part self.logger.info("Finished Queue Flow stream for Queue %s", queue.display_name) - async def _get_player_ffmpeg_args( - self, - player: Player, - input_format: AudioFormat, - output_format: AudioFormat, - ) -> list[str]: - """Get player specific arguments for the given (pcm) input and output details.""" - # generic args - generic_args = [ - "ffmpeg", - "-hide_banner", - "-loglevel", - "warning" if self.logger.isEnabledFor(logging.DEBUG) else "quiet", - "-ignore_unknown", - ] - # input args - input_args = [ - "-f", - input_format.content_type.value, - "-ac", - str(input_format.channels), - "-channel_layout", - "mono" if input_format.channels == 1 else "stereo", - "-ar", - str(input_format.sample_rate), - "-i", - "-", - ] - # select output args - if output_format.content_type == ContentType.FLAC: - # set compression level to 0 to prevent issues with cast players - output_args = ["-f", "flac", "-compression_level", "0"] - elif output_format.content_type == ContentType.AAC: - output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "320k"] - elif output_format.content_type == ContentType.MP3: - output_args = ["-f", "mp3", "-c:a", "mp3", "-b:a", "320k"] - else: - output_args = ["-f", output_format.content_type.value] - - # append channels - output_args += ["-ac", str(output_format.channels)] - # append sample rate (if codec is lossless) - if output_format.content_type.is_lossless(): - output_args += ["-ar", str(output_format.sample_rate)] - # append output = pipe - output_args += ["-"] - - # collect extra and filter args - # TODO: add convolution/DSP/roomcorrections here! - extra_args = [] - filter_params = [] - - # the below is a very basic 3-band equalizer, - # this could be a lot more sophisticated at some point - if ( - eq_bass := self.mass.config.get_raw_player_config_value( - player.player_id, CONF_EQ_BASS, 0 - ) - ) != 0: - filter_params.append(f"equalizer=frequency=100:width=200:width_type=h:gain={eq_bass}") - if ( - eq_mid := self.mass.config.get_raw_player_config_value(player.player_id, CONF_EQ_MID, 0) - ) != 0: - filter_params.append(f"equalizer=frequency=900:width=1800:width_type=h:gain={eq_mid}") - if ( - eq_treble := self.mass.config.get_raw_player_config_value( - player.player_id, CONF_EQ_TREBLE, 0 - ) - ) != 0: - filter_params.append( - f"equalizer=frequency=9000:width=18000:width_type=h:gain={eq_treble}" - ) - # handle output mixing only left or right - conf_channels = self.mass.config.get_raw_player_config_value( - player.player_id, CONF_OUTPUT_CHANNELS, "stereo" - ) - if conf_channels == "left": - filter_params.append("pan=mono|c0=FL") - elif conf_channels == "right": - filter_params.append("pan=mono|c0=FR") - - if filter_params: - extra_args += ["-af", ",".join(filter_params)] - - return generic_args + input_args + extra_args + output_args - def _log_request(self, request: web.Request) -> None: """Log request.""" if not self.logger.isEnabledFor(logging.DEBUG): diff --git a/music_assistant/server/controllers/webserver.py b/music_assistant/server/controllers/webserver.py index 507f42d5..dd726900 100644 --- a/music_assistant/server/controllers/webserver.py +++ b/music_assistant/server/controllers/webserver.py @@ -375,7 +375,7 @@ class WebsocketClientHandler: result = await result self._send_message(SuccessResultMessage(msg.message_id, result)) except Exception as err: # pylint: disable=broad-except - if self.log_level == "VERBOSE": + if self._logger.isEnabledFor(logging.DEBUG): self._logger.exception("Error handling message: %s", msg) else: self._logger.error("Error handling message: %s: %s", msg.command, str(err)) diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 298042a4..529bd4f6 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -25,6 +25,10 @@ from music_assistant.common.models.errors import ( from music_assistant.common.models.media_items import AudioFormat, ContentType, MediaType from music_assistant.common.models.streamdetails import LoudnessMeasurement, StreamDetails from music_assistant.constants import ( + CONF_EQ_BASS, + CONF_EQ_MID, + CONF_EQ_TREBLE, + CONF_OUTPUT_CHANNELS, CONF_VOLUME_NORMALIZATION, CONF_VOLUME_NORMALIZATION_TARGET, ROOT_LOGGER_NAME, @@ -204,9 +208,7 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) - enable_stdout=False, enable_stderr=True, ) as ffmpeg_proc: - - async def writer() -> None: - """Task that grabs the source audio and feeds it to ffmpeg.""" + if streamdetails.direct is None: music_prov = mass.get_provider(streamdetails.provider) chunk_count = 0 async for audio_chunk in music_prov.get_audio_stream(streamdetails): @@ -217,11 +219,6 @@ async def analyze_loudness(mass: MusicAssistant, streamdetails: StreamDetails) - break ffmpeg_proc.write_eof() - if streamdetails.direct is None: - writer_task = ffmpeg_proc.attach_task(writer()) - # wait for the writer task to finish - await writer_task - _, stderr = await ffmpeg_proc.communicate() if loudness_details := _parse_loudnorm(stderr): LOGGER.debug("Loudness measurement for %s: %s", item_name, loudness_details) @@ -384,6 +381,7 @@ async def get_media_stream( # noqa: PLR0915 Other than stripping silence at end and beginning and optional volume normalization this is the pure, unaltered audio data as PCM chunks. """ + logger = LOGGER.getChild("media_stream") bytes_sent = 0 streamdetails.seconds_skipped = seek_position is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration @@ -397,127 +395,159 @@ async def get_media_stream( # noqa: PLR0915 strip_silence_end = False # collect all arguments for ffmpeg + filter_params = [] + extra_args = [] seek_pos = seek_position if (streamdetails.direct or not streamdetails.can_seek) else 0 - args = await _get_ffmpeg_args( - streamdetails=streamdetails, - pcm_output_format=pcm_format, + if seek_pos: # only use ffmpeg seeking if the provider stream does not support seeking - seek_position=seek_pos, - fade_in=fade_in, + extra_args += ["-ss", str(seek_pos)] + if streamdetails.target_loudness is not None: + # add loudnorm filters + filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5" + if streamdetails.loudness: + filter_rule += f":measured_I={streamdetails.loudness.integrated}" + filter_rule += f":measured_LRA={streamdetails.loudness.lra}" + filter_rule += f":measured_tp={streamdetails.loudness.true_peak}" + filter_rule += f":measured_thresh={streamdetails.loudness.threshold}" + filter_rule += ":print_format=json" + filter_params.append(filter_rule) + if fade_in: + filter_params.append("afade=type=in:start_time=0:duration=3") + ffmpeg_args = await _get_ffmpeg_args( + input_format=streamdetails.audio_format, + output_format=pcm_format, + filter_params=filter_params, + extra_args=extra_args, + input_path=streamdetails.direct or "-", ) - async with AsyncProcess( - args, enable_stdin=streamdetails.direct is None, enable_stderr=True - ) as ffmpeg_proc: - LOGGER.debug("start media stream for: %s", streamdetails.uri) - - async def writer() -> None: - """Task that grabs the source audio and feeds it to ffmpeg.""" - LOGGER.debug("writer started for %s", streamdetails.uri) - music_prov = mass.get_provider(streamdetails.provider) - seek_pos = seek_position if streamdetails.can_seek else 0 - async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos): - await ffmpeg_proc.write(audio_chunk) - # write eof when last packet is received - ffmpeg_proc.write_eof() - LOGGER.debug("writer finished for %s", streamdetails.uri) - - if streamdetails.direct is None: - ffmpeg_proc.attach_task(writer()) - - # get pcm chunks from stdout - # we always stay one chunk behind to properly detect end of chunks - # so we can strip silence at the beginning and end of a track - prev_chunk = b"" - chunk_num = 0 - try: - async for chunk in ffmpeg_proc.iter_chunked(chunk_size): - chunk_num += 1 - if strip_silence_begin and chunk_num == 2: - # first 2 chunks received, strip silence of beginning - stripped_audio = await strip_silence( - mass, - prev_chunk + chunk, - sample_rate=pcm_format.sample_rate, - bit_depth=pcm_format.bit_depth, - ) - yield (False, stripped_audio) - bytes_sent += len(stripped_audio) - prev_chunk = b"" - del stripped_audio - continue - if strip_silence_end and chunk_num >= (expected_chunks - 6): - # last part of the track, collect multiple chunks to strip silence later - prev_chunk += chunk - continue - - # middle part of the track, send previous chunk and collect current chunk - if prev_chunk: - yield (False, prev_chunk) - bytes_sent += len(prev_chunk) + finished = False + logger.debug("start media stream for: %s", streamdetails.uri) - prev_chunk = chunk - - # all chunks received, strip silence of last part - - if strip_silence_end and prev_chunk: - final_chunk = await strip_silence( + writer_task: asyncio.Task | None = None + ffmpeg_proc = AsyncProcess( + ffmpeg_args, enable_stdin=streamdetails.direct is None, enable_stderr=True + ) + await ffmpeg_proc.start() + + async def writer() -> None: + """Task that grabs the source audio and feeds it to ffmpeg.""" + logger.debug("writer started for %s", streamdetails.uri) + music_prov = mass.get_provider(streamdetails.provider) + seek_pos = seek_position if streamdetails.can_seek else 0 + async for audio_chunk in music_prov.get_audio_stream(streamdetails, seek_pos): + await ffmpeg_proc.write(audio_chunk) + # write eof when last packet is received + ffmpeg_proc.write_eof() + logger.debug("writer finished for %s", streamdetails.uri) + + if streamdetails.direct is None: + writer_task = asyncio.create_task(writer()) + + # get pcm chunks from stdout + # we always stay one chunk behind to properly detect end of chunks + # so we can strip silence at the beginning and end of a track + prev_chunk = b"" + chunk_num = 0 + try: + async for chunk in ffmpeg_proc.iter_chunked(chunk_size): + chunk_num += 1 + if strip_silence_begin and chunk_num == 2: + # first 2 chunks received, strip silence of beginning + stripped_audio = await strip_silence( mass, - prev_chunk, + prev_chunk + chunk, sample_rate=pcm_format.sample_rate, bit_depth=pcm_format.bit_depth, - reverse=True, ) - else: - final_chunk = prev_chunk + yield stripped_audio + bytes_sent += len(stripped_audio) + prev_chunk = b"" + del stripped_audio + continue + if strip_silence_end and chunk_num >= (expected_chunks - 6): + # last part of the track, collect multiple chunks to strip silence later + prev_chunk += chunk + continue - # use communicate to read stderr and wait for exit - # read log for loudness measurement (or errors) - _, stderr = await ffmpeg_proc.communicate() - if ffmpeg_proc.returncode != 0: - # ffmpeg has a non zero returncode meaning trouble - LOGGER.getChild("ffmpeg").warning("STREAM ERROR on %s", streamdetails.uri) - LOGGER.getChild("ffmpeg").warning(stderr.decode()) - elif loudness_details := _parse_loudnorm(stderr): + # middle part of the track, send previous chunk and collect current chunk + if prev_chunk: + yield prev_chunk + bytes_sent += len(prev_chunk) + + prev_chunk = chunk + + # all chunks received, strip silence of last part + + if strip_silence_end and prev_chunk: + final_chunk = await strip_silence( + mass, + prev_chunk, + sample_rate=pcm_format.sample_rate, + bit_depth=pcm_format.bit_depth, + reverse=True, + ) + else: + final_chunk = prev_chunk + + # ensure the final chunk is sent + # its important this is done here at the end so we can catch errors first + yield final_chunk + bytes_sent += len(final_chunk) + del final_chunk + del prev_chunk + finished = True + finally: + seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0 + streamdetails.seconds_streamed = seconds_streamed + if finished: + logger.debug( + "finished stream for: %s (%s seconds streamed)", + streamdetails.uri, + seconds_streamed, + ) + # store accurate duration + streamdetails.duration = seek_position + seconds_streamed + else: + logger.debug( + "stream aborted for %s (%s seconds streamed)", + streamdetails.uri, + seconds_streamed, + ) + if writer_task and not writer_task.done(): + writer_task.cancel() + # use communicate to read stderr and wait for exit + # read log for loudness measurement (or errors) + _, stderr = await ffmpeg_proc.communicate() + if ffmpeg_proc.returncode != 0: + # ffmpeg has a non zero returncode meaning trouble + logger.warning("STREAM ERROR on %s", streamdetails.uri) + logger.warning(stderr.decode()) + elif loudness_details := _parse_loudnorm(stderr): + required_seconds = 300 if streamdetails.media_type == MediaType.RADIO else 60 + if finished or seconds_streamed >= required_seconds: LOGGER.debug("Loudness measurement for %s: %s", streamdetails.uri, loudness_details) streamdetails.loudness = loudness_details await mass.music.set_track_loudness( streamdetails.item_id, streamdetails.provider, loudness_details ) - else: - LOGGER.getChild("ffmpeg").debug(stderr.decode()) - - # ensure the final chunk is sent and mark as final - # its important this is done here at the end so we can catch errors first - yield (True, final_chunk) - bytes_sent += len(final_chunk) - del final_chunk - del prev_chunk - - except (asyncio.CancelledError, GeneratorExit): - LOGGER.debug("media stream aborted for: %s", streamdetails.uri) - raise else: - LOGGER.debug("finished media stream for: %s", streamdetails.uri) - # store accurate duration - streamdetails.duration = seek_position + bytes_sent / pcm_sample_size - finally: - # report playback - seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0 - streamdetails.seconds_streamed = seconds_streamed - if seconds_streamed < 20: - mass.create_task( - mass.music.mark_item_played( - streamdetails.media_type, streamdetails.item_id, streamdetails.provider - ) + logger.debug(stderr.decode()) + + # report playback + if finished or seconds_streamed > 30: + mass.create_task( + mass.music.mark_item_played( + streamdetails.media_type, streamdetails.item_id, streamdetails.provider ) - if music_prov := mass.get_provider(streamdetails.provider): - mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) + ) + if music_prov := mass.get_provider(streamdetails.provider): + mass.create_task(music_prov.on_streamed(streamdetails, seconds_streamed)) - if not streamdetails.loudness: - # send loudness analyze job to background worker - # note that we only do this if a track was at least been partially played - mass.create_task(analyze_loudness(mass, streamdetails)) + if not streamdetails.loudness: + # send loudness analyze job to background worker + # note that we only do this if a track was at least been partially played + mass.create_task(analyze_loudness(mass, streamdetails)) async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, bool]: @@ -670,6 +700,77 @@ async def get_file_stream( yield data +async def get_ffmpeg_stream( + audio_input: AsyncGenerator[bytes, None] | str, + input_format: AudioFormat, + output_format: AudioFormat, + filter_params: list[str] | None = None, + extra_args: list[str] | None = None, + chunk_size: int | None = None, +) -> AsyncGenerator[bytes, None]: + """ + Get the ffmpeg audio stream as async generator. + + Takes care of resampling and/or recoding if needed, + according to player preferences. + """ + logger = LOGGER.getChild("media_stream") + use_stdin = not isinstance(audio_input, str) + if input_format == output_format and not filter_params and not chunk_size and use_stdin: + # edge case: input and output exactly the same, we can bypass ffmpeg + # return the raw input stream, no actions needed here + async for chunk in audio_input: + yield chunk + return + + ffmpeg_args = await _get_ffmpeg_args( + input_format=input_format, + output_format=output_format, + filter_params=filter_params or [], + extra_args=extra_args or [], + input_path="-" if use_stdin else audio_input, + output_path="-", + ) + + writer_task: asyncio.Task | None = None + ffmpeg_proc = AsyncProcess( + ffmpeg_args, enable_stdin=use_stdin, enable_stdout=True, enable_stderr=True + ) + await ffmpeg_proc.start() + + # feed stdin with pcm audio chunks from origin + async def writer() -> None: + async for chunk in audio_input: + if ffmpeg_proc.closed: + return + await ffmpeg_proc.write(chunk) + ffmpeg_proc.write_eof() + + try: + if not isinstance(audio_input, str): + writer_task = asyncio.create_task(writer()) + + # read final chunks from stdout + chunk_size = chunk_size or get_chunksize(output_format, 1) + async for chunk in ffmpeg_proc.iter_chunked(chunk_size): + try: + yield chunk + except (BrokenPipeError, ConnectionResetError): + # race condition + break + finally: + if writer_task and not writer_task.done(): + writer_task.cancel() + # use communicate to read stderr and wait for exit + # read log for loudness measurement (or errors) + _, stderr = await ffmpeg_proc.communicate() + if ffmpeg_proc.returncode != 0: + # ffmpeg has a non zero returncode meaning trouble + logger.warning("FFMPEG ERROR\n%s", stderr.decode()) + else: + logger.debug(stderr.decode()) + + async def check_audio_support() -> tuple[bool, bool, str]: """Check if ffmpeg is present (with/without libsoxr support).""" cache_key = "audio_support_cache" @@ -702,7 +803,7 @@ async def get_preview_stream( "ffmpeg", "-hide_banner", "-loglevel", - "quiet", + "info", "-ignore_unknown", ] if streamdetails.direct: @@ -715,22 +816,30 @@ async def get_preview_stream( output_args = ["-to", "30", "-f", "mp3", "-"] args = input_args + output_args - async with AsyncProcess(args, True) as ffmpeg_proc: - async def writer() -> None: - """Task that grabs the source audio and feeds it to ffmpeg.""" - music_prov = mass.get_provider(streamdetails.provider) - async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30): - await ffmpeg_proc.write(audio_chunk) - # write eof when last packet is received - ffmpeg_proc.write_eof() + writer_task: asyncio.Task | None = None + ffmpeg_proc = AsyncProcess(args, enable_stdin=True, enable_stdout=True, enable_stderr=False) + await ffmpeg_proc.start() - if not streamdetails.direct: - ffmpeg_proc.attach_task(writer()) + async def writer() -> None: + """Task that grabs the source audio and feeds it to ffmpeg.""" + music_prov = mass.get_provider(streamdetails.provider) + async for audio_chunk in music_prov.get_audio_stream(streamdetails, 30): + await ffmpeg_proc.write(audio_chunk) + # write eof when last packet is received + ffmpeg_proc.write_eof() - # yield chunks from stdout + if not streamdetails.direct: + writer_task = asyncio.create_task(writer()) + + # yield chunks from stdout + try: async for chunk in ffmpeg_proc.iter_any(): yield chunk + finally: + if writer_task and not writer_task.done(): + writer_task.cancel() + await ffmpeg_proc.close() async def get_silence( @@ -767,7 +876,7 @@ async def get_silence( "-t", str(duration), "-f", - output_format.output_fmt.value, + output_format.output_format_str, "-", ] async with AsyncProcess(args) as ffmpeg_proc: @@ -794,11 +903,42 @@ def get_chunksize( return int((320000 / 8) * seconds) +def get_player_filter_params( + mass: MusicAssistant, + player_id: str, +) -> list[str]: + """Get player specific filter parameters for ffmpeg (if any).""" + # collect all players-specific filter args + # TODO: add convolution/DSP/roomcorrections here?! + filter_params = [] + + # the below is a very basic 3-band equalizer, + # this could be a lot more sophisticated at some point + if (eq_bass := mass.config.get_raw_player_config_value(player_id, CONF_EQ_BASS, 0)) != 0: + filter_params.append(f"equalizer=frequency=100:width=200:width_type=h:gain={eq_bass}") + if (eq_mid := mass.config.get_raw_player_config_value(player_id, CONF_EQ_MID, 0)) != 0: + filter_params.append(f"equalizer=frequency=900:width=1800:width_type=h:gain={eq_mid}") + if (eq_treble := mass.config.get_raw_player_config_value(player_id, CONF_EQ_TREBLE, 0)) != 0: + filter_params.append(f"equalizer=frequency=9000:width=18000:width_type=h:gain={eq_treble}") + # handle output mixing only left or right + conf_channels = mass.config.get_raw_player_config_value( + player_id, CONF_OUTPUT_CHANNELS, "stereo" + ) + if conf_channels == "left": + filter_params.append("pan=mono|c0=FL") + elif conf_channels == "right": + filter_params.append("pan=mono|c0=FR") + + return filter_params + + async def _get_ffmpeg_args( - streamdetails: StreamDetails, - pcm_output_format: AudioFormat, - seek_position: int = 0, - fade_in: bool = False, + input_format: AudioFormat, + output_format: AudioFormat, + filter_params: list[str], + extra_args: list[str], + input_path: str = "-", + output_path: str = "-", ) -> list[str]: """Collect all args to send to the ffmpeg process.""" ffmpeg_present, libsoxr_support, version = await check_audio_support() @@ -827,76 +967,51 @@ async def _get_ffmpeg_args( # collect input args input_args = [ "-ac", - str(streamdetails.audio_format.channels), + str(input_format.channels), "-channel_layout", - "mono" if streamdetails.audio_format.channels == 1 else "stereo", + "mono" if input_format.channels == 1 else "stereo", ] - if seek_position: - input_args += ["-ss", str(seek_position)] - if streamdetails.direct: - # ffmpeg can access the inputfile (or url) directly - if streamdetails.direct.startswith("http"): - # append reconnect options for direct stream from http + if input_format.content_type.is_pcm(): + input_args += ["-ar", str(input_format.sample_rate)] + if input_path.startswith("http"): + # append reconnect options for direct stream from http + input_args += [ + "-reconnect", + "1", + "-reconnect_streamed", + "1", + "-reconnect_delay_max", + "10", + ] + if major_version > 4: + # these options are only supported in ffmpeg > 5 input_args += [ - "-reconnect", + "-reconnect_on_network_error", "1", - "-reconnect_streamed", - "1", - "-reconnect_delay_max", - "10", + "-reconnect_on_http_error", + "5xx", ] - if major_version > 4: - # these options are only supported in ffmpeg > 5 - input_args += [ - "-reconnect_on_network_error", - "1", - "-reconnect_on_http_error", - "5xx", - ] - - input_args += ["-i", streamdetails.direct] - else: - # the input is received from pipe/stdin - if streamdetails.audio_format.content_type != ContentType.UNKNOWN: - input_args += ["-f", streamdetails.audio_format.content_type.value] - input_args += [ - "-i", - "-", - ] + if input_format.content_type != ContentType.UNKNOWN: + input_args += ["-f", input_format.content_type.value] + input_args += ["-i", input_path] # collect output args output_args = [ "-acodec", - pcm_output_format.content_type.name.lower(), + output_format.content_type.name.lower(), "-f", - pcm_output_format.content_type.value, + output_format.content_type.value, "-ac", - str(pcm_output_format.channels), + str(output_format.channels), "-ar", - str(pcm_output_format.sample_rate), - "-", + str(output_format.sample_rate), + output_path, ] - # collect extra and filter args - extra_args = [] - filter_params = [] - if streamdetails.target_loudness is not None: - filter_rule = f"loudnorm=I={streamdetails.target_loudness}:LRA=7:tp=-2:offset=-0.5" - if streamdetails.loudness: - filter_rule += f":measured_I={streamdetails.loudness.integrated}" - filter_rule += f":measured_LRA={streamdetails.loudness.lra}" - filter_rule += f":measured_tp={streamdetails.loudness.true_peak}" - filter_rule += f":measured_thresh={streamdetails.loudness.threshold}" - filter_rule += ":print_format=json" - filter_params.append(filter_rule) - if ( - streamdetails.audio_format.sample_rate != pcm_output_format.sample_rate - and libsoxr_support - and streamdetails.media_type == MediaType.TRACK - ): - # prefer libsoxr high quality resampler (if present) for sample rate conversions + + # prefer libsoxr high quality resampler (if present) for sample rate conversions + if input_format.sample_rate != output_format.sample_rate and libsoxr_support: filter_params.append("aresample=resampler=soxr") - if fade_in: - filter_params.append("afade=type=in:start_time=0:duration=3") + if filter_params: extra_args += ["-af", ",".join(filter_params)] diff --git a/music_assistant/server/helpers/didl_lite.py b/music_assistant/server/helpers/didl_lite.py index 159be910..c3a27314 100644 --- a/music_assistant/server/helpers/didl_lite.py +++ b/music_assistant/server/helpers/didl_lite.py @@ -33,12 +33,14 @@ def create_didl_metadata( "" ) is_radio = queue_item.media_type != MediaType.TRACK or not queue_item.duration - image_url = mass.metadata.get_image_url(queue_item.image) if queue_item.image else "" + image_url = ( + mass.metadata.get_image_url(queue_item.image) if queue_item.image else MASS_LOGO_ONLINE + ) if is_radio: # radio or other non-track item return ( '' - '' + f'' f"{escape_string(queue_item.name)}" f"{escape_string(image_url)}" f"{queue_item.queue_item_id}" diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index eeab4f65..8668b499 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -12,7 +12,7 @@ from contextlib import suppress from typing import TYPE_CHECKING if TYPE_CHECKING: - from collections.abc import AsyncGenerator, Coroutine + from collections.abc import AsyncGenerator LOGGER = logging.getLogger(__name__) @@ -37,9 +37,18 @@ class AsyncProcess: self._enable_stdin = enable_stdin self._enable_stdout = enable_stdout self._enable_stderr = enable_stderr - self._attached_task: asyncio.Task = None - self.closed = False - self.returncode: int | None = None + + @property + def closed(self) -> bool: + """Return if the process was closed.""" + return self.returncode is not None + + @property + def returncode(self) -> int | None: + """Return the erturncode of the process.""" + if self._proc is None: + return None + return self._proc.returncode async def __aenter__(self) -> AsyncProcess: """Enter context manager.""" @@ -119,37 +128,27 @@ class AsyncProcess: # already exited, race condition pass - async def close(self) -> None: - """Close/terminate the process.""" - self.closed = True - if self._attached_task and not self._attached_task.done(): - with suppress(asyncio.CancelledError): - self._attached_task.cancel() + async def close(self) -> int: + """Close/terminate the process and wait for exit.""" + if self.returncode is not None: + return self.returncode # make sure the process is cleaned up - self.write_eof() - if self._proc.returncode is None: - try: - async with asyncio.timeout(10): - await self.communicate() - except TimeoutError: - self._proc.kill() - await self.wait() + try: + async with asyncio.timeout(10): + await self.communicate() + except (TimeoutError, asyncio.CancelledError): + self._proc.terminate() + return await self.wait() async def wait(self) -> int: """Wait for the process and return the returncode.""" if self.returncode is not None: return self.returncode - if self._proc.returncode is not None: - self.returncode = self._proc.returncode - return self.returncode - self.returncode = await self._proc.wait() - self.closed = True - return self.returncode + return await self._proc.wait() async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]: """Write bytes to process and read back results.""" stdout, stderr = await self._proc.communicate(input_data) - self.returncode = self._proc.returncode return (stdout, stderr) async def read_stderr(self, n: int = -1) -> bytes: @@ -161,11 +160,6 @@ class AsyncProcess: """ return await self._proc.stderr.read(n) - def attach_task(self, coro: Coroutine) -> asyncio.Task: - """Attach given coro func as reader/writer task to properly cancel it when needed.""" - self._attached_task = task = asyncio.create_task(coro) - return task - async def check_output(shell_cmd: str) -> tuple[int, bytes]: """Run shell subprocess and return output.""" diff --git a/music_assistant/server/helpers/tags.py b/music_assistant/server/helpers/tags.py index 8f5830a0..d917ef30 100644 --- a/music_assistant/server/helpers/tags.py +++ b/music_assistant/server/helpers/tags.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio import json import logging import os @@ -366,51 +367,54 @@ async def parse_tags( file_path, ) - async with AsyncProcess( + writer_task: asyncio.Task | None = None + ffmpeg_proc = AsyncProcess( args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False - ) as proc: - if file_path == "-": - # feed the file contents to the process - - async def chunk_feeder() -> None: - bytes_read = 0 - try: - async for chunk in input_file: - if proc.closed: - break - await proc.write(chunk) - bytes_read += len(chunk) - del chunk - if bytes_read > 25 * 1000000: - # this is possibly a m4a file with 'moove atom' metadata at the - # end of the file - # we'll have to read the entire file to do something with it - # for now we just ignore/deny these files - LOGGER.error("Found file with tags not present at beginning of file") - break - finally: - proc.write_eof() - - proc.attach_task(chunk_feeder()) - - try: - res = await proc.read(-1) - data = json.loads(res) - if error := data.get("error"): - raise InvalidDataError(error["string"]) - if not data.get("streams"): - msg = "Not an audio file" - raise InvalidDataError(msg) - tags = AudioTags.parse(data) - del res - del data - if not tags.duration and file_size and tags.bit_rate: - # estimate duration from filesize/bitrate - tags.duration = int((file_size * 8) / tags.bit_rate) - return tags - except (KeyError, ValueError, JSONDecodeError, InvalidDataError) as err: - msg = f"Unable to retrieve info for {file_path}: {err!s}" - raise InvalidDataError(msg) from err + ) + await ffmpeg_proc.start() + + async def writer() -> None: + bytes_read = 0 + async for chunk in input_file: + if ffmpeg_proc.closed: + break + await ffmpeg_proc.write(chunk) + bytes_read += len(chunk) + del chunk + if bytes_read > 25 * 1000000: + # this is possibly a m4a file with 'moove atom' metadata at the + # end of the file + # we'll have to read the entire file to do something with it + # for now we just ignore/deny these files + LOGGER.error("Found file with tags not present at beginning of file") + break + + if file_path == "-": + # feed the file contents to the process + writer_task = asyncio.create_task(writer) + + try: + res = await ffmpeg_proc.read(-1) + data = json.loads(res) + if error := data.get("error"): + raise InvalidDataError(error["string"]) + if not data.get("streams"): + msg = "Not an audio file" + raise InvalidDataError(msg) + tags = AudioTags.parse(data) + del res + del data + if not tags.duration and file_size and tags.bit_rate: + # estimate duration from filesize/bitrate + tags.duration = int((file_size * 8) / tags.bit_rate) + return tags + except (KeyError, ValueError, JSONDecodeError, InvalidDataError) as err: + msg = f"Unable to retrieve info for {file_path}: {err!s}" + raise InvalidDataError(msg) from err + finally: + if writer_task and not writer_task.done(): + writer_task.cancel() + await ffmpeg_proc.close() async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> bytes | None: @@ -436,20 +440,27 @@ async def get_embedded_image(input_file: str | AsyncGenerator[bytes, None]) -> b "-", ) - async with AsyncProcess( + writer_task: asyncio.Task | None = None + ffmpeg_proc = AsyncProcess( args, enable_stdin=file_path == "-", enable_stdout=True, enable_stderr=False - ) as proc: - if file_path == "-": - # feed the file contents to the process - async def chunk_feeder() -> None: - try: - async for chunk in input_file: - if proc.closed: - break - await proc.write(chunk) - finally: - proc.write_eof() - - proc.attach_task(chunk_feeder()) - - return await proc.read(-1) + ) + await ffmpeg_proc.start() + + async def writer() -> None: + async for chunk in input_file: + if ffmpeg_proc.closed: + break + await ffmpeg_proc.write(chunk) + ffmpeg_proc.write_eof() + + # feed the file contents to the process stdin + if file_path == "-": + writer_task = asyncio.create_task(writer) + + # return image bytes from stdout + try: + return await ffmpeg_proc.read(-1) + finally: + if writer_task and not writer_task.cancelled(): + writer_task.cancel() + await ffmpeg_proc.close() diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index cfacf47e..85b05576 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -22,7 +22,6 @@ from .provider import Provider if TYPE_CHECKING: from music_assistant.common.models.player import Player from music_assistant.common.models.queue_item import QueueItem - from music_assistant.server.controllers.streams import MultiClientStreamJob # ruff: noqa: ARG001, ARG002 @@ -123,13 +122,6 @@ class PlayerProvider(Provider): """ raise NotImplementedError - async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle PLAY STREAM on given player. - - This is a special feature from the Universal Group provider. - """ - raise NotImplementedError - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: """ Handle enqueuing of the next queue item on the player. diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 7cdbfe21..263b839d 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -17,10 +17,14 @@ from zeroconf import IPVersion, ServiceStateChange from zeroconf.asyncio import AsyncServiceInfo from music_assistant.common.helpers.datetime import utc -from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port +from music_assistant.common.helpers.util import get_ip_pton, select_free_port from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION, + CONF_ENTRY_EQ_BASS, + CONF_ENTRY_EQ_MID, + CONF_ENTRY_EQ_TREBLE, + CONF_ENTRY_OUTPUT_CHANNELS, CONF_ENTRY_SYNC_ADJUST, ConfigEntry, ConfigValueType, @@ -37,6 +41,7 @@ from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.player_queue import PlayerQueue from music_assistant.constants import CONF_SYNC_ADJUST +from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params from music_assistant.server.helpers.process import check_output from music_assistant.server.models.player_provider import PlayerProvider @@ -45,7 +50,6 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType DOMAIN = "airplay" @@ -59,6 +63,10 @@ CONF_PASSWORD = "password" PLAYER_CONFIG_ENTRIES = ( CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION, + CONF_ENTRY_EQ_BASS, + CONF_ENTRY_EQ_MID, + CONF_ENTRY_EQ_TREBLE, + CONF_ENTRY_OUTPUT_CHANNELS, ConfigEntry( key=CONF_ENCRYPTION, type=ConfigEntryType.BOOLEAN, @@ -177,7 +185,7 @@ class AirplayStreamJob: self.active_remote_id: str = str(randint(1000, 8000)) self.start_ntp: int | None = None # use as checksum self.prevent_playback: bool = False - self._audio_buffer = asyncio.Queue(2) + self._audio_iterator: AsyncGenerator[bytes, None] | None = None self._log_reader_task: asyncio.Task | None = None self._audio_reader_task: asyncio.Task | None = None self._cliraop_proc: asyncio.subprocess.Process | None = None @@ -192,9 +200,10 @@ class AirplayStreamJob: and self._cliraop_proc.returncode is None ) - async def init_cliraop(self, start_ntp: int) -> None: + async def start(self, start_ntp: int, audio_iterator: AsyncGenerator[bytes, None]) -> None: """Initialize CLIRaop process for a player.""" self.start_ntp = start_ntp + self._audio_iterator = audio_iterator extra_args = [] player_id = self.airplay_player.player_id mass_player = self.mass.players.get(player_id) @@ -202,9 +211,6 @@ class AirplayStreamJob: extra_args += ["-encrypt"] if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True): extra_args += ["-alac"] - if "airport" in mass_player.device_info.model.lower(): - # enforce auth on airport express - extra_args += ["-auth"] for prop in ("et", "md", "am", "pk", "pw"): if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop): extra_args += [f"-{prop}", prop_value] @@ -213,8 +219,6 @@ class AirplayStreamJob: if device_password := self.mass.config.get_raw_player_config_value( player_id, CONF_PASSWORD, None ): - # NOTE: This may not work as we might need to do - # some fancy hashing with the plain password first?! extra_args += ["-password", device_password] if self.prov.log_level == "DEBUG": extra_args += ["-debug", "5"] @@ -258,20 +262,21 @@ class AirplayStreamJob: return await self.send_cli_command("ACTION=STOP") self._stop_requested = True + if not force: + return # stop background tasks - if self._log_reader_task and not self._log_reader_task.done(): - if force: - self._log_reader_task.cancel() - with suppress(asyncio.CancelledError): - await self._log_reader_task if self._audio_reader_task and not self._audio_reader_task.done(): - if force: - self._audio_reader_task.cancel() with suppress(asyncio.CancelledError): + self._audio_reader_task.cancel() await self._audio_reader_task - - empty_queue(self._audio_buffer) - await asyncio.wait_for(self._cliraop_proc.communicate(), 30) + if self._log_reader_task and not self._log_reader_task.done(): + with suppress(asyncio.CancelledError): + self._log_reader_task.cancel() + await self._log_reader_task + with suppress(TimeoutError): + await asyncio.wait_for(self._cliraop_proc.communicate(), 5) + if self._cliraop_proc.returncode is None: + self._cliraop_proc.kill() async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" @@ -356,35 +361,96 @@ class AirplayStreamJob: self.mass.players.update(airplay_player.player_id) async def _audio_reader(self) -> None: - """Read audio chunks from buffer and send them to the cliraop process.""" + """Read audio chunks and send them to the cliraop process.""" logger = self.airplay_player.logger - logger.debug("Audio reader started") - while self.running: - chunk = await self._audio_buffer.get() - if chunk == b"EOF": - # EOF chunk - break + mass_player = self.mass.players.get(self.airplay_player.player_id, True) + queue = self.mass.player_queues.get_active_queue(mass_player.active_source) + logger.debug( + "Starting RAOP stream for Queue %s to %s", + queue.display_name, + mass_player.display_name, + ) + prev_metadata_checksum: str = "" + prev_progress_report: float = 0 + async for chunk in self._audio_iterator: + if not self.running: + return self._cliraop_proc.stdin.write(chunk) - with suppress(BrokenPipeError, ConnectionResetError): + try: await self._cliraop_proc.stdin.drain() + except (BrokenPipeError, ConnectionResetError): + break + # send metadata to player(s) if needed + # NOTE: this must all be done in separate tasks to not disturb audio + now = time.time() + if queue and queue.current_item and queue.current_item.streamdetails: + metadata_checksum = ( + queue.current_item.streamdetails.stream_title + or queue.current_item.queue_item_id + ) + if prev_metadata_checksum != metadata_checksum: + prev_metadata_checksum = metadata_checksum + prev_progress_report = now + self.mass.create_task(self._send_metadata(queue)) + # send the progress report every 5 seconds + elif now - prev_progress_report >= 5: + prev_progress_report = now + self.mass.create_task(self._send_progress(queue)) # send EOF if self._cliraop_proc.returncode is None and not self._cliraop_proc.stdin.is_closing(): self._cliraop_proc.stdin.write_eof() with suppress(BrokenPipeError, ConnectionResetError): await self._cliraop_proc.stdin.drain() - logger.debug("Audio reader finished") + logger.debug( + "Finished RAOP stream for Queue %s to %s", + queue.display_name, + mass_player.display_name, + ) - async def write_chunk(self, data: bytes) -> None: - """Write a chunk of (pcm) data to the audio buffer.""" - if not self.running: + async def _send_metadata(self, queue: PlayerQueue) -> None: + """Send metadata to player (and connected sync childs).""" + if not queue or not queue.current_item: return - await self._audio_buffer.put(data) + duration = min(queue.current_item.duration or 0, 3600) + title = queue.current_item.name + artist = "" + album = "" + if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title: + # stream title from radio station + stream_title = queue.current_item.streamdetails.stream_title + if " - " in stream_title: + artist, title = stream_title.split(" - ", 1) + else: + title = stream_title + # set album to radio station name + album = queue.current_item.name + if media_item := queue.current_item.media_item: + if artist_str := getattr(media_item, "artist_str", None): + artist = artist_str + if _album := getattr(media_item, "album", None): + album = _album.name - async def write_eof(self) -> None: - """Write end-of-file chunk to the audo buffer.""" - if not self.running: + cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n" + cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n" + + await self.send_cli_command(cmd) + + # get image + if not queue.current_item.image: return - await self._audio_buffer.put(b"EOF") + + # the image format needs to be 500x500 jpeg for maximum compatibility with players + image_url = self.mass.metadata.get_image_url( + queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg" + ) + await self.send_cli_command(f"ARTWORK={image_url}\n") + + async def _send_progress(self, queue: PlayerQueue) -> None: + """Send progress report to player (and connected sync childs).""" + if not queue or not queue.current_item: + return + progress = int(queue.corrected_elapsed_time) + await self.send_cli_command(f"PROGRESS={progress}\n") @dataclass @@ -404,7 +470,6 @@ class AirplayProvider(PlayerProvider): cliraop_bin: str | None = None _players: dict[str, AirPlayPlayer] _discovery_running: bool = False - _stream_tasks: dict[str, asyncio.Task] _dacp_server: asyncio.Server = None _dacp_info: AsyncServiceInfo = None @@ -416,7 +481,6 @@ class AirplayProvider(PlayerProvider): async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" self._players = {} - self._stream_tasks = {} self.cliraop_bin = await self._getcliraop_binary() dacp_port = await select_free_port(39831, 49831) self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}" @@ -504,8 +568,6 @@ class AirplayProvider(PlayerProvider): - player_id: player_id of the player to handle the command. """ - if existing_stream := self._stream_tasks.get(player_id): - existing_stream.cancel() async def stop_player(airplay_player: AirPlayPlayer) -> None: if airplay_player.active_stream: @@ -560,80 +622,41 @@ class AirplayProvider(PlayerProvider): - seek_position: Optional seek to this position. - fade_in: Optionally fade in the item at playback start. """ + player = self.mass.players.get(player_id) + if player.synced_to: + # should not happen, but just in case + raise RuntimeError("Player is synced") # fix race condition where resync and play media are called at more or less the same time if self._resync_handle: self._resync_handle.cancel() self._resync_handle = None # always stop existing stream first - if existing_stream := self._stream_tasks.get(player_id): - existing_stream.cancel() for airplay_player in self._get_sync_clients(player_id): if airplay_player.active_stream and airplay_player.active_stream.running: self.mass.create_task(airplay_player.active_stream.stop(force=True)) - # start streaming the queue (pcm) audio in a background task - queue = self.mass.player_queues.get_active_queue(player_id) - self._stream_tasks[player_id] = asyncio.create_task( - self._stream_audio( - player_id, - queue=queue, - audio_iterator=self.mass.streams.get_flow_stream( - queue, - start_queue_item=queue_item, - pcm_format=AudioFormat( - content_type=ContentType.PCM_S16LE, - sample_rate=44100, - bit_depth=16, - channels=2, - ), - seek_position=seek_position, - fade_in=fade_in, - ), - ) + pcm_format = AudioFormat( + content_type=ContentType.PCM_S16LE, + sample_rate=44100, + bit_depth=16, + channels=2, ) - async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle PLAY STREAM on given player. - - This is a special feature from the Universal Group provider. - """ - # fix race condition where resync and play media are called at more or less the same time - if self._resync_handle: - self._resync_handle.cancel() - self._resync_handle = None - # always stop existing stream first - if existing_stream := self._stream_tasks.get(player_id): - existing_stream.cancel() - async with asyncio.TaskGroup() as tg: - for airplay_player in self._get_sync_clients(player_id): - if airplay_player.active_stream and airplay_player.active_stream.running: - tg.create_task(airplay_player.active_stream.stop(force=True)) - if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100: - # TODO: resample on the fly here ? - raise RuntimeError("Unsupported PCM format") - # start streaming the queue (pcm) audio in a background task - queue = self.mass.player_queues.get_active_queue(player_id) - self._stream_tasks[player_id] = asyncio.create_task( - self._stream_audio( - player_id, - queue=queue, - audio_iterator=stream_job.subscribe(player_id), + if queue_item.queue_item_id == "flow": + # handle special case for UGP multi client stream + stream_job = self.mass.streams.multi_client_jobs.get(queue_item.queue_id) + elif player.group_childs: + # create a new multi client flow stream + stream_job = await self.mass.streams.create_multi_client_stream_job( + queue_item.queue_id, + queue_item, + seek_position=seek_position, + fade_in=fade_in, + pcm_bit_depth=16, + pcm_sample_rate=44100, ) - ) - - async def _stream_audio( - self, player_id: str, queue: PlayerQueue, audio_iterator: AsyncGenerator[bytes, None] - ) -> None: - """Handle the actual streaming of audio to Airplay.""" - player = self.mass.players.get(player_id) - if player.synced_to: - # should not happen, but just in case - raise RuntimeError("Player is synced") - synced_player_ids = [x.player_id for x in self._get_sync_clients(player_id)] - self.logger.debug( - "Starting RAOP stream for Queue %s to %s", - queue.display_name, - "/".join(synced_player_ids), - ) + else: + # for a single player we just consume the flow stream directly + stream_job = None # Python is not suitable for realtime audio streaming. # So, I've decided to go the fancy route here. I've created a small binary @@ -646,60 +669,30 @@ class AirplayProvider(PlayerProvider): start_ntp = int(stdout.strip()) # setup Raop process for player and its sync childs - for airplay_player in self._get_sync_clients(player_id): - airplay_player.active_stream = AirplayStreamJob(self, airplay_player) - await airplay_player.active_stream.init_cliraop(start_ntp) - prev_metadata_checksum: str = "" - prev_progress_report: float = 0 - async for pcm_chunk in audio_iterator: - # send audio chunk to player(s) - available_clients = 0 + async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): - if ( - not airplay_player.active_stream - or not airplay_player.active_stream.running - or airplay_player.active_stream.start_ntp != start_ntp - ): - # catch when this stream is no longer active on the player - continue - available_clients += 1 - await airplay_player.active_stream.write_chunk(pcm_chunk) - if not available_clients: - # this streamjob is no longer active - return - - # send metadata to player(s) if needed - # NOTE: this must all be done in separate tasks to not disturb audio - now = time.time() - if queue and queue.current_item and queue.current_item.streamdetails: - metadata_checksum = ( - queue.current_item.streamdetails.stream_title - or queue.current_item.queue_item_id - ) - if prev_metadata_checksum != metadata_checksum: - prev_metadata_checksum = metadata_checksum - prev_progress_report = now - self.mass.create_task(self._send_metadata(player_id, queue)) - # send the progress report every 5 seconds - elif now - prev_progress_report >= 5: - prev_progress_report = now - self.mass.create_task(self._send_progress(player_id, queue)) - - # end of stream reached - write eof - self.logger.debug( - "Finished RAOP stream for Queue %s to %s", - queue.display_name, - "/".join(synced_player_ids), - ) - for airplay_player in self._get_sync_clients(player_id): - if ( - not airplay_player.active_stream - or not airplay_player.active_stream.running - or airplay_player.active_stream.start_ntp != start_ntp - ): - # this may not happen, but guard just in case - continue - await airplay_player.active_stream.write_eof() + if stream_job: + stream_job.expected_players.add(airplay_player.player_id) + audio_iterator = stream_job.subscribe( + player_id=airplay_player.player_id, + output_format=pcm_format, + ) + else: + queue = self.mass.player_queues.get_active_queue(queue_item.queue_id) + audio_iterator = get_ffmpeg_stream( + self.mass.streams.get_flow_stream( + queue, + start_queue_item=queue_item, + pcm_format=pcm_format, + seek_position=seek_position, + fade_in=fade_in, + ), + input_format=pcm_format, + output_format=pcm_format, + filter_params=get_player_filter_params(self.mass, airplay_player.player_id), + ) + airplay_player.active_stream = AirplayStreamJob(self, airplay_player) + tg.create_task(airplay_player.active_stream.start(start_ntp, audio_iterator)) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. @@ -960,7 +953,7 @@ class AirplayProvider(PlayerProvider): # device switched to another source (or is powered off) if active_stream := airplay_player.active_stream: # ignore this if we just started playing to prevent false positives - if mass_player.elapsed_time > 2 and mass_player.state == PlayerState.PLAYING: + if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING: active_stream.prevent_playback = True self.mass.create_task(self.monitor_prevent_playback(player_id)) elif "device-prevent-playback=0" in path: @@ -987,60 +980,6 @@ class AirplayProvider(PlayerProvider): finally: writer.close() - async def _send_metadata(self, player_id: str, queue: PlayerQueue) -> None: - """Send metadata to player (and connected sync childs).""" - if not queue or not queue.current_item: - return - duration = min(queue.current_item.duration or 0, 3600) - title = queue.current_item.name - artist = "" - album = "" - if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title: - # stream title from radio station - stream_title = queue.current_item.streamdetails.stream_title - if " - " in stream_title: - artist, title = stream_title.split(" - ", 1) - else: - title = stream_title - # set album to radio station name - album = queue.current_item.name - if media_item := queue.current_item.media_item: - if artist_str := getattr(media_item, "artist_str", None): - artist = artist_str - if _album := getattr(media_item, "album", None): - album = _album.name - - cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n" - cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n" - - for airplay_player in self._get_sync_clients(player_id): - if not airplay_player.active_stream or not airplay_player.active_stream.running: - continue - await airplay_player.active_stream.send_cli_command(cmd) - - # get image - if not queue.current_item.image: - return - - # the image format needs to be 500x500 jpeg for maximum compatibility with players - image_url = self.mass.metadata.get_image_url( - queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg" - ) - for airplay_player in self._get_sync_clients(player_id): - if not airplay_player.active_stream or not airplay_player.active_stream.running: - continue - await airplay_player.active_stream.send_cli_command(f"ARTWORK={image_url}\n") - - async def _send_progress(self, player_id: str, queue: PlayerQueue) -> None: - """Send progress report to player (and connected sync childs).""" - if not queue or not queue.current_item: - return - progress = int(queue.corrected_elapsed_time) - for airplay_player in self._get_sync_clients(player_id): - if not airplay_player.active_stream or not airplay_player.active_stream.running: - continue - await airplay_player.active_stream.send_cli_command(f"PROGRESS={progress}\n") - async def monitor_prevent_playback(self, player_id: str): """Monitor the prevent playback state of an airplay player.""" count = 0 @@ -1058,7 +997,7 @@ class AirplayProvider(PlayerProvider): return if not active_stream.prevent_playback: return - await asyncio.sleep(0.25) + await asyncio.sleep(0.5) airplay_player.logger.info( "Player has been in prevent playback mode for too long, powering off.", diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index 6077108a..8019a1e7 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -49,7 +49,6 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType @@ -250,6 +249,7 @@ class ChromecastProvider(PlayerProvider): player_id, CONF_FLOW_MODE ) or await self.mass.config.get_player_config_value(player_id, CONF_CROSSFADE) url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.FLAC, seek_position=seek_position, @@ -286,25 +286,11 @@ class ChromecastProvider(PlayerProvider): media_controller = castplayer.cc.media_controller await asyncio.to_thread(media_controller.send_message, queuedata, True) - async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle PLAY STREAM on given player. - - This is a special feature from the Universal Group provider. - """ - url = stream_job.resolve_stream_url(player_id, ContentType.FLAC) - castplayer = self.castplayers[player_id] - await asyncio.to_thread( - castplayer.cc.play_media, - url, - content_type="audio/flac", - title="Music Assistant", - thumb=MASS_LOGO_ONLINE, - ) - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: """Handle enqueuing of the next queue item on the player.""" castplayer = self.castplayers[player_id] url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.FLAC, ) diff --git a/music_assistant/server/providers/dlna/__init__.py b/music_assistant/server/providers/dlna/__init__.py index 6e5d0244..25606235 100644 --- a/music_assistant/server/providers/dlna/__init__.py +++ b/music_assistant/server/providers/dlna/__init__.py @@ -55,7 +55,6 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType BASE_PLAYER_FEATURES = ( @@ -357,6 +356,7 @@ class DLNAPlayerProvider(PlayerProvider): use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE) enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, seek_position=seek_position, @@ -386,40 +386,12 @@ class DLNAPlayerProvider(PlayerProvider): dlna_player.force_poll = True await self.poll_player(dlna_player.udn) - @catch_request_errors - async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle PLAY STREAM on given player. - - This is a special feature from the Universal Group provider. - """ - enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC - url = stream_job.resolve_stream_url(player_id, output_codec) - dlna_player = self.dlnaplayers[player_id] - # always clear queue (by sending stop) first - if dlna_player.device.can_stop: - await self.cmd_stop(player_id) - didl_metadata = create_didl_metadata(self.mass, url, None) - await dlna_player.device.async_set_transport_uri(url, "Music Assistant", didl_metadata) - # Play it - await dlna_player.device.async_wait_for_can_play(10) - # optimistically set this timestamp to help in case of a player - # that does not report the progress - now = time.time() - dlna_player.player.elapsed_time = 0 - dlna_player.player.elapsed_time_last_updated = now - await dlna_player.device.async_play() - # force poll the device - for sleep in (1, 2): - await asyncio.sleep(sleep) - dlna_player.force_poll = True - await self.poll_player(dlna_player.udn) - @catch_request_errors async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: """Handle enqueuing of the next queue item on the player.""" dlna_player = self.dlnaplayers[player_id] url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.FLAC, ) diff --git a/music_assistant/server/providers/fully_kiosk/__init__.py b/music_assistant/server/providers/fully_kiosk/__init__.py index c505618c..a0887c33 100644 --- a/music_assistant/server/providers/fully_kiosk/__init__.py +++ b/music_assistant/server/providers/fully_kiosk/__init__.py @@ -31,7 +31,6 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType AUDIOMANAGER_STREAM_MUSIC = 3 @@ -199,6 +198,7 @@ class FullyKioskProvider(PlayerProvider): player = self.mass.players.get(player_id) enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, seek_position=seek_position, @@ -212,22 +212,6 @@ class FullyKioskProvider(PlayerProvider): player.state = PlayerState.PLAYING self.mass.players.update(player_id) - async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle PLAY STREAM on given player. - - This is a special feature from the Universal Group provider. - """ - player = self.mass.players.get(player_id) - enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC - url = stream_job.resolve_stream_url(player_id, output_codec) - await self._fully.playSound(url, AUDIOMANAGER_STREAM_MUSIC) - player.current_item_id = player_id - player.elapsed_time = 0 - player.elapsed_time_last_updated = time.time() - player.state = PlayerState.PLAYING - self.mass.players.update(player_id) - async def poll_player(self, player_id: str) -> None: """Poll player for state updates. diff --git a/music_assistant/server/providers/hass_players/__init__.py b/music_assistant/server/providers/hass_players/__init__.py index 9d611a8a..7323b6ce 100644 --- a/music_assistant/server/providers/hass_players/__init__.py +++ b/music_assistant/server/providers/hass_players/__init__.py @@ -44,7 +44,6 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType from music_assistant.server.providers.hass import HomeAssistant as HomeAssistantProvider @@ -266,6 +265,7 @@ class HomeAssistantPlayers(PlayerProvider): use_flow_mode = await self.mass.config.get_player_config_value(player_id, CONF_FLOW_MODE) enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, seek_position=seek_position, @@ -287,25 +287,6 @@ class HomeAssistantPlayers(PlayerProvider): player.elapsed_time = 0 player.elapsed_time_last_updated = time.time() - async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle PLAY STREAM on given player. - - This is a special feature from the Universal Group provider. - """ - enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC - url = stream_job.resolve_stream_url(player_id, output_codec) - await self.hass_prov.hass.call_service( - domain="media_player", - service="play_media", - service_data={ - "media_content_id": url, - "media_content_type": "music", - "enqueue": "replace", - }, - target={"entity_id": player_id}, - ) - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: """ Handle enqueuing of the next queue item on the player. @@ -321,6 +302,7 @@ class HomeAssistantPlayers(PlayerProvider): This will NOT be called if the player is using flow mode to playback the queue. """ url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.FLAC, ) diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index 27a7c347..93ae197c 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -62,7 +62,6 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType @@ -79,9 +78,10 @@ STATE_MAP = { REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2} # sync constants -MIN_DEVIATION_ADJUST = 6 # 6 milliseconds -MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements -MAX_SKIP_AHEAD_MS = 1500 # 1.5 seconds +MIN_DEVIATION_ADJUST = 8 # 8 milliseconds +MIN_REQ_PLAYPOINTS = 3 # we need at least 3 measurements +DEVIATION_JUMP_IGNORE = 2000 # ignore a sudden unrealistic jump +MAX_SKIP_AHEAD_MS = 500 # 0.5 seconds @dataclass @@ -215,7 +215,6 @@ class SlimprotoProvider(PlayerProvider): slimproto: SlimServer _sync_playpoints: dict[str, deque[SyncPlayPoint]] - _do_not_resync_before: dict[str, float] @property def supported_features(self) -> tuple[ProviderFeature, ...]: @@ -225,7 +224,6 @@ class SlimprotoProvider(PlayerProvider): async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" self._sync_playpoints = {} - self._do_not_resync_before = {} self._resync_handle: asyncio.TimerHandle | None = None control_port = self.config.get_value(CONF_PORT) telnet_port = self.config.get_value(CONF_CLI_TELNET_PORT) @@ -360,6 +358,7 @@ class SlimprotoProvider(PlayerProvider): self._handle_play_url( slimplayer, url=stream_job.resolve_stream_url( + player_id, slimplayer.player_id, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, ), @@ -374,6 +373,7 @@ class SlimprotoProvider(PlayerProvider): if not slimplayer: return url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, # for now just hardcode flac as we assume that every (modern) # slimproto based player can handle that just fine @@ -390,39 +390,13 @@ class SlimprotoProvider(PlayerProvider): auto_play=True, ) - async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle PLAY STREAM on given player. - - This is a special feature from the Universal Group provider. - """ - enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) - # fix race condition where resync and play media are called at more or less the same time - if self._resync_handle: - self._resync_handle.cancel() - self._resync_handle = None - # forward command to player and any connected sync members - sync_clients = self._get_sync_clients(player_id) - async with asyncio.TaskGroup() as tg: - for slimplayer in sync_clients: - tg.create_task( - self._handle_play_url( - slimplayer, - url=stream_job.resolve_stream_url( - slimplayer.player_id, - output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, - ), - queue_item=None, - send_flush=True, - auto_play=False, - ) - ) - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: """Handle enqueuing of the next queue item on the player.""" if not (slimplayer := self.slimproto.get_player(player_id)): return enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC, flow_mode=False, @@ -654,6 +628,11 @@ class SlimprotoProvider(PlayerProvider): x.player_id for x in self.slimproto.players if x.player_id != player_id ), ) + if slimplayer.device_type == "squeezeesp32": + # squeezeesp32 with default settings - override with sane defaults + if slimplayer.max_sample_rate == 192000: + player.max_sample_rate = 44100 + player.supports_24bit = False self.mass.players.register_or_update(player) # update player state on player events @@ -737,17 +716,12 @@ class SlimprotoProvider(PlayerProvider): return if slimplayer.state != SlimPlayerState.PLAYING: return - - if backoff_time := self._do_not_resync_before.get(slimplayer.player_id): - # player has set a timestamp we should backoff from syncing it - if time.time() < backoff_time: - return + if slimplayer.player_id not in self._sync_playpoints: + return # we collect a few playpoints of the player to determine # average lag/drift so we can adjust accordingly - sync_playpoints = self._sync_playpoints.setdefault( - slimplayer.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS) - ) + sync_playpoints = self._sync_playpoints[slimplayer.player_id] active_queue = self.mass.player_queues.get_active_queue(slimplayer.player_id) stream_job = self.mass.streams.multi_client_jobs.get(active_queue.queue_id) @@ -755,8 +729,9 @@ class SlimprotoProvider(PlayerProvider): # should not happen, but just in case return + now = time.time() last_playpoint = sync_playpoints[-1] if sync_playpoints else None - if last_playpoint and (time.time() - last_playpoint.timestamp) > 10: + if last_playpoint and (now - last_playpoint.timestamp) > 10: # last playpoint is too old, invalidate sync_playpoints.clear() if last_playpoint and last_playpoint.sync_job_id != stream_job.job_id: @@ -768,8 +743,12 @@ class SlimprotoProvider(PlayerProvider): - self._get_corrected_elapsed_milliseconds(slimplayer) ) + if last_playpoint and abs(last_playpoint.diff - diff) > DEVIATION_JUMP_IGNORE: + # ignore unexpected spikes + return + # we can now append the current playpoint to our list - sync_playpoints.append(SyncPlayPoint(time.time(), stream_job.job_id, diff)) + sync_playpoints.append(SyncPlayPoint(now, stream_job.job_id, diff)) if len(sync_playpoints) < MIN_REQ_PLAYPOINTS: return @@ -783,24 +762,22 @@ class SlimprotoProvider(PlayerProvider): # resync the player by skipping ahead or pause for x amount of (milli)seconds sync_playpoints.clear() - self._do_not_resync_before[slimplayer.player_id] = time.time() + (delta / 1000) + 2 if avg_diff > MAX_SKIP_AHEAD_MS: # player lagging behind more than MAX_SKIP_AHEAD_MS, # we need to correct the sync_master - self.logger.warning( - "%s is lagging behind more than %s milliseconds!", - player.display_name, - MAX_SKIP_AHEAD_MS, - ) + self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta) self.mass.create_task(sync_master.pause_for(delta)) + sync_master._elapsed_milliseconds -= delta elif avg_diff > 0: # handle player lagging behind, fix with skip_ahead self.logger.debug("%s resync: skipAhead %sms", player.display_name, delta) self.mass.create_task(slimplayer.skip_over(delta)) + sync_master._elapsed_milliseconds += delta else: # handle player is drifting too far ahead, use pause_for to adjust self.logger.debug("%s resync: pauseFor %sms", player.display_name, delta) self.mass.create_task(slimplayer.pause_for(delta)) + sync_master._elapsed_milliseconds -= delta async def _handle_buffer_ready(self, slimplayer: SlimClient) -> None: """Handle buffer ready event, player has buffered a (new) track. @@ -813,29 +790,31 @@ class SlimprotoProvider(PlayerProvider): return if not player.group_childs: # not a sync group, continue - await slimplayer.play() + await slimplayer.unpause_at(slimplayer.jiffies) return count = 0 while count < 40: childs_total = 0 childs_ready = 0 + await asyncio.sleep(0.2) for sync_child in self._get_sync_clients(player.player_id): childs_total += 1 if sync_child.state == SlimPlayerState.BUFFER_READY: childs_ready += 1 if childs_total == childs_ready: break - await asyncio.sleep(0.1) + # all child's ready (or timeout) - start play async with asyncio.TaskGroup() as tg: for _client in self._get_sync_clients(player.player_id): - timestamp = _client.jiffies + 500 - sync_delay = self.mass.config.get_raw_player_config_value( - _client.player_id, CONF_SYNC_ADJUST, 0 - ) - timestamp -= sync_delay - self._do_not_resync_before[_client.player_id] = time.time() + 1 - tg.create_task(_client.unpause_at(int(timestamp))) + self._sync_playpoints.setdefault( + _client.player_id, deque(maxlen=MIN_REQ_PLAYPOINTS * 2) + ).clear() + # NOTE: Officially you should do an unpause_at based on the player timestamp + # but I did not have any good results with that. + # Instead just start playback on all players and let the sync logic work out + # the delays etc. + tg.create_task(_client.unpause_at(0)) async def _handle_connected(self, slimplayer: SlimClient) -> None: """Handle a slimplayer connected event.""" diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 3e81f481..8cf61b0d 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -39,7 +39,6 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType CONF_SNAPCAST_SERVER_HOST = "snapcast_server_host" @@ -268,81 +267,41 @@ class SnapCastProvider(PlayerProvider): snap_group = self._get_snapgroup(player_id) await snap_group.set_stream(stream.identifier) - async def _streamer() -> None: - host = self.snapcast_server_host - _, writer = await asyncio.open_connection(host, port) - self.logger.debug("Opened connection to %s:%s", host, port) - player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}" - player.elapsed_time = 0 - player.elapsed_time_last_updated = time.time() - player.state = PlayerState.PLAYING - self._set_childs_state(player_id, PlayerState.PLAYING) - self.mass.players.register_or_update(player) - # TODO: can we handle 24 bits bit depth ? - pcm_format = AudioFormat( - content_type=ContentType.PCM_S16LE, - sample_rate=48000, - bit_depth=16, - channels=2, + # TODO: can we handle 24 bits bit depth ? + pcm_format = AudioFormat( + content_type=ContentType.PCM_S16LE, + sample_rate=48000, + bit_depth=16, + channels=2, + ) + # handle special case for UGP multi client stream + if stream_job := self.mass.streams.multi_client_jobs.get(queue_item.queue_id): + stream_job.expected_players.add(player_id) + audio_iterator = stream_job.subscribe( + player_id=player_id, + output_format=pcm_format, + ) + else: + audio_iterator = self.mass.streams.get_flow_stream( + queue, + start_queue_item=queue_item, + pcm_format=pcm_format, + seek_position=seek_position, + fade_in=fade_in, ) - try: - async for pcm_chunk in self.mass.streams.get_flow_stream( - queue, - start_queue_item=queue_item, - pcm_format=pcm_format, - seek_position=seek_position, - fade_in=fade_in, - ): - writer.write(pcm_chunk) - await writer.drain() - # end of the stream reached - if writer.can_write_eof(): - writer.write_eof() - await writer.drain() - # we need to wait a bit before removing the stream to ensure - # that all snapclients have consumed the audio - # https://github.com/music-assistant/hass-music-assistant/issues/1962 - await asyncio.sleep(30) - finally: - if not writer.is_closing(): - writer.close() - await self._snapserver.stream_remove_stream(stream.identifier) - self.logger.debug("Closed connection to %s:%s", host, port) - - # start streaming the queue (pcm) audio in a background task - self._stream_tasks[player_id] = asyncio.create_task(_streamer()) - - async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle PLAY STREAM on given player. - - This is a special feature from the Universal Group provider. - """ - player = self.mass.players.get(player_id) - if player.synced_to: - msg = "A synced player cannot receive play commands directly" - raise RuntimeError(msg) - # stop any existing streams first - await self.cmd_stop(player_id) - if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 48000: - # TODO: resample on the fly here ? - raise RuntimeError("Unsupported PCM format") - stream, port = await self._create_stream() - stream_job.expected_players.add(player_id) - snap_group = self._get_snapgroup(player_id) - await snap_group.set_stream(stream.identifier) async def _streamer() -> None: host = self.snapcast_server_host _, writer = await asyncio.open_connection(host, port) self.logger.debug("Opened connection to %s:%s", host, port) - player.current_item_id = f"flow/{stream_job.queue_id}" + player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}" player.elapsed_time = 0 player.elapsed_time_last_updated = time.time() player.state = PlayerState.PLAYING self._set_childs_state(player_id, PlayerState.PLAYING) self.mass.players.register_or_update(player) try: - async for pcm_chunk in stream_job.subscribe(player_id): + async for pcm_chunk in audio_iterator: writer.write(pcm_chunk) await writer.drain() # end of the stream reached diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index 01a677b0..af0c4135 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -9,7 +9,6 @@ from __future__ import annotations import asyncio import logging -import time from collections import OrderedDict from dataclasses import dataclass, field from typing import TYPE_CHECKING @@ -46,7 +45,6 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant - from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType @@ -353,6 +351,7 @@ class SonosPlayerProvider(PlayerProvider): - fade_in: Optionally fade in the item at playback start. """ url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.FLAC, seek_position=seek_position, @@ -367,32 +366,11 @@ class SonosPlayerProvider(PlayerProvider): "accept play_media command, it is synced to another player." ) raise PlayerCommandFailed(msg) - metadata = create_didl_metadata(self.mass, url, queue_item) - await self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata) - - async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: - """Handle PLAY STREAM on given player. - - This is a special feature from the Universal Group provider. - """ - url = stream_job.resolve_stream_url(player_id, ContentType.FLAC) - sonos_player = self.sonosplayers[player_id] - mass_player = self.mass.players.get(player_id) - if sonos_player.sync_coordinator: - # this should be already handled by the player manager, but just in case... - msg = ( - f"Player {mass_player.display_name} can not " - "accept play_stream command, it is synced to another player." - ) - raise PlayerCommandFailed(msg) - metadata = create_didl_metadata(self.mass, url, None) - # sonos players do not like our multi client stream - # add to the workaround players list - self.mass.streams.workaround_players.add(player_id) - await self.mass.create_task(sonos_player.soco.play_uri, url, meta=metadata) - # optimistically set this timestamp to help figure out elapsed time later - mass_player.elapsed_time = 0 - mass_player.elapsed_time_last_updated = time.time() + await self.mass.create_task( + sonos_player.soco.play_uri, + url, + meta=create_didl_metadata(self.mass, url, queue_item), + ) async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: """ @@ -411,6 +389,7 @@ class SonosPlayerProvider(PlayerProvider): """ sonos_player = self.sonosplayers[player_id] url = await self.mass.streams.resolve_stream_url( + player_id, queue_item=queue_item, output_codec=ContentType.FLAC, ) diff --git a/music_assistant/server/providers/spotify/__init__.py b/music_assistant/server/providers/spotify/__init__.py index c5c2dfb8..7733d216 100644 --- a/music_assistant/server/providers/spotify/__init__.py +++ b/music_assistant/server/providers/spotify/__init__.py @@ -612,7 +612,7 @@ class SpotifyProvider(MusicProvider): if retries > 2: # switch to ap workaround after 2 retries self._ap_workaround = True - except asyncio.exceptions.TimeoutError: + except TimeoutError: await asyncio.sleep(2) if tokeninfo and userinfo: self._auth_token = tokeninfo diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py index 51ee24d7..6aa5b164 100644 --- a/music_assistant/server/providers/ugp/__init__.py +++ b/music_assistant/server/providers/ugp/__init__.py @@ -26,6 +26,7 @@ from music_assistant.common.models.enums import ( ProviderFeature, ) from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.queue_item import QueueItem from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX from music_assistant.server.models.player_provider import PlayerProvider @@ -34,7 +35,6 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest - from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType @@ -177,13 +177,18 @@ class UniversalGroupProvider(PlayerProvider): await self.cmd_power(player_id, True) group_player = self.mass.players.get(player_id) - # create multi-client stream job - stream_job = await self.mass.streams.create_multi_client_stream_job( + # create a multi-client stream job - all (direct) child's of this UGP group + # will subscribe to this multi client queue stream + await self.mass.streams.create_multi_client_stream_job( player_id, start_queue_item=queue_item, seek_position=seek_position, fade_in=fade_in, ) + # create a fake queue item to forward to downstream play_media commands + ugp_queue_item = QueueItem( + player_id, queue_item_id="flow", name=group_player.display_name, duration=None + ) # forward the stream job to all group members async with asyncio.TaskGroup() as tg: @@ -193,7 +198,7 @@ class UniversalGroupProvider(PlayerProvider): member = self.mass.players.get_sync_leader(member) # noqa: PLW2901 if member is None: continue - tg.create_task(player_prov.play_stream(member.player_id, stream_job)) + tg.create_task(player_prov.play_media(member.player_id, ugp_queue_item, 0, False)) async def poll_player(self, player_id: str) -> None: """Poll player for state updates.""" -- 2.34.1