Several improvements to announcements (#2145)
authorKostas Chatzikokolakis <kostas@chatzi.org>
Fri, 25 Apr 2025 17:02:38 +0000 (20:02 +0300)
committerGitHub <noreply@github.com>
Fri, 25 Apr 2025 17:02:38 +0000 (19:02 +0200)
* Snapcast: remove obsolete cancel_timer

* Snapcast: remove player update in cmd_volume_set

No changes are made in the player here, and the update is called with
snap_client_id, not player_id.

* Snapcast: implement native announcements

* Faster TTS pre-announce, without waiting for TTS data

* Snapcast: use -probesize for faster announcements

music_assistant/controllers/streams.py
music_assistant/providers/snapcast/__init__.py

index a439f9b18c84a3d12e834e283d2286ad72c25536..2dde638615ad8119c799b4aaea94d8d35fdcb1a9 100644 (file)
@@ -919,25 +919,39 @@ class StreamsController(CoreController):
         use_pre_announce: bool = False,
     ) -> AsyncGenerator[bytes, None]:
         """Get the special announcement stream."""
+        filter_params = ["loudnorm=I=-10:LRA=11:TP=-2"]
+
+        if use_pre_announce:
+            # Note: TTS URLs might take a while to load cause the actual data are often generated
+            # asynchronously by the TTS provider. If we ask ffmpeg to mix the pre-announce, it will
+            # wait until it reads the TTS data, so the whole stream will be delayed. It is much
+            # faster to first play the pre-announce using a separate ffmpeg stream, and only
+            # afterwards play the TTS itself.
+            #
+            # For this to be effective the player itself needs to be able to start playback fast.
+            # If the returned stream is used as input to ffmpeg we should pass -probesize 8096.
+            #
+            # Finally we also need to make sure we don't make other blocking requests to the TTS
+            # data, eg to get the duration (async_parse_tags).
+            #
+            async for chunk in get_ffmpeg_stream(
+                audio_input=ANNOUNCE_ALERT_FILE,
+                input_format=AudioFormat(content_type=ContentType.try_parse(ANNOUNCE_ALERT_FILE)),
+                output_format=output_format,
+                filter_params=filter_params,
+            ):
+                yield chunk
+
         # work out output format/details
         fmt = announcement_url.rsplit(".")[-1]
         audio_format = AudioFormat(content_type=ContentType.try_parse(fmt))
-        extra_args = []
-        filter_params = ["loudnorm=I=-10:LRA=11:TP=-2"]
-        if use_pre_announce:
-            extra_args += [
-                "-i",
-                ANNOUNCE_ALERT_FILE,
-                "-filter_complex",
-                "[1:a][0:a]concat=n=2:v=0:a=1,loudnorm=I=-10:LRA=11:TP=-1.5",
-            ]
-            filter_params = []
+        extra_input_args = ["-probesize", "8096"]  # start the stream before reading all TTS input
         async for chunk in get_ffmpeg_stream(
             audio_input=announcement_url,
             input_format=audio_format,
             output_format=output_format,
-            extra_args=extra_args,
             filter_params=filter_params,
+            extra_input_args=extra_input_args,
         ):
             yield chunk
 
index 6c57939d9e0ce23fc226e4af1315f177bf31e6a0..015b9a7f9659fb3332d0e29240b3dae5365a4cbb 100644 (file)
@@ -11,6 +11,7 @@ import socket
 import time
 import urllib.parse
 from contextlib import suppress
+from enum import StrEnum
 from typing import TYPE_CHECKING, cast
 
 from bidict import bidict
@@ -83,7 +84,8 @@ DEFAULT_SNAPSERVER_IP = "127.0.0.1"
 DEFAULT_SNAPSERVER_PORT = 1705
 DEFAULT_SNAPSTREAM_IDLE_THRESHOLD = 60000
 
-MASS_STREAM_POSTFIX = "Music Assistant"
+MASS_STREAM_PREFIX = "Music Assistant - "
+MASS_ANNOUNCEMENT_POSTFIX = " (announcement)"
 SNAPWEB_DIR = pathlib.Path(__file__).parent.resolve().joinpath("snapweb")
 CONTROL_SCRIPT = pathlib.Path(__file__).parent.resolve().joinpath("control.py")
 
@@ -105,6 +107,13 @@ DEFAULT_SNAPCAST_PCM_FORMAT = AudioFormat(
 )
 
 
+class SnapCastStreamType(StrEnum):
+    """Enum for Snapcast Stream Type."""
+
+    MUSIC = "MUSIC"
+    ANNOUNCEMENT = "ANNOUNCEMENT"
+
+
 async def setup(
     mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
 ) -> ProviderInstanceType:
@@ -404,6 +413,7 @@ class SnapCastProvider(PlayerProvider):
                     PlayerFeature.SET_MEMBERS,
                     PlayerFeature.VOLUME_SET,
                     PlayerFeature.VOLUME_MUTE,
+                    PlayerFeature.PLAY_ANNOUNCEMENT,
                 },
                 synced_to=self._synced_to(player_id),
                 can_group_with={self.instance_id},
@@ -423,14 +433,18 @@ class SnapCastProvider(PlayerProvider):
         player.volume_muted = snap_client.muted
         player.available = snap_client.connected
         player.synced_to = self._synced_to(player_id)
-        # if player.active_group is None:
-        if stream := self._get_snapstream(player_id):
+
+        # Note: when the active stream is a MASS stream the active_source is __not__ updated at all.
+        # So it doesn't matter whether a MASS stream is for music or announcements.
+        if stream := self._get_active_snapstream(player_id):
             if stream.identifier == "default":
                 player.active_source = None
-            elif not stream.identifier.startswith(MASS_STREAM_POSTFIX):
+            elif not stream.identifier.startswith(MASS_STREAM_PREFIX):
+                # unknown source
                 player.active_source = stream.identifier
         else:
             player.active_source = None
+
         self._group_childs(player_id)
         self.mass.players.update(player_id)
 
@@ -455,25 +469,30 @@ class SnapCastProvider(PlayerProvider):
         """Send VOLUME_SET command to given player."""
         snap_client_id = self._get_snapclient_id(player_id)
         await self._snapserver.client(snap_client_id).set_volume(volume_level)
-        self.mass.players.update(snap_client_id)
 
     async def cmd_stop(self, player_id: str) -> None:
         """Send STOP command to given player."""
+        # update the state first to avoid race conditions, if an active play_announcement
+        # finishes the player.state should be IDLE.
         player = self.mass.players.get(player_id, raise_unavailable=False)
+        player.state = PlayerState.IDLE
+        player.current_media = None
+        player.active_source = None
+        self._set_childs_state(player_id)
+        self.mass.players.update(player_id)
 
-        await self._get_snapgroup(player_id).set_stream("default")
-        await self._delete_current_snapstream(self._get_snapstream(player_id))
+        # we change the active stream only if music was playing
+        if not player.announcement_in_progress:
+            await self._get_snapgroup(player_id).set_stream("default")
+
+        # but we always delete the music stream (whether it was active or not)
+        await self._delete_stream(self._get_stream_name(player_id, SnapCastStreamType.MUSIC))
 
         if stream_task := self._stream_tasks.pop(player_id, None):
             if not stream_task.done():
                 stream_task.cancel()
                 with suppress(asyncio.CancelledError):
                     await stream_task
-        player.state = PlayerState.IDLE
-        player.current_media = None
-        player.active_source = None
-        self._set_childs_state(player_id)
-        self.mass.players.update(player_id)
 
     async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
         """Send MUTE command to given player."""
@@ -518,7 +537,7 @@ class SnapCastProvider(PlayerProvider):
         self.mass.players.update(player_id, skip_forward=True)
         self.mass.players.update(mass_player.synced_to, skip_forward=True)
 
-    async def play_media(self, player_id: str, media: PlayerMedia) -> None:  # noqa: PLR0915
+    async def play_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA on given player."""
         player = self.mass.players.get(player_id)
         if player.synced_to:
@@ -533,23 +552,20 @@ class SnapCastProvider(PlayerProvider):
                     await stream_task
 
         # get stream or create new one
-        stream = await self._get_or_create_stream(player_id, media.queue_id or player_id)
-        snap_group = self._get_snapgroup(player_id)
-        await snap_group.set_stream(stream.identifier)
+        stream_name = self._get_stream_name(player_id, SnapCastStreamType.MUSIC)
+        stream = await self._get_or_create_stream(stream_name, media.queue_id or player_id)
+
+        # if no announcement is playing we activate the stream now, otherwise it
+        # will be activated by play_announcement when the announcement is over.
+        if not player.announcement_in_progress:
+            snap_group = self._get_snapgroup(player_id)
+            await snap_group.set_stream(stream.identifier)
 
         player.current_media = media
         player.active_source = media.queue_id
 
         # select audio source
-        if media.media_type == MediaType.ANNOUNCEMENT:
-            # special case: stream announcement
-            input_format = DEFAULT_SNAPCAST_FORMAT
-            audio_source = self.mass.streams.get_announcement_stream(
-                media.custom_data["url"],
-                output_format=DEFAULT_SNAPCAST_FORMAT,
-                use_pre_announce=media.custom_data["use_pre_announce"],
-            )
-        elif media.media_type == MediaType.PLUGIN_SOURCE:
+        if media.media_type == MediaType.PLUGIN_SOURCE:
             # special case: plugin source stream
             input_format = DEFAULT_SNAPCAST_FORMAT
             audio_source = self.mass.streams.get_plugin_source_stream(
@@ -584,12 +600,7 @@ class SnapCastProvider(PlayerProvider):
             )
 
         async def _streamer() -> None:
-            if stream.path:
-                stream_path = stream.path
-            if not stream.path:
-                stream_path = "tcp://" + stream._stream["uri"]["host"]
-            stream_path = stream_path.replace("0.0.0.0", self._snapcast_server_host)
-
+            stream_path = self._get_stream_path(stream)
             self.logger.debug("Start streaming to %s", stream_path)
             async with FFMpeg(
                 audio_input=audio_source,
@@ -622,11 +633,101 @@ class SnapCastProvider(PlayerProvider):
         # start streaming the queue (pcm) audio in a background task
         self._stream_tasks[player_id] = self.mass.create_task(_streamer())
 
-    async def _delete_current_snapstream(self, stream: Snapstream) -> None:
-        if not stream.identifier.startswith(MASS_STREAM_POSTFIX):
-            return
-        with suppress(TypeError, KeyError, AttributeError):
-            await self._snapserver.stream_remove_stream(stream.identifier)
+    async def play_announcement(
+        self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
+    ) -> None:
+        """Handle (provider native) playback of an announcement on given player."""
+        # get stream or create new one
+        stream_name = self._get_stream_name(player_id, SnapCastStreamType.ANNOUNCEMENT)
+        stream = await self._get_or_create_stream(stream_name, None)
+
+        # always activate the stream (announcements have priority over music)
+        snap_group = self._get_snapgroup(player_id)
+        await snap_group.set_stream(stream.identifier)
+
+        # Unfortunately snapcast sets a volume per client (not per stream), so we need a way to
+        # set the announcement volume without affecting the music volume.
+        # We go for the simplest solution: save the previous volume, change it, restore later
+        # (with the downside that the change will be visible in the UI)
+        player = self.mass.players.get(player_id)
+        orig_volume_level = player.volume_level
+        volume_level = self.mass.players.get_announcement_volume(player_id, volume_level)
+        await self.cmd_volume_set(player_id, volume_level)
+
+        input_format = DEFAULT_SNAPCAST_FORMAT
+        audio_source = self.mass.streams.get_announcement_stream(
+            announcement.custom_data["url"],
+            output_format=DEFAULT_SNAPCAST_FORMAT,
+            use_pre_announce=announcement.custom_data["use_pre_announce"],
+        )
+
+        # stream the audio, wait for it to finish (play_announcement should return after the
+        # announcement is over to avoid simultaneous announcements).
+        #
+        # Note: -probesize 8096 is needed to start playing the pre-announce before the TTS
+        #       data arrive (they arrive late, see get_announcement_stream).
+        #
+        stream_path = self._get_stream_path(stream)
+        self.logger.debug("Start announcement streaming to %s", stream_path)
+        async with FFMpeg(
+            audio_input=audio_source,
+            input_format=input_format,
+            output_format=DEFAULT_SNAPCAST_FORMAT,
+            filter_params=get_player_filter_params(
+                self.mass, player_id, input_format, DEFAULT_SNAPCAST_FORMAT
+            ),
+            audio_output=stream_path,
+            extra_input_args=["-y", "-re", "-probesize", "8096"],
+        ) as ffmpeg_proc:
+            await ffmpeg_proc.wait()
+
+        self.logger.debug("Finished announcement streaming to %s", stream_path)
+        # we need to wait a bit for the stream status to become idle
+        # to ensure that all snapclients have consumed the audio
+        while stream.status != "idle":
+            await asyncio.sleep(0.25)
+
+        # delete the announcement stream
+        await self._delete_stream(stream_name)
+
+        # restore volume, if it is still the same we set above
+        # (the user did not change it while the announcement was playing)
+        if player.volume_level == volume_level and orig_volume_level is not None:
+            await self.cmd_volume_set(player_id, orig_volume_level)
+
+        # and restore the group to either the default or the music stream
+        if player.state == PlayerState.IDLE:
+            new_stream_name = "default"
+        else:
+            new_stream_name = self._get_stream_name(player_id, SnapCastStreamType.MUSIC)
+        await self._get_snapgroup(player_id).set_stream(new_stream_name)
+
+    def _get_stream_name(self, player_id: str, stream_type: SnapCastStreamType) -> str:
+        """Return the name of the stream for the given player.
+
+        Each player can have up to two concurrent streams, for music and announcements.
+
+        The stream name depends only on player_id (not queue_id) for two reasones:
+        1. Avoid issues when the same queue_id is simultaneously used by two players
+           (eg in universal groups).
+        2. Easily identify which stream belongs to which player, for instance to be able to
+           delete a music stream even when it is not active due to an announcement.
+        """
+        player = self.mass.players.get(player_id)
+        safe_name = create_safe_string(player.display_name, replace_space=True)
+        stream_name = f"{MASS_STREAM_PREFIX}{safe_name}"
+        if stream_type == SnapCastStreamType.ANNOUNCEMENT:
+            stream_name += MASS_ANNOUNCEMENT_POSTFIX
+        return stream_name
+
+    def _get_stream_path(self, stream: Snapstream) -> str:
+        stream_path = stream.path or f"tcp://{stream._stream['uri']['host']}"
+        return stream_path.replace("0.0.0.0", self._snapcast_server_host)
+
+    async def _delete_stream(self, stream_name: str) -> None:
+        if stream := self._get_snapstream(stream_name):
+            with suppress(TypeError, KeyError, AttributeError):
+                await self._snapserver.stream_remove_stream(stream.identifier)
 
     def _get_snapgroup(self, player_id: str) -> Snapgroup:
         """Get snapcast group for given player_id."""
@@ -634,11 +735,16 @@ class SnapCastProvider(PlayerProvider):
         client: Snapclient = self._snapserver.client(snap_client_id)
         return client.group
 
-    def _get_snapstream(self, player_id: str) -> Snapstream | None:
-        """Get snapcast stream for given player_id."""
+    def _get_snapstream(self, stream_name: str) -> Snapstream | None:
+        """Get a stream by name."""
+        with suppress(KeyError):
+            return self._snapserver.stream(stream_name)
+        return None
+
+    def _get_active_snapstream(self, player_id: str) -> Snapstream | None:
+        """Get active stream for given player_id."""
         if group := self._get_snapgroup(player_id):
-            with suppress(KeyError):
-                return self._snapserver.stream(group.stream)
+            return self._get_snapstream(group.stream)
         return None
 
     def _synced_to(self, player_id: str) -> str | None:
@@ -665,20 +771,15 @@ class SnapCastProvider(PlayerProvider):
             and self._snapserver.client(snap_client_id).connected
         }
 
-    async def _get_or_create_stream(self, player_id: str, queue_id: str) -> Snapstream:
+    async def _get_or_create_stream(self, stream_name: str, queue_id: str | None) -> Snapstream:
         """Create new stream on snapcast server (or return existing one)."""
-        mass_queue = self.mass.player_queues.get(queue_id)
-        safe_name = create_safe_string(mass_queue.display_name, replace_space=True)
-        stream_name = f"{MASS_STREAM_POSTFIX} - {safe_name}"
-        # cancel any existing clear stream task
-        self.mass.cancel_timer(f"snapcast_clear_stream_{player_id}")
-
         # prefer to reuse existing stream if possible
-        for stream in self._snapserver.streams:
-            if stream.identifier == stream_name:
-                return stream
+        if stream := self._get_snapstream(stream_name):
+            return stream
 
-        if self._use_builtin_server:
+        # The control script is used only for music streams in the builtin server
+        # (queue_id is None only for announcement streams).
+        if self._use_builtin_server and queue_id:
             extra_args = (
                 f"&controlscript={urllib.parse.quote_plus(str(CONTROL_SCRIPT))}"
                 f"&controlscriptparams=--queueid={urllib.parse.quote_plus(queue_id)}%20"