From 188a89c63981de13f666246fcdb85b60c4b2e63b Mon Sep 17 00:00:00 2001 From: Kostas Chatzikokolakis Date: Fri, 25 Apr 2025 20:02:38 +0300 Subject: [PATCH] Several improvements to announcements (#2145) * Snapcast: remove obsolete cancel_timer * Snapcast: remove player update in cmd_volume_set No changes are made in the player here, and the update is called with snap_client_id, not player_id. * Snapcast: implement native announcements * Faster TTS pre-announce, without waiting for TTS data * Snapcast: use -probesize for faster announcements --- music_assistant/controllers/streams.py | 36 +++- .../providers/snapcast/__init__.py | 203 +++++++++++++----- 2 files changed, 177 insertions(+), 62 deletions(-) diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index a439f9b1..2dde6386 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -919,25 +919,39 @@ class StreamsController(CoreController): use_pre_announce: bool = False, ) -> AsyncGenerator[bytes, None]: """Get the special announcement stream.""" + filter_params = ["loudnorm=I=-10:LRA=11:TP=-2"] + + if use_pre_announce: + # Note: TTS URLs might take a while to load cause the actual data are often generated + # asynchronously by the TTS provider. If we ask ffmpeg to mix the pre-announce, it will + # wait until it reads the TTS data, so the whole stream will be delayed. It is much + # faster to first play the pre-announce using a separate ffmpeg stream, and only + # afterwards play the TTS itself. + # + # For this to be effective the player itself needs to be able to start playback fast. + # If the returned stream is used as input to ffmpeg we should pass -probesize 8096. + # + # Finally we also need to make sure we don't make other blocking requests to the TTS + # data, eg to get the duration (async_parse_tags). 
+ # + async for chunk in get_ffmpeg_stream( + audio_input=ANNOUNCE_ALERT_FILE, + input_format=AudioFormat(content_type=ContentType.try_parse(ANNOUNCE_ALERT_FILE)), + output_format=output_format, + filter_params=filter_params, + ): + yield chunk + # work out output format/details fmt = announcement_url.rsplit(".")[-1] audio_format = AudioFormat(content_type=ContentType.try_parse(fmt)) - extra_args = [] - filter_params = ["loudnorm=I=-10:LRA=11:TP=-2"] - if use_pre_announce: - extra_args += [ - "-i", - ANNOUNCE_ALERT_FILE, - "-filter_complex", - "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=11:TP=-1.5", - ] - filter_params = [] + extra_input_args = ["-probesize", "8096"] # start the stream before reading all TTS input async for chunk in get_ffmpeg_stream( audio_input=announcement_url, input_format=audio_format, output_format=output_format, - extra_args=extra_args, filter_params=filter_params, + extra_input_args=extra_input_args, ): yield chunk diff --git a/music_assistant/providers/snapcast/__init__.py b/music_assistant/providers/snapcast/__init__.py index 6c57939d..015b9a7f 100644 --- a/music_assistant/providers/snapcast/__init__.py +++ b/music_assistant/providers/snapcast/__init__.py @@ -11,6 +11,7 @@ import socket import time import urllib.parse from contextlib import suppress +from enum import StrEnum from typing import TYPE_CHECKING, cast from bidict import bidict @@ -83,7 +84,8 @@ DEFAULT_SNAPSERVER_IP = "127.0.0.1" DEFAULT_SNAPSERVER_PORT = 1705 DEFAULT_SNAPSTREAM_IDLE_THRESHOLD = 60000 -MASS_STREAM_POSTFIX = "Music Assistant" +MASS_STREAM_PREFIX = "Music Assistant - " +MASS_ANNOUNCEMENT_POSTFIX = " (announcement)" SNAPWEB_DIR = pathlib.Path(__file__).parent.resolve().joinpath("snapweb") CONTROL_SCRIPT = pathlib.Path(__file__).parent.resolve().joinpath("control.py") @@ -105,6 +107,13 @@ DEFAULT_SNAPCAST_PCM_FORMAT = AudioFormat( ) +class SnapCastStreamType(StrEnum): + """Enum for Snapcast Stream Type.""" + + MUSIC = "MUSIC" + ANNOUNCEMENT = "ANNOUNCEMENT" + 
+ async def setup( mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig ) -> ProviderInstanceType: @@ -404,6 +413,7 @@ class SnapCastProvider(PlayerProvider): PlayerFeature.SET_MEMBERS, PlayerFeature.VOLUME_SET, PlayerFeature.VOLUME_MUTE, + PlayerFeature.PLAY_ANNOUNCEMENT, }, synced_to=self._synced_to(player_id), can_group_with={self.instance_id}, @@ -423,14 +433,18 @@ class SnapCastProvider(PlayerProvider): player.volume_muted = snap_client.muted player.available = snap_client.connected player.synced_to = self._synced_to(player_id) - # if player.active_group is None: - if stream := self._get_snapstream(player_id): + + # Note: when the active stream is a MASS stream the active_source is __not__ updated at all. + # So it doesn't matter whether a MASS stream is for music or announcements. + if stream := self._get_active_snapstream(player_id): if stream.identifier == "default": player.active_source = None - elif not stream.identifier.startswith(MASS_STREAM_POSTFIX): + elif not stream.identifier.startswith(MASS_STREAM_PREFIX): + # unknown source player.active_source = stream.identifier else: player.active_source = None + self._group_childs(player_id) self.mass.players.update(player_id) @@ -455,25 +469,30 @@ class SnapCastProvider(PlayerProvider): """Send VOLUME_SET command to given player.""" snap_client_id = self._get_snapclient_id(player_id) await self._snapserver.client(snap_client_id).set_volume(volume_level) - self.mass.players.update(snap_client_id) async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player.""" + # update the state first to avoid race conditions, if an active play_announcement + # finishes the player.state should be IDLE. 
player = self.mass.players.get(player_id, raise_unavailable=False) + player.state = PlayerState.IDLE + player.current_media = None + player.active_source = None + self._set_childs_state(player_id) + self.mass.players.update(player_id) - await self._get_snapgroup(player_id).set_stream("default") - await self._delete_current_snapstream(self._get_snapstream(player_id)) + # we change the active stream only if music was playing + if not player.announcement_in_progress: + await self._get_snapgroup(player_id).set_stream("default") + + # but we always delete the music stream (whether it was active or not) + await self._delete_stream(self._get_stream_name(player_id, SnapCastStreamType.MUSIC)) if stream_task := self._stream_tasks.pop(player_id, None): if not stream_task.done(): stream_task.cancel() with suppress(asyncio.CancelledError): await stream_task - player.state = PlayerState.IDLE - player.current_media = None - player.active_source = None - self._set_childs_state(player_id) - self.mass.players.update(player_id) async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: """Send MUTE command to given player.""" @@ -518,7 +537,7 @@ class SnapCastProvider(PlayerProvider): self.mass.players.update(player_id, skip_forward=True) self.mass.players.update(mass_player.synced_to, skip_forward=True) - async def play_media(self, player_id: str, media: PlayerMedia) -> None: # noqa: PLR0915 + async def play_media(self, player_id: str, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player.""" player = self.mass.players.get(player_id) if player.synced_to: @@ -533,23 +552,20 @@ class SnapCastProvider(PlayerProvider): await stream_task # get stream or create new one - stream = await self._get_or_create_stream(player_id, media.queue_id or player_id) - snap_group = self._get_snapgroup(player_id) - await snap_group.set_stream(stream.identifier) + stream_name = self._get_stream_name(player_id, SnapCastStreamType.MUSIC) + stream = await 
self._get_or_create_stream(stream_name, media.queue_id or player_id) + + # if no announcement is playing we activate the stream now, otherwise it + # will be activated by play_announcement when the announcement is over. + if not player.announcement_in_progress: + snap_group = self._get_snapgroup(player_id) + await snap_group.set_stream(stream.identifier) player.current_media = media player.active_source = media.queue_id # select audio source - if media.media_type == MediaType.ANNOUNCEMENT: - # special case: stream announcement - input_format = DEFAULT_SNAPCAST_FORMAT - audio_source = self.mass.streams.get_announcement_stream( - media.custom_data["url"], - output_format=DEFAULT_SNAPCAST_FORMAT, - use_pre_announce=media.custom_data["use_pre_announce"], - ) - elif media.media_type == MediaType.PLUGIN_SOURCE: + if media.media_type == MediaType.PLUGIN_SOURCE: # special case: plugin source stream input_format = DEFAULT_SNAPCAST_FORMAT audio_source = self.mass.streams.get_plugin_source_stream( @@ -584,12 +600,7 @@ class SnapCastProvider(PlayerProvider): ) async def _streamer() -> None: - if stream.path: - stream_path = stream.path - if not stream.path: - stream_path = "tcp://" + stream._stream["uri"]["host"] - stream_path = stream_path.replace("0.0.0.0", self._snapcast_server_host) - + stream_path = self._get_stream_path(stream) self.logger.debug("Start streaming to %s", stream_path) async with FFMpeg( audio_input=audio_source, @@ -622,11 +633,101 @@ class SnapCastProvider(PlayerProvider): # start streaming the queue (pcm) audio in a background task self._stream_tasks[player_id] = self.mass.create_task(_streamer()) - async def _delete_current_snapstream(self, stream: Snapstream) -> None: - if not stream.identifier.startswith(MASS_STREAM_POSTFIX): - return - with suppress(TypeError, KeyError, AttributeError): - await self._snapserver.stream_remove_stream(stream.identifier) + async def play_announcement( + self, player_id: str, announcement: PlayerMedia, volume_level: int | 
None = None + ) -> None: + """Handle (provider native) playback of an announcement on given player.""" + # get stream or create new one + stream_name = self._get_stream_name(player_id, SnapCastStreamType.ANNOUNCEMENT) + stream = await self._get_or_create_stream(stream_name, None) + + # always activate the stream (announcements have priority over music) + snap_group = self._get_snapgroup(player_id) + await snap_group.set_stream(stream.identifier) + + # Unfortunately snapcast sets a volume per client (not per stream), so we need a way to + # set the announcement volume without affecting the music volume. + # We go for the simplest solution: save the previous volume, change it, restore later + # (with the downside that the change will be visible in the UI) + player = self.mass.players.get(player_id) + orig_volume_level = player.volume_level + volume_level = self.mass.players.get_announcement_volume(player_id, volume_level) + await self.cmd_volume_set(player_id, volume_level) + + input_format = DEFAULT_SNAPCAST_FORMAT + audio_source = self.mass.streams.get_announcement_stream( + announcement.custom_data["url"], + output_format=DEFAULT_SNAPCAST_FORMAT, + use_pre_announce=announcement.custom_data["use_pre_announce"], + ) + + # stream the audio, wait for it to finish (play_announcement should return after the + # announcement is over to avoid simultaneous announcements). + # + # Note: -probesize 8096 is needed to start playing the pre-announce before the TTS + # data arrive (they arrive late, see get_announcement_stream). 
+ # + stream_path = self._get_stream_path(stream) + self.logger.debug("Start announcement streaming to %s", stream_path) + async with FFMpeg( + audio_input=audio_source, + input_format=input_format, + output_format=DEFAULT_SNAPCAST_FORMAT, + filter_params=get_player_filter_params( + self.mass, player_id, input_format, DEFAULT_SNAPCAST_FORMAT + ), + audio_output=stream_path, + extra_input_args=["-y", "-re", "-probesize", "8096"], + ) as ffmpeg_proc: + await ffmpeg_proc.wait() + + self.logger.debug("Finished announcement streaming to %s", stream_path) + # we need to wait a bit for the stream status to become idle + # to ensure that all snapclients have consumed the audio + while stream.status != "idle": + await asyncio.sleep(0.25) + + # delete the announcement stream + await self._delete_stream(stream_name) + + # restore volume, if it is still the same we set above + # (the user did not change it while the announcement was playing) + if player.volume_level == volume_level and orig_volume_level is not None: + await self.cmd_volume_set(player_id, orig_volume_level) + + # and restore the group to either the default or the music stream + if player.state == PlayerState.IDLE: + new_stream_name = "default" + else: + new_stream_name = self._get_stream_name(player_id, SnapCastStreamType.MUSIC) + await self._get_snapgroup(player_id).set_stream(new_stream_name) + + def _get_stream_name(self, player_id: str, stream_type: SnapCastStreamType) -> str: + """Return the name of the stream for the given player. + + Each player can have up to two concurrent streams, for music and announcements. + + The stream name depends only on player_id (not queue_id) for two reasons: + 1. Avoid issues when the same queue_id is simultaneously used by two players + (eg in universal groups). + 2. Easily identify which stream belongs to which player, for instance to be able to + delete a music stream even when it is not active due to an announcement. 
+ """ + player = self.mass.players.get(player_id) + safe_name = create_safe_string(player.display_name, replace_space=True) + stream_name = f"{MASS_STREAM_PREFIX}{safe_name}" + if stream_type == SnapCastStreamType.ANNOUNCEMENT: + stream_name += MASS_ANNOUNCEMENT_POSTFIX + return stream_name + + def _get_stream_path(self, stream: Snapstream) -> str: + stream_path = stream.path or f"tcp://{stream._stream['uri']['host']}" + return stream_path.replace("0.0.0.0", self._snapcast_server_host) + + async def _delete_stream(self, stream_name: str) -> None: + if stream := self._get_snapstream(stream_name): + with suppress(TypeError, KeyError, AttributeError): + await self._snapserver.stream_remove_stream(stream.identifier) def _get_snapgroup(self, player_id: str) -> Snapgroup: """Get snapcast group for given player_id.""" @@ -634,11 +735,16 @@ class SnapCastProvider(PlayerProvider): client: Snapclient = self._snapserver.client(snap_client_id) return client.group - def _get_snapstream(self, player_id: str) -> Snapstream | None: - """Get snapcast stream for given player_id.""" + def _get_snapstream(self, stream_name: str) -> Snapstream | None: + """Get a stream by name.""" + with suppress(KeyError): + return self._snapserver.stream(stream_name) + return None + + def _get_active_snapstream(self, player_id: str) -> Snapstream | None: + """Get active stream for given player_id.""" if group := self._get_snapgroup(player_id): - with suppress(KeyError): - return self._snapserver.stream(group.stream) + return self._get_snapstream(group.stream) return None def _synced_to(self, player_id: str) -> str | None: @@ -665,20 +771,15 @@ class SnapCastProvider(PlayerProvider): and self._snapserver.client(snap_client_id).connected } - async def _get_or_create_stream(self, player_id: str, queue_id: str) -> Snapstream: + async def _get_or_create_stream(self, stream_name: str, queue_id: str | None) -> Snapstream: """Create new stream on snapcast server (or return existing one).""" - mass_queue = 
self.mass.player_queues.get(queue_id) - safe_name = create_safe_string(mass_queue.display_name, replace_space=True) - stream_name = f"{MASS_STREAM_POSTFIX} - {safe_name}" - # cancel any existing clear stream task - self.mass.cancel_timer(f"snapcast_clear_stream_{player_id}") - # prefer to reuse existing stream if possible - for stream in self._snapserver.streams: - if stream.identifier == stream_name: - return stream + if stream := self._get_snapstream(stream_name): + return stream - if self._use_builtin_server: + # The control script is used only for music streams in the builtin server + # (queue_id is None only for announcement streams). + if self._use_builtin_server and queue_id: extra_args = ( f"&controlscript={urllib.parse.quote_plus(str(CONTROL_SCRIPT))}" f"&controlscriptparams=--queueid={urllib.parse.quote_plus(queue_id)}%20" -- 2.34.1