import time
import urllib.parse
from contextlib import suppress
+from enum import StrEnum
from typing import TYPE_CHECKING, cast
from bidict import bidict
DEFAULT_SNAPSERVER_PORT = 1705
DEFAULT_SNAPSTREAM_IDLE_THRESHOLD = 60000
-MASS_STREAM_POSTFIX = "Music Assistant"
+MASS_STREAM_PREFIX = "Music Assistant - "
+MASS_ANNOUNCEMENT_POSTFIX = " (announcement)"
SNAPWEB_DIR = pathlib.Path(__file__).parent.resolve().joinpath("snapweb")
CONTROL_SCRIPT = pathlib.Path(__file__).parent.resolve().joinpath("control.py")
)
+class SnapCastStreamType(StrEnum):
+ """Enum for Snapcast Stream Type."""
+
+ MUSIC = "MUSIC"
+ ANNOUNCEMENT = "ANNOUNCEMENT"
+
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> ProviderInstanceType:
PlayerFeature.SET_MEMBERS,
PlayerFeature.VOLUME_SET,
PlayerFeature.VOLUME_MUTE,
+ PlayerFeature.PLAY_ANNOUNCEMENT,
},
synced_to=self._synced_to(player_id),
can_group_with={self.instance_id},
player.volume_muted = snap_client.muted
player.available = snap_client.connected
player.synced_to = self._synced_to(player_id)
- # if player.active_group is None:
- if stream := self._get_snapstream(player_id):
+
+ # Note: when the active stream is a MASS stream the active_source is __not__ updated at all.
+ # So it doesn't matter whether a MASS stream is for music or announcements.
+ if stream := self._get_active_snapstream(player_id):
if stream.identifier == "default":
player.active_source = None
- elif not stream.identifier.startswith(MASS_STREAM_POSTFIX):
+ elif not stream.identifier.startswith(MASS_STREAM_PREFIX):
+ # unknown source
player.active_source = stream.identifier
else:
player.active_source = None
+
self._group_childs(player_id)
self.mass.players.update(player_id)
"""Send VOLUME_SET command to given player."""
snap_client_id = self._get_snapclient_id(player_id)
await self._snapserver.client(snap_client_id).set_volume(volume_level)
- self.mass.players.update(snap_client_id)
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
+ # update the state first to avoid race conditions, if an active play_announcement
+ # finishes the player.state should be IDLE.
player = self.mass.players.get(player_id, raise_unavailable=False)
+ player.state = PlayerState.IDLE
+ player.current_media = None
+ player.active_source = None
+ self._set_childs_state(player_id)
+ self.mass.players.update(player_id)
- await self._get_snapgroup(player_id).set_stream("default")
- await self._delete_current_snapstream(self._get_snapstream(player_id))
+ # we change the active stream only if music was playing
+ if not player.announcement_in_progress:
+ await self._get_snapgroup(player_id).set_stream("default")
+
+ # but we always delete the music stream (whether it was active or not)
+ await self._delete_stream(self._get_stream_name(player_id, SnapCastStreamType.MUSIC))
if stream_task := self._stream_tasks.pop(player_id, None):
if not stream_task.done():
stream_task.cancel()
with suppress(asyncio.CancelledError):
await stream_task
- player.state = PlayerState.IDLE
- player.current_media = None
- player.active_source = None
- self._set_childs_state(player_id)
- self.mass.players.update(player_id)
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send MUTE command to given player."""
self.mass.players.update(player_id, skip_forward=True)
self.mass.players.update(mass_player.synced_to, skip_forward=True)
- async def play_media(self, player_id: str, media: PlayerMedia) -> None: # noqa: PLR0915
+ async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
player = self.mass.players.get(player_id)
if player.synced_to:
await stream_task
# get stream or create new one
- stream = await self._get_or_create_stream(player_id, media.queue_id or player_id)
- snap_group = self._get_snapgroup(player_id)
- await snap_group.set_stream(stream.identifier)
+ stream_name = self._get_stream_name(player_id, SnapCastStreamType.MUSIC)
+ stream = await self._get_or_create_stream(stream_name, media.queue_id or player_id)
+
+ # if no announcement is playing we activate the stream now, otherwise it
+ # will be activated by play_announcement when the announcement is over.
+ if not player.announcement_in_progress:
+ snap_group = self._get_snapgroup(player_id)
+ await snap_group.set_stream(stream.identifier)
player.current_media = media
player.active_source = media.queue_id
# select audio source
- if media.media_type == MediaType.ANNOUNCEMENT:
- # special case: stream announcement
- input_format = DEFAULT_SNAPCAST_FORMAT
- audio_source = self.mass.streams.get_announcement_stream(
- media.custom_data["url"],
- output_format=DEFAULT_SNAPCAST_FORMAT,
- use_pre_announce=media.custom_data["use_pre_announce"],
- )
- elif media.media_type == MediaType.PLUGIN_SOURCE:
+ if media.media_type == MediaType.PLUGIN_SOURCE:
# special case: plugin source stream
input_format = DEFAULT_SNAPCAST_FORMAT
audio_source = self.mass.streams.get_plugin_source_stream(
)
async def _streamer() -> None:
- if stream.path:
- stream_path = stream.path
- if not stream.path:
- stream_path = "tcp://" + stream._stream["uri"]["host"]
- stream_path = stream_path.replace("0.0.0.0", self._snapcast_server_host)
-
+ stream_path = self._get_stream_path(stream)
self.logger.debug("Start streaming to %s", stream_path)
async with FFMpeg(
audio_input=audio_source,
# start streaming the queue (pcm) audio in a background task
self._stream_tasks[player_id] = self.mass.create_task(_streamer())
- async def _delete_current_snapstream(self, stream: Snapstream) -> None:
- if not stream.identifier.startswith(MASS_STREAM_POSTFIX):
- return
- with suppress(TypeError, KeyError, AttributeError):
- await self._snapserver.stream_remove_stream(stream.identifier)
+ async def play_announcement(
+ self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
+ ) -> None:
+ """Handle (provider native) playback of an announcement on given player."""
+ # get stream or create new one
+ stream_name = self._get_stream_name(player_id, SnapCastStreamType.ANNOUNCEMENT)
+ stream = await self._get_or_create_stream(stream_name, None)
+
+ # always activate the stream (announcements have priority over music)
+ snap_group = self._get_snapgroup(player_id)
+ await snap_group.set_stream(stream.identifier)
+
+ # Unfortunately snapcast sets a volume per client (not per stream), so we need a way to
+ # set the announcement volume without affecting the music volume.
+ # We go for the simplest solution: save the previous volume, change it, restore later
+ # (with the downside that the change will be visible in the UI)
+ player = self.mass.players.get(player_id)
+ orig_volume_level = player.volume_level
+ volume_level = self.mass.players.get_announcement_volume(player_id, volume_level)
+ await self.cmd_volume_set(player_id, volume_level)
+
+ input_format = DEFAULT_SNAPCAST_FORMAT
+ audio_source = self.mass.streams.get_announcement_stream(
+ announcement.custom_data["url"],
+ output_format=DEFAULT_SNAPCAST_FORMAT,
+ use_pre_announce=announcement.custom_data["use_pre_announce"],
+ )
+
+ # stream the audio, wait for it to finish (play_announcement should return after the
+ # announcement is over to avoid simultaneous announcements).
+ #
+ # Note: -probesize 8096 is needed to start playing the pre-announce before the TTS
+ # data arrive (they arrive late, see get_announcement_stream).
+ #
+ stream_path = self._get_stream_path(stream)
+ self.logger.debug("Start announcement streaming to %s", stream_path)
+ async with FFMpeg(
+ audio_input=audio_source,
+ input_format=input_format,
+ output_format=DEFAULT_SNAPCAST_FORMAT,
+ filter_params=get_player_filter_params(
+ self.mass, player_id, input_format, DEFAULT_SNAPCAST_FORMAT
+ ),
+ audio_output=stream_path,
+ extra_input_args=["-y", "-re", "-probesize", "8096"],
+ ) as ffmpeg_proc:
+ await ffmpeg_proc.wait()
+
+ self.logger.debug("Finished announcement streaming to %s", stream_path)
+ # we need to wait a bit for the stream status to become idle
+ # to ensure that all snapclients have consumed the audio
+ while stream.status != "idle":
+ await asyncio.sleep(0.25)
+
+ # delete the announcement stream
+ await self._delete_stream(stream_name)
+
+ # restore volume, if it is still the same we set above
+ # (the user did not change it while the announcement was playing)
+ if player.volume_level == volume_level and orig_volume_level is not None:
+ await self.cmd_volume_set(player_id, orig_volume_level)
+
+ # and restore the group to either the default or the music stream
+ if player.state == PlayerState.IDLE:
+ new_stream_name = "default"
+ else:
+ new_stream_name = self._get_stream_name(player_id, SnapCastStreamType.MUSIC)
+ await self._get_snapgroup(player_id).set_stream(new_stream_name)
+
+ def _get_stream_name(self, player_id: str, stream_type: SnapCastStreamType) -> str:
+ """Return the name of the stream for the given player.
+
+ Each player can have up to two concurrent streams, for music and announcements.
+
+ The stream name depends only on player_id (not queue_id) for two reasones:
+ 1. Avoid issues when the same queue_id is simultaneously used by two players
+ (eg in universal groups).
+ 2. Easily identify which stream belongs to which player, for instance to be able to
+ delete a music stream even when it is not active due to an announcement.
+ """
+ player = self.mass.players.get(player_id)
+ safe_name = create_safe_string(player.display_name, replace_space=True)
+ stream_name = f"{MASS_STREAM_PREFIX}{safe_name}"
+ if stream_type == SnapCastStreamType.ANNOUNCEMENT:
+ stream_name += MASS_ANNOUNCEMENT_POSTFIX
+ return stream_name
+
+ def _get_stream_path(self, stream: Snapstream) -> str:
+ stream_path = stream.path or f"tcp://{stream._stream['uri']['host']}"
+ return stream_path.replace("0.0.0.0", self._snapcast_server_host)
+
+ async def _delete_stream(self, stream_name: str) -> None:
+ if stream := self._get_snapstream(stream_name):
+ with suppress(TypeError, KeyError, AttributeError):
+ await self._snapserver.stream_remove_stream(stream.identifier)
def _get_snapgroup(self, player_id: str) -> Snapgroup:
"""Get snapcast group for given player_id."""
client: Snapclient = self._snapserver.client(snap_client_id)
return client.group
- def _get_snapstream(self, player_id: str) -> Snapstream | None:
- """Get snapcast stream for given player_id."""
+ def _get_snapstream(self, stream_name: str) -> Snapstream | None:
+ """Get a stream by name."""
+ with suppress(KeyError):
+ return self._snapserver.stream(stream_name)
+ return None
+
+ def _get_active_snapstream(self, player_id: str) -> Snapstream | None:
+ """Get active stream for given player_id."""
if group := self._get_snapgroup(player_id):
- with suppress(KeyError):
- return self._snapserver.stream(group.stream)
+ return self._get_snapstream(group.stream)
return None
def _synced_to(self, player_id: str) -> str | None:
and self._snapserver.client(snap_client_id).connected
}
- async def _get_or_create_stream(self, player_id: str, queue_id: str) -> Snapstream:
+ async def _get_or_create_stream(self, stream_name: str, queue_id: str | None) -> Snapstream:
"""Create new stream on snapcast server (or return existing one)."""
- mass_queue = self.mass.player_queues.get(queue_id)
- safe_name = create_safe_string(mass_queue.display_name, replace_space=True)
- stream_name = f"{MASS_STREAM_POSTFIX} - {safe_name}"
- # cancel any existing clear stream task
- self.mass.cancel_timer(f"snapcast_clear_stream_{player_id}")
-
# prefer to reuse existing stream if possible
- for stream in self._snapserver.streams:
- if stream.identifier == stream_name:
- return stream
+ if stream := self._get_snapstream(stream_name):
+ return stream
- if self._use_builtin_server:
+ # The control script is used only for music streams in the builtin server
+ # (queue_id is None only for announcement streams).
+ if self._use_builtin_server and queue_id:
extra_args = (
f"&controlscript={urllib.parse.quote_plus(str(CONTROL_SCRIPT))}"
f"&controlscriptparams=--queueid={urllib.parse.quote_plus(queue_id)}%20"