From c36e3a36e26c0bacc76a051236bbe90e317efac9 Mon Sep 17 00:00:00 2001 From: Mischa Siekmann <45062894+gnumpi@users.noreply.github.com> Date: Sun, 8 Feb 2026 20:18:50 +0100 Subject: [PATCH] Snapcast: Fix player availability issues and align state with server truth (#3104) --- .../controllers/players/sync_groups.py | 1 + .../providers/snapcast/constants.py | 8 - .../providers/snapcast/ma_stream.py | 518 ++++++++++++ music_assistant/providers/snapcast/player.py | 753 ++++++++++-------- .../providers/snapcast/provider.py | 401 ++++++++-- .../providers/snapcast/snap_cntrl_proto.py | 124 +++ 6 files changed, 1424 insertions(+), 381 deletions(-) create mode 100644 music_assistant/providers/snapcast/ma_stream.py create mode 100644 music_assistant/providers/snapcast/snap_cntrl_proto.py diff --git a/music_assistant/controllers/players/sync_groups.py b/music_assistant/controllers/players/sync_groups.py index 9da87d0e..11b4d5d1 100644 --- a/music_assistant/controllers/players/sync_groups.py +++ b/music_assistant/controllers/players/sync_groups.py @@ -53,6 +53,7 @@ SUPPORT_DYNAMIC_LEADER = { # and the music keeps playing uninterrupted. "airplay", "squeezelite", + "snapcast", # TODO: Get this working with Sonos as well (need to handle range requests) } diff --git a/music_assistant/providers/snapcast/constants.py b/music_assistant/providers/snapcast/constants.py index 7e5f5945..4764dcad 100644 --- a/music_assistant/providers/snapcast/constants.py +++ b/music_assistant/providers/snapcast/constants.py @@ -1,7 +1,6 @@ """Constants for snapcast provider.""" import pathlib -from enum import StrEnum from music_assistant_models.enums import ContentType from music_assistant_models.media_items.audio_format import AudioFormat @@ -60,10 +59,3 @@ DEFAULT_SNAPCAST_PCM_FORMAT = AudioFormat( bit_depth=16, channels=2, ) - - -class SnapCastStreamType(StrEnum): - """Enum for Snapcast Stream Type.""" - - MUSIC = "MUSIC" - ANNOUNCEMENT = "ANNOUNCEMENT" diff --git a/music_assistant/providers/snapcast/ma_stream.py b/music_assistant/providers/snapcast/ma_stream.py new file mode 100644 index 00000000..fe90c70f --- /dev/null +++ b/music_assistant/providers/snapcast/ma_stream.py @@ -0,0 +1,518 @@ +"""Music Assistant Snapcast source stream. + +This module implements a Music Assistant-managed Snapcast stream that is exposed to the +Snapcast server as a TCP source. The stream is produced by running an FFmpeg pipeline +which pulls audio from Music Assistant and pushes it to the Snapcast source URI. + +Optionally, a Unix socket server can be started to provide a control channel for a +Snapcast control script (used by the built-in Snapcast server integration). +""" + +from __future__ import annotations + +import asyncio +import random +import urllib.parse +from contextlib import suppress +from typing import TYPE_CHECKING, cast + +from music_assistant.helpers.audio import get_player_filter_params +from music_assistant.helpers.ffmpeg import FFMpeg +from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer + +from .constants import ( + CONTROL_SOCKET_PATH_TEMPLATE, + DEFAULT_SNAPCAST_FORMAT, +) + +if TYPE_CHECKING: + from music_assistant_models.player import PlayerMedia + + from .provider import SnapCastProvider + from .snap_cntrl_proto import SnapstreamProto + + +class SnapcastMAStream: + """A Music Assistant-managed Snapcast stream. + + The stream lifecycle is: + - setup: ensure required server resources exist (Snapcast source, optional socket server) + - start_stream: start the FFmpeg streaming task + - request_stop_stream / wait_for_stopped: stop streaming and await termination + - destroy: stop streaming, remove Snapcast source, and stop ancillary services + + If `cntrl_queue_id` is provided, a Unix socket server is started to allow a Snapcast + control script to communicate with Music Assistant. + """ + + def __init__( + self, + provider: SnapCastProvider, + media: PlayerMedia, + stream_name: str, + source_id: str | None = None, + filter_settings_owner: str | None = None, + use_cntrl_script: bool = False, + destroy_on_stop: bool = False, + ) -> None: + """Initialize the stream. + + Args: + provider: The Snapcast provider instance. + media: The media item to stream. + stream_name: Name used to register the stream on the Snapcast server. + cntrl_queue_id: If set, enables the control socket server used by the control script. + filter_settings_owner: Player/entity id used to fetch DSP/filter parameters. + destroy_on_stop: If true, delete this MA stream once streaming stops. + """ + self.media = media + self.stream_name = stream_name + self.snap_stream: SnapstreamProto | None = None + + self._provider = provider + self._logger = provider.logger + self._mass = provider.mass + self._source_id = source_id + self._use_cntrl_script = use_cntrl_script + self._cntrl_queue_id = source_id if use_cntrl_script else None + self._filter_settings_owner = filter_settings_owner + self._destroy_on_stop = destroy_on_stop + + self._lifecycle_lock = asyncio.Lock() + self._destroyed = False + self._setup_done = False + self._is_streaming = False + self._restart_requested: bool = False + self._stop_requested: bool = False + + self._socket_server: SnapcastSocketServer | None = None + self._socket_path: str | None = None + self._streamer_task: asyncio.Task[None] | None = None + self._stop_streamer_evt = asyncio.Event() + self._streamer_started_evt = asyncio.Event() + self._stop_timer: asyncio.Handle | None = None + self._stop_timer_started_at: float | None = None + self._filter_settings: list[str] | None = None + + @property + def source_id(self) -> str | None: + """Return the source id this stream was created for.""" + return self._source_id + + @property + def stream_id(self) -> str | None: + """Return the Snapcast stream identifier, if registered.""" + if self.snap_stream: + return self.snap_stream.identifier + return None + + @property + def is_streaming(self) -> bool: + """Return True if the FFmpeg streaming task is currently running.""" + return self._is_streaming + + async def setup(self) -> None: + """Prepare the Snapcast stream resources. + + Ensures a Snapcast source exists on the server. If `cntrl_queue_id` is set, + also starts the Unix socket server used by the control script. + """ + async with self._lifecycle_lock: + if self._destroyed: + raise RuntimeError("Session is destroyed") + if self._setup_done: + return + if self._provider._snapserver is None: + raise RuntimeError("Snapserver needs to be setup first") + + if self._cntrl_queue_id: + await self._start_socket_server() + + await self._register_tcp_server_source() + self._setup_done = True + + async def destroy(self) -> None: + """Stop streaming and tear down all resources. + + This stops the streamer task (if running), removes the Snapcast source, + and stops the optional control socket server. + """ + async with self._lifecycle_lock: + if self._destroyed: + return + self._destroyed = True + + self.request_stop_stream() + await self.wait_for_stopped() + await self._remove_snap_source() + await self._stop_socket_server() + + async def start_stream(self, allow_restart: bool = False) -> None: + """Start streaming the configured media to the Snapcast source. + + Raises: + RuntimeError: If the streamer task is already running. + """ + await self.setup() + async with self._lifecycle_lock: + if self._streamer_task and not self._streamer_task.done(): + if not allow_restart: + raise RuntimeError("streamer already running") + self._restart_if_running() + return + + self._stop_requested = False + self._restart_requested = False + self._stop_streamer_evt.clear() + self._streamer_started_evt.clear() + self._streamer_task = self._mass.create_task(self._streamer_task_impl()) + self._streamer_task.add_done_callback(self._on_streamer_done) + + async def wait_for_started(self, timeout_sec: float | None = None) -> None: + """Wait until the streamer task signals it has started. + + Args: + timeout_sec: Optional timeout in seconds. + """ + try: + await asyncio.wait_for(self._streamer_started_evt.wait(), timeout_sec) + except TimeoutError: + self._logger.warning( + "Timeout waiting for stream %s to start; Canceling...", + self.stream_name, + ) + + def update_media(self, media: PlayerMedia) -> None: + """Update the media to play and restart the stream if required.""" + if media != self.media: + self.media = media + self._restart_if_running() + + def update_filter_settings(self, from_player: str | None = None) -> None: + """Update the filter setting.""" + take_from = from_player or self._filter_settings_owner + if not take_from: + raise RuntimeError("No player provided to read filter settings from.") + new_settings = get_player_filter_params( + self._mass, + take_from, + DEFAULT_SNAPCAST_FORMAT, + DEFAULT_SNAPCAST_FORMAT, + ) + if from_player: + self._filter_settings_owner = from_player + if new_settings != self._filter_settings: + self._restart_if_running() + + def request_stop_stream(self) -> None: + """Request the streamer task to stop. + + This is cooperative: the streamer task will stop when it observes the stop event. + Any pending inactivity stop timer is canceled. + """ + self._stop_requested = True + self._restart_requested = False # explicit stop cancels any pending restart + self._stop_streamer_evt.set() + + self._stop_timer_started_at = None + if self._stop_timer: + self._stop_timer.cancel() + + def set_in_use(self, in_use: bool) -> None: + """Mark the stream as in-use or idle. + + When marked idle, a delayed stop is scheduled. When marked in-use, any pending + delayed stop is canceled. + """ + if in_use: + self._stop_timer_started_at = None + if self._stop_timer: + self._stop_timer.cancel() + elif self._stop_timer_started_at is None: + self._stop_timer_started_at = self._mass.loop.time() + self._stop_timer = self._mass.loop.call_later(60.0, self.request_stop_stream) + + async def wait_for_stopped(self, timeout_sec: float | None = None) -> None: + """Wait for the streamer task to finish. + + If the task does not finish within the timeout, it is canceled and awaited. + + Args: + timeout_sec: Optional timeout in seconds. + """ + curr_task = self._streamer_task + if not curr_task: + return + try: + await asyncio.wait_for(curr_task, timeout_sec) + except asyncio.CancelledError: + self._logger.warning("Streamer task got canceled") + except TimeoutError: + self._logger.warning( + "Timeout waiting for stream %s to finish; Canceling...", + self.stream_name, + ) + curr_task.cancel() + await asyncio.gather(curr_task, return_exceptions=True) + + async def _streamer_task_impl(self) -> None: + """Streamer task implementation. + + Runs FFmpeg to push audio to the Snapcast TCP source until FFmpeg exits or a stop + request is received. After exit, waits briefly for the Snapcast stream to report + an idle state. + """ + stream_path = self._snap_get_stream_path() + if stream_path is None: + raise RuntimeError("The path to stream to is not set") + + self._logger.debug("Start streaming to %s", stream_path) + self._stop_streamer_evt.clear() + self._streamer_started_evt.clear() + if self._filter_settings_owner: + self._filter_settings = get_player_filter_params( + self._mass, + self._filter_settings_owner, + DEFAULT_SNAPCAST_FORMAT, + DEFAULT_SNAPCAST_FORMAT, + ) + audio_source = self._mass.streams.get_stream(self.media, DEFAULT_SNAPCAST_FORMAT) + try: + async with FFMpeg( + audio_input=audio_source, + input_format=DEFAULT_SNAPCAST_FORMAT, + output_format=DEFAULT_SNAPCAST_FORMAT, + filter_params=self._filter_settings or [], + audio_output=stream_path, + extra_input_args=["-y", "-re"], + ) as ffmpeg_proc: + wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait()) + wait_stop = self._mass.create_task(self._stop_streamer_evt.wait()) + self._streamer_started_evt.set() + self._is_streaming = True + + done, pending = await asyncio.wait( + {wait_ffmpeg, wait_stop}, + return_when=asyncio.FIRST_COMPLETED, + ) + + if wait_stop in done and wait_ffmpeg not in done: + self._logger.debug("Stopping stream %s requested.", self.stream_name) + wait_ffmpeg.cancel() + await asyncio.gather(wait_ffmpeg, return_exceptions=True) + return + + await wait_ffmpeg + for t in pending: + t.cancel() + await asyncio.gather(*pending, return_exceptions=True) + + finally: + self._is_streaming = False + self._logger.debug("Finished streaming to %s", stream_path) + # Wait a bit for snap stream to become idle + try: + + async def wait_until_idle() -> None: + while True: + stream_is_idle = False + with suppress(KeyError): + snap_stream = self._provider._snapserver.stream(self.stream_name) + stream_is_idle = snap_stream.status == "idle" + if self._mass.closing or stream_is_idle: + break + await asyncio.sleep(0.25) + + await asyncio.wait_for(wait_until_idle(), timeout=10.0) + + except TimeoutError: + self._logger.warning( + "Timeout waiting for stream %s to become idle", + self.stream_name, + ) + + def _on_streamer_done(self, t: asyncio.Task[None]) -> None: + """Handle streamer task completion and optional cleanup.""" + restart = False + try: + t.result() + except asyncio.CancelledError: + self._logger.debug("Streamer task cancelled: %s", self.stream_name) + except Exception: + self._logger.exception("Streamer task failed") + finally: + restart = self._restart_requested and not self._destroyed + + if self._streamer_task is t: + self._streamer_task = None + + # reset per-run state + self._restart_requested = False + self._stop_requested = False + self._stop_streamer_evt.clear() + self._streamer_started_evt.clear() + + if restart: + self._mass.create_task(self._restart_stream_locked()) + elif self._destroy_on_stop: + self._mass.create_task(self._provider.delete_ma_stream(self.stream_name)) + + def _restart_if_running(self) -> None: + """Request a running stream to restart.""" + t = self._streamer_task + if not t or t.done(): + return + + if self._stop_requested or self._stop_streamer_evt.is_set(): + return + + self._restart_requested = True + self._stop_requested = True + self._stop_streamer_evt.set() + + self._stop_timer_started_at = None + if self._stop_timer: + self._stop_timer.cancel() + + async def _restart_stream_locked(self) -> None: + """Restart the streamer under the lifecycle lock.""" + async with self._lifecycle_lock: + if self._destroyed: + return + if self._streamer_task and not self._streamer_task.done(): + return + + # reset state and start a fresh run + self._stop_requested = False + self._restart_requested = False + self._stop_streamer_evt.clear() + self._streamer_started_evt.clear() + + self._streamer_task = self._mass.create_task(self._streamer_task_impl()) + self._streamer_task.add_done_callback(self._on_streamer_done) + + async def _register_tcp_server_source(self) -> None: + """Create a Snapcast TCP source for this stream (or reuse an existing one).""" + # prefer to reuse existing stream if possible + if self.snap_stream: + return + + # The control script is used only for music streams in the builtin server + extra_args = "" + if (cntrl_queue_id := self._cntrl_queue_id) is not None: + # Create socket server for control script communication + socket_path = self._socket_path + if socket_path is None: + raise RuntimeError("socket_path needs to be set if cntrl_queue_id is set") + extra_args = ( + f"&controlscript={urllib.parse.quote_plus('control.py')}" + f"&controlscriptparams=--queueid={urllib.parse.quote_plus(cntrl_queue_id)}%20" + f"--socket={urllib.parse.quote_plus(socket_path)}%20" + f"--streamserver-ip={self._mass.streams.publish_ip}%20" + f"--streamserver-port={self._mass.streams.publish_port}" + ) + + attempts = 50 + while attempts: + attempts -= 1 + # pick a random port + port = random.randint(4953, 4953 + 200) + ## Do we need to add a time out here? + result = await self._provider._snapserver.stream_add_stream( + # NOTE: setting the sampleformat to something else + # (like 24 bits bit depth) does not seem to work at all! + f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2" + f"&idle_threshold={self._provider._snapcast_stream_idle_threshold}" + f"{extra_args}&name={self.stream_name}" + ) + if result is None or "id" not in result: + # if the port is already taken, the result will be an error + self._logger.warning(result) + continue + ## Do we need to synchronize the snapserver repr first? + self.snap_stream = self._provider._snapserver.stream(result["id"]) + self.snap_stream.set_callback(self._snap_on_stream_update) + return + + if self._socket_server: + await self._stop_socket_server() + + msg = "Unable to create stream - No free port found?" + raise RuntimeError(msg) + + async def _remove_snap_source(self) -> None: + """Remove the Snapcast source created for this stream and detach groups.""" + if self._mass.closing or self.snap_stream is None: + return + + for snap_group in self._provider._snapserver.groups: + if snap_group.stream != self.snap_stream.identifier: + continue + self._logger.debug(f"Set stream of group {snap_group.name} to default.") + await snap_group.set_stream("default") + + with suppress(KeyError, AttributeError): + snap_stream = self._provider._snapserver.stream(self.stream_name) + await self._provider._snapserver.stream_remove_stream(snap_stream.identifier) + + if self._socket_server: + await self._stop_socket_server() + self._snap_on_stream_update() + + return + + def _snap_get_stream_path(self) -> str | None: + """Return the Snapcast TCP URI to stream to.""" + if self.snap_stream is None: + return None + + uri = self.snap_stream._stream.get("uri", {}) + uri_host = uri.get("host", "") + stream_path = self.snap_stream.path or f"tcp://{uri_host}" + return stream_path.replace("0.0.0.0", self._provider._snapcast_server_host) + + def _snap_on_stream_update(self, stream: SnapstreamProto | None = None) -> None: + """Handle Snapcast stream updates and trigger group member refresh.""" + if self.snap_stream is None: + return + + for snap_group in self._provider._snapserver.groups: + if snap_group.stream != self.snap_stream.identifier: + continue + self._provider.poke_group_members(snap_group) + + async def _start_socket_server(self) -> str: + """Get or create a socket server for the given queue. + + :return: The path to the Unix socket. + """ + if self._socket_server: + return self._socket_server.socket_path + + if self._cntrl_queue_id is None: + raise RuntimeError("Socket server require _cntrl_queue_id to be set") + + socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=self._cntrl_queue_id) + socket_server = SnapcastSocketServer( + mass=self._mass, + queue_id=self._cntrl_queue_id, + socket_path=socket_path, + streamserver_ip=str(self._mass.streams.publish_ip), + streamserver_port=cast("int", self._mass.streams.publish_port), + ) + await socket_server.start() + self._socket_server = socket_server + self._socket_path = socket_path + self._logger.debug( + "Created socket server for queue %s at %s", self._cntrl_queue_id, socket_path + ) + return socket_path + + async def _stop_socket_server(self) -> None: + """Stop and remove the socket server for the given queue.""" + if not self._socket_server: + return + + await self._socket_server.stop() + self._socket_server = None + self._logger.debug("Stopped socket server for queue %s", self._cntrl_queue_id) diff --git a/music_assistant/providers/snapcast/player.py b/music_assistant/providers/snapcast/player.py index 221a3f74..7e6e7716 100644 --- a/music_assistant/providers/snapcast/player.py +++ b/music_assistant/providers/snapcast/player.py @@ -1,34 +1,58 @@ """Snapcast Player.""" +from __future__ import annotations + import asyncio -import random -import time -import urllib.parse from contextlib import suppress -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, TypedDict, cast -from music_assistant_models.config_entries import ConfigEntry, ConfigValueType -from music_assistant_models.enums import PlaybackState, PlayerFeature +from music_assistant_models.enums import MediaType, PlaybackState, PlayerFeature from music_assistant_models.player import DeviceInfo, PlayerMedia -from snapcast.control.client import Snapclient -from snapcast.control.group import Snapgroup -from snapcast.control.stream import Snapstream - -from music_assistant.constants import ATTR_ANNOUNCEMENT_IN_PROGRESS -from music_assistant.helpers.audio import get_player_filter_params -from music_assistant.helpers.compare import create_safe_string -from music_assistant.helpers.ffmpeg import FFMpeg -from music_assistant.models.player import Player -from music_assistant.providers.snapcast.constants import ( - CONF_ENTRY_SAMPLE_RATES_SNAPCAST, - DEFAULT_SNAPCAST_FORMAT, - MASS_ANNOUNCEMENT_POSTFIX, - MASS_STREAM_PREFIX, - SnapCastStreamType, +from propcache import under_cached_property as cached_property + +from music_assistant.constants import ( + ATTR_ANNOUNCEMENT_IN_PROGRESS, + CONF_ENTRY_HTTP_PROFILE_HIDDEN, + SYNCGROUP_PREFIX, ) +from music_assistant.models.player import Player +from music_assistant.providers.snapcast.constants import CONF_ENTRY_SAMPLE_RATES_SNAPCAST +from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream if TYPE_CHECKING: + from music_assistant_models.config_entries import ConfigEntry, ConfigValueType + from music_assistant.providers.snapcast.provider import SnapCastProvider + from music_assistant.providers.snapcast.snap_cntrl_proto import ( + SnapclientProto, + SnapstreamProto, + ) + + +class TrackedPlayerState(TypedDict, total=False): + """Tracked state for the Snapcast MA player. + + It is used for change detection and state synchronization, and may be + partially populated depending on which information is + currently available. + + Keys prefixed with ``_attr_`` are exposed as player attributes, while the + remaining keys represent internal Snapcast grouping and connection state. + """ + + # Player attribute fields + _attr_name: str + _attr_volume_level: float + _attr_volume_muted: bool + _attr_available: bool + + # snapclient fields + connected: bool + stream_id: str + stream_status: str | None + grp_name: str + grp_member_ids: list[str] + grp_member_avail: list[bool] class SnapCastPlayer(Player): @@ -36,92 +60,136 @@ class SnapCastPlayer(Player): def __init__( self, - provider: "SnapCastProvider", + provider: SnapCastProvider, player_id: str, - snap_client: Snapclient, - snap_client_id: str, + snap_client: SnapclientProto, ) -> None: """Init.""" - self.provider: SnapCastProvider # type: ignore[misc] self.snap_client = snap_client - self.snap_client_id = snap_client_id super().__init__(provider, player_id) - self._stream_task: asyncio.Task[None] | None = None + + self._snap_ma_stream: SnapcastMAStream | None = None + + self._update_worker: asyncio.Task[None] | None = None + self._poke_evt = asyncio.Event() + self._state_update_lock = asyncio.Lock() + self._last_tracked_state: TrackedPlayerState | None = None + + @property + def snap_provider(self) -> SnapCastProvider: + """Return the Snapcast provider instance.""" + return cast("SnapCastProvider", self.provider) @property def requires_flow_mode(self) -> bool: """Return if the player requires flow mode.""" return True - @property + @cached_property def synced_to(self) -> str | None: - """ - Return the id of the player this player is synced to (sync leader). + """Return the id of the player this player is synced to (sync leader).""" + grp_name = self.snap_group_name + if grp_name == self.player_id: + # is group leader + return None - If this player is not synced to another player (or is the sync leader itself), - this should return None. - If it is part of a (permanent) group, this should also return None. - """ - snap_group = self._get_snapgroup() - assert snap_group is not None # for type checking - master_id: str = self.provider._get_ma_id(snap_group.clients[0]) - if len(snap_group.clients) < 2 or self.player_id == master_id: + grp_player_ids = self._get_player_ids_of_curr_group() + if len(grp_player_ids) < 2 or grp_name not in grp_player_ids: return None - return master_id + + if leader_player := self.mass.players.get(grp_name): + return grp_name if leader_player.available else None + + return None + + @cached_property + def group_members(self) -> list[str]: + """Return the group members of the player.""" + if not self._attr_available: + return [] + + grp_name = self.snap_group_name + if grp_name != self.player_id: + # only group leaders can have members + return [] + + player_ids = self._get_player_ids_of_curr_group() + if self.player_id not in player_ids: + # should not happen, unless the current + # state repr is invalid + return [] + + player_ids.remove(self.player_id) + connected = [ + player_id + for player_id in player_ids + if (client := self.snap_provider.get_snap_client(player_id=player_id)) + and client.connected + ] + if connected: + return [self.player_id, *connected] + + return [] + + @property + def playback_state(self) -> PlaybackState: + """Return the current playback state of the player.""" + snap_stream = self._get_active_snapstream() + if snap_stream is None: + return PlaybackState.IDLE + + if snap_stream.identifier == "default" or snap_stream.status == "idle": + return PlaybackState.IDLE + + return PlaybackState.PLAYING def setup(self) -> None: """Set up player.""" self._attr_name = self.snap_client.friendly_name self._attr_available = self.snap_client.connected + + host_dict = self.snap_client._client.get("host", {}) + os, arch, ip = (host_dict.get(key, "") for key in ["os", "arch", "ip"]) self._attr_device_info = DeviceInfo( - model=self.snap_client._client.get("host").get("os"), - manufacturer=self.snap_client._client.get("host").get("arch"), + model=os, + manufacturer=arch, ) - self._attr_device_info.ip_address = self.snap_client._client.get("host").get("ip") + self._attr_device_info.ip_address = ip self._attr_supported_features = { PlayerFeature.SET_MEMBERS, PlayerFeature.VOLUME_SET, PlayerFeature.VOLUME_MUTE, PlayerFeature.PLAY_ANNOUNCEMENT, } - self._attr_can_group_with = {self.provider.instance_id} + self._attr_can_group_with = {self.snap_provider.instance_id} + if not self._update_worker: + self._update_worker = self.mass.create_task(self._player_update_worker) async def volume_set(self, volume_level: int) -> None: """Send VOLUME_SET command to given player.""" + # Use optimistic server state for now + # not guaranteed that the client respects it await self.snap_client.set_volume(volume_level) async def stop(self) -> None: """Send STOP command to given player.""" - # update the state first to avoid race conditions, if an active play_announcement - # finishes the player.state should be IDLE. - self._attr_playback_state = PlaybackState.IDLE - self._attr_current_media = None - self._set_childs_state() - - self.update_state() - - # we change the active stream only if music was playing - if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS): - snapgroup = self._get_snapgroup() - assert snapgroup is not None # for type checking - await snapgroup.set_stream("default") - - # but we always delete the music stream (whether it was active or not) - await self._delete_stream(self._get_stream_name(SnapCastStreamType.MUSIC)) + if ma_stream := self.active_snap_ma_stream: + ma_stream.request_stop_stream() + return - if self._stream_task is not None: - if not self._stream_task.done(): - self._stream_task.cancel() - with suppress(asyncio.CancelledError): - await self._stream_task - self._stream_task = None + self.poke_player_update() async def volume_mute(self, muted: bool) -> None: """Send MUTE command to given player.""" - # Using optimistic value because the library does not return the response from the api - await self.snap_client.set_muted(muted) - self._attr_volume_muted = muted - self.update_state() + # Use optimistic server state for now + # not guaranteed that the client respects it + # TODO: move this to the snapcast python library + vol = self.snap_client._client["config"]["volume"] + vol["muted"] = muted + res = await self.snap_provider._snapserver.client_volume(self.snap_client.identifier, vol) + if res and "muted" in res: + self.snap_client._client["config"]["volume"] = res + self.snap_client.callback() async def set_members( self, @@ -129,28 +197,76 @@ class SnapCastPlayer(Player): player_ids_to_remove: list[str] | None = None, ) -> None: """Handle SET_MEMBERS command on the player.""" - group = self._get_snapgroup() - assert group is not None # for type checking - # handle client additions - for player_id in player_ids_to_add or []: - snapcast_id = self.provider._get_snapclient_id(player_id) - if snapcast_id not in group.clients: - await group.add_client(snapcast_id) - if player_id not in self._attr_group_members: - self._attr_group_members.append(player_id) - # handle client removals - for player_id in player_ids_to_remove or []: - snapcast_id = self.provider._get_snapclient_id(player_id) - if snapcast_id in group.clients: - await group.remove_client(snapcast_id) - if player_id in self._attr_group_members: - self._attr_group_members.remove(player_id) - # Set default stream and stop ungrouped players - removed_snapclient = self.provider._snapserver.client(snapcast_id) - await removed_snapclient.group.set_stream("default") - if removed_player := self.mass.players.get(player_id): - await removed_player.stop() - self.update_state() + # get the group owned by this player (identified by the group name) + player_group = await self.snap_provider.ensure_player_owned_group(self.player_id) + + if player_group is None: + return + + player_group.set_callback(None) + + curr_ma_player_ids = [ + ma_id + for cli_id in player_group.clients + if (ma_id := self.snap_provider._get_ma_id(cli_id)) + ] + + curr_stream_id = player_group.stream + sync_group_player = None + if curr_ma_stream := self.snap_provider.get_snap_ma_stream(curr_stream_id): + media = curr_ma_stream.media + if media.media_type == MediaType.PLUGIN_SOURCE: + custom_data = media.custom_data or {} + assigned_player = custom_data.get("player_id", "") + if assigned_player.startswith(SYNCGROUP_PREFIX): + sync_group_player = self.mass.players.get(assigned_player) + else: + media_src_id = media.source_id or "" + if media_src_id.startswith(SYNCGROUP_PREFIX): + sync_group_player = self.mass.players.get(media_src_id) + + if sync_group_player and self.player_id in (player_ids_to_remove or []): + # players in sync_group_player.group_members will be rejoined + # remove others first + for id_to_remove in player_ids_to_remove or []: + if id_to_remove == self.player_id: + continue + if ( + id_to_remove in curr_ma_player_ids + and id_to_remove not in sync_group_player.group_members + ): + await self.snap_provider.isolate_player_to_dedicated_group( + id_to_remove, target_stream_id="default" + ) + + # split remaining group into individual groups, + # keeps the current stream, set this group to default stream + await self.snap_provider.isolate_player_to_dedicated_group( + target_player_id=self.player_id, + target_stream_id="default", + others_stream_id=curr_stream_id, + ) + else: + for player_id in player_ids_to_remove or []: + if player_id not in curr_ma_player_ids: + continue + await self.snap_provider.isolate_player_to_dedicated_group( + player_id, target_stream_id="default" + ) + curr_ma_player_ids.remove(player_id) + + for ma_id in player_ids_to_add or []: + if ( + snap_id := self.snap_provider._get_snapclient_id(ma_id) + ) and ma_id not in curr_ma_player_ids: + await player_group.add_client(snap_id) + + # some caller require instant state updates before returning + async with self._state_update_lock: + if await self._process_snapcast_client_state(): + self.update_state() + + self.snap_provider._update_group_callbacks(poke=True) async def play_media(self, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player.""" @@ -158,134 +274,70 @@ class SnapCastPlayer(Player): msg = "A synced player cannot receive play commands directly" raise RuntimeError(msg) - # stop any existing streamtasks first - if self._stream_task is not None: - if not self._stream_task.done(): - self._stream_task.cancel() - with suppress(asyncio.CancelledError): - await self._stream_task - self._stream_task = None + ma_stream = await self.snap_provider.get_snapcast_media_stream( + media, filter_settings_owner=self.player_id + ) + + if ma_stream is None or ma_stream.stream_id is None: + return + + self._snap_ma_stream = ma_stream - # get stream or create new one - stream_name = self._get_stream_name(SnapCastStreamType.MUSIC) - stream = await self._get_or_create_stream(stream_name, media.source_id or self.player_id) + # e.g. DSP settings require a restart + await self._snap_ma_stream.start_stream(allow_restart=True) # if no announcement is playing we activate the stream now, otherwise it # will be activated by play_announcement when the announcement is over. if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS): - snap_group = self._get_snapgroup() - assert snap_group is not None # for type checking - await snap_group.set_stream(stream.identifier) - - self._attr_current_media = media - - # select audio source - audio_source = self.mass.streams.get_stream(media, DEFAULT_SNAPCAST_FORMAT) - - async def _streamer() -> None: - stream_path = self._get_stream_path(stream) - self.logger.debug("Start streaming to %s", stream_path) - async with FFMpeg( - audio_input=audio_source, - input_format=DEFAULT_SNAPCAST_FORMAT, - output_format=DEFAULT_SNAPCAST_FORMAT, - filter_params=get_player_filter_params( - self.mass, self.player_id, DEFAULT_SNAPCAST_FORMAT, DEFAULT_SNAPCAST_FORMAT - ), - audio_output=stream_path, - extra_input_args=["-y", "-re"], - ) as ffmpeg_proc: - self._attr_playback_state = PlaybackState.PLAYING - self._attr_current_media = media - self._attr_elapsed_time = 0 - self._attr_elapsed_time_last_updated = time.time() - self.update_state() - - self._set_childs_state() - await ffmpeg_proc.wait() + player_group = await self.snap_provider.ensure_player_owned_group(self.player_id) + assert player_group is not None # for type checking + await player_group.set_stream(ma_stream.stream_id) - self.logger.debug("Finished streaming to %s", stream_path) - # we need to wait a bit for the stream status to become idle - # to ensure that all snapclients have consumed the audio - while stream.status != "idle": - await asyncio.sleep(0.25) - self._attr_playback_state = PlaybackState.IDLE - self._attr_elapsed_time = time.time() - self._attr_elapsed_time_last_updated - self.update_state() - self._set_childs_state() - - # start streaming the queue (pcm) audio in a background task - self._stream_task = self.mass.create_task(_streamer()) + self.poke_player_update() async def play_announcement( self, announcement: PlayerMedia, volume_level: int | None = None ) -> None: """Handle (provider native) playback of an announcement on given player.""" - # get stream or create new one - stream_name = self._get_stream_name(SnapCastStreamType.ANNOUNCEMENT) - stream = await self._get_or_create_stream(stream_name, None) + was_synced_to: str | None = self.synced_to + orig_volume_level: int | None = self.volume_level + + prev_stream = self.active_snap_ma_stream + + ma_stream = await self.snap_provider.get_snapcast_media_stream( + announcement, filter_settings_owner=self.player_id + ) + player_group = await self.snap_provider.ensure_player_owned_group(self.player_id) + + if ma_stream is None or ma_stream.stream_id is None or player_group is None: + return - # always activate the stream (announcements have priority over music) - snap_group = self._get_snapgroup() - assert snap_group is not None # for type checking - await snap_group.set_stream(stream.identifier) + await player_group.set_stream(ma_stream.stream_id) - # Unfortunately snapcast sets a volume per client (not per stream), so we need a way to - # set the announcement volume without affecting the music volume. - # We go for the simplest solution: save the previous volume, change it, restore later - # (with the downside that the change will be visible in the UI) - orig_volume_level = self.volume_level # Note: might be None + if self.snap_provider._use_builtin_server: + await asyncio.sleep(self.snap_provider._snapcast_server_buffer_size / 1000.0) if volume_level is not None: await self.volume_set(volume_level) - input_format = DEFAULT_SNAPCAST_FORMAT - assert announcement.custom_data is not None # for type checking - audio_source = self.mass.streams.get_announcement_stream( - announcement.custom_data["announcement_url"], - output_format=DEFAULT_SNAPCAST_FORMAT, - pre_announce=announcement.custom_data["pre_announce"], - pre_announce_url=announcement.custom_data["pre_announce_url"], - ) + await ma_stream.start_stream() + await ma_stream.wait_for_stopped() - # stream the audio, wait for it to finish (play_announcement should return after the - # announcement is over to avoid simultaneous announcements). - stream_path = self._get_stream_path(stream) - self.logger.debug("Start announcement streaming to %s", stream_path) - async with FFMpeg( - audio_input=audio_source, - input_format=input_format, - output_format=DEFAULT_SNAPCAST_FORMAT, - filter_params=get_player_filter_params( - self.mass, self.player_id, input_format, DEFAULT_SNAPCAST_FORMAT - ), - audio_output=stream_path, - extra_input_args=["-y", "-re"], - ) as ffmpeg_proc: - await ffmpeg_proc.wait() - - self.logger.debug("Finished announcement streaming to %s", stream_path) - # we need to wait a bit for the stream status to become idle - # to ensure that all snapclients have consumed the audio - while stream.status != "idle": - await asyncio.sleep(0.25) - - # delete the announcement stream - await self._delete_stream(stream_name) - - # restore volume, if we changed it above and it's still the same we set - # (the user did not change it himself while the announcement was playing) if self.volume_level == volume_level and orig_volume_level is not None: await self.volume_set(orig_volume_level) - # and restore the group to either the default or the music stream - if self.playback_state == PlaybackState.IDLE: - new_stream_name = "default" + if was_synced_to: + if ( + leader_group := await self.snap_provider.ensure_player_owned_group(was_synced_to) + ) is None: + return + await leader_group.add_client(self.snap_client.identifier) else: - new_stream_name = self._get_stream_name(SnapCastStreamType.MUSIC) - group = self._get_snapgroup() - assert group is not None # for type checking - await group.set_stream(new_stream_name) + await player_group.set_stream( + prev_stream.stream_id + if prev_stream and prev_stream.stream_id is not None + else "default" + ) async def get_config_entries( self, @@ -295,139 +347,206 @@ class SnapCastPlayer(Player): """Player config.""" return [ CONF_ENTRY_SAMPLE_RATES_SNAPCAST, + # we don't use the http server for streaming + CONF_ENTRY_HTTP_PROFILE_HIDDEN, ] - def _handle_player_update(self, snap_client: Snapclient) -> None: - """Process Snapcast update to Player controller. - - This is a callback function + def _handle_player_update(self, snap_client: SnapclientProto) -> None: + """Forward snap_client updates.""" + self.poke_player_update() + + def poke_player_update(self) -> None: + """Signal that a player state update should be processed.""" + self._poke_evt.set() + + async def _player_update_worker(self) -> None: + """Aggregate and process player state update requests.""" + while True: + await self._poke_evt.wait() + self._poke_evt.clear() + while True: + call_update: bool = False + async with self._state_update_lock: + call_update = await self._process_snapcast_client_state() + if call_update: + self.update_state() + if self._poke_evt.is_set(): + self._poke_evt.clear() + continue + break + + async def _process_snapcast_client_state(self) -> bool: + """Process the latest Snapcast client state and apply changes to this player. + + Returns: + True if changes were applied and a state update should be emitted via + ``update_state()``; False if no update is necessary (or if required data + is temporarily unavailable and the update should be retried later). """ - self._attr_name = self.snap_client.friendly_name - self._attr_volume_level = self.snap_client.volume - self._attr_volume_muted = self.snap_client.muted - self._attr_available = self.snap_client.connected + snap_group = self.snap_client.group + if snap_group is None: + # some data syncing error, a client is always a group member + # retry again later, don't call update now + return False + + stream_id = snap_group.stream + snap_stream: SnapstreamProto | None = None + with suppress(KeyError): + snap_stream = self.snap_provider._snapserver.stream(stream_id) + + members = list(snap_group.clients) # snapshot + + curr_state: TrackedPlayerState = { + "_attr_name": self.snap_client.friendly_name, + "_attr_volume_level": self.snap_client.volume, + "_attr_volume_muted": self.snap_client.muted, + "_attr_available": self.snap_client.connected, + "connected": self.snap_client.connected, + "stream_id": snap_group.stream, + "stream_status": snap_stream.status if snap_stream is not None else None, + "grp_name": snap_group.name, + "grp_member_ids": members, + "grp_member_avail": [ + pl.available + for cl_id in members + if (pl_id := self.snap_provider._get_ma_id(cl_id)) + and (pl := self.mass.players.get(pl_id)) + ], + } - # Note: when the active stream is a MASS stream the active_source is __not__ updated at all. - # So it doesn't matter whether a MASS stream is for music or announcements. - if stream := self._get_active_snapstream(): - if stream.identifier == "default": - self._attr_active_source = None - elif not stream.identifier.startswith(MASS_STREAM_PREFIX): - # unknown source - self._attr_active_source = stream.identifier - else: - self._attr_active_source = None + prev_state: TrackedPlayerState = ( + self._last_tracked_state if self._last_tracked_state is not None else {} + ) + self._last_tracked_state = curr_state + + # change detection for simple attrs + changed_attrs = { + k: v for k, v in curr_state.items() if k.startswith("_attr_") and prev_state.get(k) != v + } - self._group_childs() + prev_connected = prev_state.get("connected", False) + now_connected = curr_state.get("connected", False) + connection_changed = prev_connected != now_connected - self.update_state() + prev_stream_id = prev_state.get("stream_id") + curr_stream_id = curr_state["stream_id"] + prev_stream_status = prev_state.get("stream_status") + curr_stream_status = curr_state.get("stream_status") - def _get_stream_name(self, stream_type: SnapCastStreamType) -> str: - """Return the name of the stream for the given player. + stream_changed = ( + prev_stream_id != curr_stream_id or prev_stream_status != curr_stream_status + ) - Each player can have up to two concurrent streams, for music and announcements. + grouping_changed = any( + prev_state.get(k) != curr_state.get(k) + for k in ("grp_name", "grp_member_ids", "grp_member_avail") + ) - The stream name depends only on player_id (not queue_id) for two reasones: - 1. Avoid issues when the same queue_id is simultaneously used by two players - (eg in universal groups). - 2. Easily identify which stream belongs to which player, for instance to be able to - delete a music stream even when it is not active due to an announcement. - """ - safe_name = create_safe_string(self.player_id, replace_space=True) - stream_name = f"{MASS_STREAM_PREFIX}{safe_name}" - if stream_type == SnapCastStreamType.ANNOUNCEMENT: - stream_name += MASS_ANNOUNCEMENT_POSTFIX - return stream_name - - async def _get_or_create_stream(self, stream_name: str, queue_id: str | None) -> Snapstream: - """Create new stream on snapcast server (or return existing one).""" - # prefer to reuse existing stream if possible - if stream := self._get_snapstream(stream_name): - return stream - # The control script is used only for music streams in the builtin server - # (queue_id is None only for announcement streams). - extra_args = "" - if ( - self.provider._use_builtin_server - and queue_id - and self.provider._controlscript_available + needs_processing = bool( + changed_attrs or grouping_changed or stream_changed or connection_changed + ) + if not needs_processing: + return False + + if connection_changed or grouping_changed: + self.snap_provider.poke_group_members(snap_group) + + # help cleaning up unused streams + if curr_stream_id == "default" or ( + (my_stream := self._snap_ma_stream) + and my_stream.stream_id in {prev_stream_id, curr_stream_id} ): - # Create socket server for control script communication - socket_path = await self.provider.get_or_create_socket_server(queue_id) - extra_args = ( - f"&controlscript={urllib.parse.quote_plus('control.py')}" - f"&controlscriptparams=--queueid={urllib.parse.quote_plus(queue_id)}%20" - f"--socket={urllib.parse.quote_plus(socket_path)}%20" - f"--streamserver-ip={self.mass.streams.publish_ip}%20" - f"--streamserver-port={self.mass.streams.publish_port}" - ) + self.snap_provider.update_stream_usage() - attempts = 50 - while attempts: - attempts -= 1 - # pick a random port - port = random.randint(4953, 4953 + 200) - result = await self.provider._snapserver.stream_add_stream( - # NOTE: setting the sampleformat to something else - # (like 24 bits bit depth) does not seem to work at all! - f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2" - f"&idle_threshold={self.provider._snapcast_stream_idle_threshold}" - f"{extra_args}&name={stream_name}" - ) - if "id" not in result: - # if the port is already taken, the result will be an error - self.logger.warning(result) - continue - return self.provider._snapserver.stream(result["id"]) - msg = "Unable to create stream - No free port found?" - raise RuntimeError(msg) - - def _get_snapstream(self, stream_name: str) -> Snapstream | None: - """Get a stream by name.""" - with suppress(KeyError): - return self.provider._snapserver.stream(stream_name) + # apply changed attrs + for key, value in changed_attrs.items(): + setattr(self, key, value) + + # finally notify state update once + return True + + @property + def active_snap_ma_stream(self) -> SnapcastMAStream | None: + """Return the MA stream source of the active group.""" + grp = self.snap_client.group + if grp is None or grp.stream is None: + return None + + if grp.stream == "default": + return None + + return self.snap_provider.get_snap_ma_stream(grp.stream) + + @property + def snap_group_name(self) -> str: + """Return the name of the active group.""" + snap_group = self.snap_client.group + if snap_group is None: + return "" + return snap_group.name + + @cached_property + def _current_media(self) -> PlayerMedia | None: + """ + Return the current media being played by the player. + + Note that this is NOT the final current media of the player, + as it may be overridden by a active group/sync membership. + Hence it's marked as a private property. + The final current media can be retrieved by using the 'current_media' property. + """ + if snap_ma_stream := self.active_snap_ma_stream: + return snap_ma_stream.media return None - def _get_stream_path(self, stream: Snapstream) -> str: - stream_path = stream.path or f"tcp://{stream._stream['uri']['host']}" - return stream_path.replace("0.0.0.0", self.provider._snapcast_server_host) - - async def _delete_stream(self, stream_name: str) -> None: - if stream := self._get_snapstream(stream_name): - with suppress(TypeError, KeyError, AttributeError): - await self.provider._snapserver.stream_remove_stream(stream.identifier) - - def _get_snapgroup(self) -> Snapgroup | None: - """Get snapcast group for given player_id.""" - return cast("Snapgroup | None", self.snap_client.group) - - def _set_childs_state(self) -> None: - """Set the state of the child`s of the player.""" - for child_player_id in self.group_members: - if child_player_id == self.player_id: - continue - if mass_child_player := self.mass.players.get(child_player_id): - mass_child_player._attr_playback_state = self.playback_state - mass_child_player.update_state() - - def _get_active_snapstream(self) -> Snapstream | None: + @property + def _active_source(self) -> str | None: + """ + Return the (id of) the active source of the player. + + Only required if the player supports PlayerFeature.SELECT_SOURCE. + + Set to None if the player is not currently playing a source or + the player_id if the player is currently playing a MA queue. + + Note that this is NOT the final active source of the player, + as it may be overridden by a active group/sync membership. + Hence it's marked as a private property. + The final active source can be retrieved by using the 'active_source' property. + """ + grp = self.snap_client.group + if grp is None or grp.stream is None: + return None + + if grp.stream == "default": + return None + + if ma_stream := self.snap_provider.get_snap_ma_stream(grp.stream): + return ma_stream.source_id + + # external snapcast stream + return grp.stream or None + + def _get_active_snapstream(self) -> SnapstreamProto | None: """Get active stream for given player_id.""" - if group := self._get_snapgroup(): - return self._get_snapstream(group.stream) + if group := self.snap_client.group: + with suppress(KeyError): + return self.snap_provider._snapserver.stream(group.stream) return None - def _group_childs(self) -> None: - """Return player_ids of the players synced to this player.""" - snap_group = self._get_snapgroup() - assert snap_group is not None # for type checking - self._attr_group_members.clear() - if self.synced_to is not None: - return - self._attr_group_members.append(self.player_id) - for snap_client_id in snap_group.clients: - if ( - self.provider._get_ma_id(snap_client_id) != self.player_id - and self.provider._snapserver.client(snap_client_id).connected - ): - self._attr_group_members.append(self.provider._get_ma_id(snap_client_id)) - self.update_state() + def _get_player_ids_of_curr_group(self) -> list[str]: + snap_group = self.snap_client.group + if snap_group is None: + return [] + return [ + ma_id + for client_id in snap_group.clients + if (ma_id := self.snap_provider._get_ma_id(client_id)) + ] + + def _get_players_of_curr_group(self) -> list[Player]: + return [ + ma_player + for ma_id in self._get_player_ids_of_curr_group() + if (ma_player := self.mass.players.get(ma_id)) + ] diff --git a/music_assistant/providers/snapcast/provider.py b/music_assistant/providers/snapcast/provider.py index 6ef28602..f46f9195 100644 --- a/music_assistant/providers/snapcast/provider.py +++ b/music_assistant/providers/snapcast/provider.py @@ -1,23 +1,25 @@ """SnapCastProvider.""" +from __future__ import annotations + import asyncio +import hashlib import logging import re import shutil import socket +from contextlib import suppress from pathlib import Path -from typing import cast +from typing import TYPE_CHECKING, cast from bidict import bidict -from music_assistant_models.enums import PlaybackState +from music_assistant_models.enums import MediaType, PlaybackState from music_assistant_models.errors import SetupFailedError -from snapcast.control import create_server -from snapcast.control.client import Snapclient -from snapcast.control.group import Snapgroup -from snapcast.control.server import Snapserver +from snapcast.control.server import CONTROL_PORT, Snapserver from zeroconf import NonUniqueNameException from zeroconf.asyncio import AsyncServiceInfo +from music_assistant.helpers.compare import create_safe_string from music_assistant.helpers.process import AsyncProcess from music_assistant.helpers.util import get_ip_pton from music_assistant.models.player_provider import PlayerProvider @@ -32,18 +34,37 @@ from music_assistant.providers.snapcast.constants import ( CONF_STREAM_IDLE_THRESHOLD, CONF_USE_EXTERNAL_SERVER, CONTROL_SCRIPT, - CONTROL_SOCKET_PATH_TEMPLATE, DEFAULT_SNAPSERVER_PORT, + MASS_ANNOUNCEMENT_POSTFIX, + MASS_STREAM_PREFIX, SNAPWEB_DIR, ) +from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream from music_assistant.providers.snapcast.player import SnapCastPlayer -from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer +from music_assistant.providers.universal_group.constants import UGP_PREFIX + +if TYPE_CHECKING: + from music_assistant_models.player import PlayerMedia + + from .snap_cntrl_proto import SnapclientProto, SnapgroupProto, SnapserverProto + + +async def _create_cntrl_server( + loop: asyncio.AbstractEventLoop, + host: str, + port: int = CONTROL_PORT, + reconnect: bool = False, +) -> SnapserverProto: + """Server factory.""" + server = Snapserver(loop, host, port, reconnect) + await server.start() + return cast("SnapserverProto", server) class SnapCastProvider(PlayerProvider): """SnapCastProvider.""" - _snapserver: Snapserver + _snapserver: SnapserverProto _snapserver_runner: asyncio.Task[None] | None _snapserver_started: asyncio.Event | None _snapcast_server_host: str @@ -52,7 +73,17 @@ class SnapCastProvider(PlayerProvider): _use_builtin_server: bool _stop_called: bool _controlscript_available: bool - _socket_servers: dict[str, SnapcastSocketServer] # queue_id -> socket server + _snapcast_ma_streams: dict[str, SnapcastMAStream] + _snapcast_ma_streams_lock: asyncio.Lock + + @property + def use_queue_control(self) -> bool: + """Return whether queue-based control scripts are available. + + Indicates if the Snapcast control script has been successfully initialized + and can be used to control playback via a queue-specific control channel. + """ + return self._controlscript_available async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" @@ -61,11 +92,12 @@ class SnapCastProvider(PlayerProvider): self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER) self._stop_called = False self._controlscript_available = False - self._socket_servers = {} if self._use_builtin_server: self._snapcast_server_host = "127.0.0.1" self._snapcast_server_control_port = DEFAULT_SNAPSERVER_PORT - self._snapcast_server_buffer_size = self.config.get_value(CONF_SERVER_BUFFER_SIZE) + self._snapcast_server_buffer_size = cast( + "int", self.config.get_value(CONF_SERVER_BUFFER_SIZE) + ) self._snapcast_server_chunk_ms = self.config.get_value(CONF_SERVER_CHUNK_MS) self._snapcast_server_initial_volume = self.config.get_value(CONF_SERVER_INITIAL_VOLUME) self._snapcast_server_send_to_muted = self.config.get_value( @@ -82,13 +114,16 @@ class SnapCastProvider(PlayerProvider): self._snapcast_stream_idle_threshold = self.config.get_value(CONF_STREAM_IDLE_THRESHOLD) self._ids_map = bidict({}) + self._snapcast_ma_streams = {} + self._snapcast_ma_streams_lock = asyncio.Lock() + if self._use_builtin_server: await self._start_builtin_server() else: self._snapserver_runner = None self._snapserver_started = None try: - self._snapserver = await create_server( + self._snapserver = await _create_cntrl_server( self.mass.loop, self._snapcast_server_host, port=self._snapcast_server_control_port, @@ -123,6 +158,10 @@ class SnapCastProvider(PlayerProvider): if player.playback_state != PlaybackState.PLAYING: continue await player.stop() + + for stream_name in list(self._snapcast_ma_streams): + await self.delete_ma_stream(stream_name) + self._snapserver.stop() await self._stop_builtin_server() @@ -252,9 +291,9 @@ class SnapCastProvider(PlayerProvider): # The snapserver doesn't always cleanup the control script processes # properly. We do it explicitly when closing a socket server. # Should be fixed on the server side, though. - for socket_server in list(self._socket_servers.values()): - await socket_server.stop() - self._socket_servers.clear() + for stream_name in list(self._snapcast_ma_streams): + await self.delete_ma_stream(stream_name) + self._snapcast_ma_streams.clear() raise def _get_ma_id(self, snap_client_id: str) -> str: @@ -277,19 +316,16 @@ class SnapCastProvider(PlayerProvider): return new_id return self._get_ma_id(snap_client_id) - def _handle_player_init(self, snap_client: Snapclient) -> SnapCastPlayer: + def _handle_player_init(self, snap_client: SnapclientProto) -> SnapCastPlayer: """Process Snapcast add to Player controller.""" player_id = self._generate_and_register_id(snap_client.identifier) player = self.mass.players.get(player_id, raise_unavailable=False) if not player: - snap_client = cast( - "Snapclient", self._snapserver.client(self._get_snapclient_id(player_id)) - ) + snap_client = self._snapserver.client(self._get_snapclient_id(player_id)) player = SnapCastPlayer( provider=self, player_id=player_id, snap_client=snap_client, - snap_client_id=self._get_snapclient_id(player_id), ) player.setup() else: @@ -310,22 +346,21 @@ class SnapCastProvider(PlayerProvider): if ma_player := self._handle_player_init(snap_client): snap_client.set_callback(ma_player._handle_player_update) for snap_client in self._snapserver.clients: - if player := self.mass.players.get(self._get_ma_id(snap_client.identifier)): - ma_player = cast("SnapCastPlayer", player) - snap_client.set_callback(ma_player._handle_player_update) - for snap_group in self._snapserver.groups: - snap_group.set_callback(self._handle_group_update) + if player := self.get_snap_player(client_id=snap_client.identifier): + snap_client.set_callback(player._handle_player_update) + self._update_group_callbacks() - def _handle_group_update(self, snap_group: Snapgroup) -> None: + def poke_group_members(self, snap_group: SnapgroupProto) -> None: """Process Snapcast group callback.""" - for snap_client in self._snapserver.clients: - if ma_player := self.mass.players.get(self._get_ma_id(snap_client.identifier)): - assert isinstance(ma_player, SnapCastPlayer) # for type checking - ma_player._handle_player_update(snap_client) + for snap_client_id in snap_group.clients: + if ma_player := self.get_snap_player(client_id=snap_client_id): + ma_player.poke_player_update() def _handle_disconnect(self, exc: Exception) -> None: """Handle disconnect callback from snapserver.""" if self._stop_called or self.mass.closing: + # prevent auto-reconnecting of snapcast controller + self._snapserver.stop() # we're instructed to stop/exit, so no need to restart the connection return self.logger.info( @@ -345,34 +380,288 @@ class SnapCastProvider(PlayerProvider): else: self.logger.warning("Unable to remove snapclient %s: %s", player_id, error_msg) - async def get_or_create_socket_server(self, queue_id: str) -> str: - """Get or create a socket server for the given queue. + def _update_group_callbacks(self, poke: bool = False) -> None: + for grp in self._snapserver.groups: + grp.set_callback(self.poke_group_members) + if poke: + self.poke_group_members(grp) + + async def ensure_player_owned_group( + self, ma_player_id: str, set_stream_id: str | None = None + ) -> SnapgroupProto | None: + """Ensure a Snapcast group is owned by the given player. + + This method guarantees that the returned Snapcast group is *owned* by the + specified Music Assistant player, meaning the group name equals the + player's ID and the player is the group leader. + + Behavior: + - If the player is already the leader of its current group, that group is + returned unchanged. + - If the player is a member of another group (but not the leader), the + player is removed from that group, which causes Snapcast to create a new + single-client group for the player. + - The resulting group is renamed to the player's ID. - :param queue_id: The queue ID to create a socket server for. - :return: The path to the Unix socket. + If `set_stream_id` is provided and a new group is created, the group's + stream is updated accordingly. + + Args: + ma_player_id: Music Assistant player ID. + set_stream_id: Optional Snapcast stream ID to assign to the player's group. + + Returns: + The Snapcast group owned by the player, or ``None`` if the player is not + currently part of any group. """ - if queue_id in self._socket_servers: - return self._socket_servers[queue_id].socket_path - - socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=queue_id) - socket_server = SnapcastSocketServer( - mass=self.mass, - queue_id=queue_id, - socket_path=socket_path, - streamserver_ip=str(self.mass.streams.publish_ip), - streamserver_port=cast("int", self.mass.streams.publish_port), + player_client = self.get_snap_client(player_id=ma_player_id) + if player_client is None: + return None + + curr_group = player_client.group + + if curr_group is None: + return None + + if curr_group.name == ma_player_id: + return curr_group + + group_members = list(curr_group.clients) + if len(group_members) > 1 and curr_group.name: + # player is member of other player group, remove it, which results in a new group + group_members.remove(player_client.identifier) + res = await self._snapserver.group_clients(curr_group.identifier, group_members) + if not (isinstance(res, dict) and "server" in res): + raise RuntimeError("Couldn't remove client from group") + self._snapserver.synchronize(res) + curr_group = player_client.group + if curr_group is None: + return None + if set_stream_id: + await curr_group.set_stream(set_stream_id) + + await curr_group.set_name(ma_player_id) + return curr_group + + async def isolate_player_to_dedicated_group( + self, + target_player_id: str, + target_stream_id: str | None = None, + others_stream_id: str | None = "default", + ) -> None: + """Isolate a player into a dedicated Snapcast group. + + Ensures that the target player ends up in a group where it is the sole + member and group leader. + + Behavior: + - The target player is first ensured to own its group. + - All other members of that group are removed. + - Each removed player is placed into its own dedicated group. + - Removed players' groups are optionally assigned `others_stream_id`. + - The target group is optionally assigned `target_stream_id`. + + Callbacks for affected clients and groups are temporarily disabled during + the operation to avoid intermediate state updates. + + Args: + target_player_id: Music Assistant player ID to isolate. + target_stream_id: Optional stream ID to assign to the target player's group. + others_stream_id: Stream ID assigned to newly created groups for removed players. + """ + this_client_id = self._get_snapclient_id(target_player_id) + target_group = await self.ensure_player_owned_group( + target_player_id, set_stream_id=target_stream_id ) - await socket_server.start() - self._socket_servers[queue_id] = socket_server - self.logger.debug("Created socket server for queue %s at %s", queue_id, socket_path) - return socket_path - async def stop_socket_server(self, queue_id: str) -> None: - """Stop and remove the socket server for the given queue. + if target_group is None: + return + + target_group.set_callback(None) + group_members = list(target_group.clients) + group_members.remove(this_client_id) + for client_id in group_members: + client = self._snapserver.client(client_id) + client.set_callback(None) + if group_members: + res = await self._snapserver.group_clients(target_group.identifier, [this_client_id]) + if not (isinstance(res, dict) and "server" in res): + raise RuntimeError("Couldn't remove client from group") + self._snapserver.synchronize(res) + for client_id in group_members: + ma_player_id = self._get_ma_id(client_id) + if ma_player := cast("SnapCastPlayer", self.mass.players.get(ma_player_id)): + client = self._snapserver.client(client_id) + if client is not None: + if client.group is not None: + await client.group.set_name(ma_player_id) + if others_stream_id: + await client.group.set_stream(others_stream_id) + client.set_callback(ma_player._handle_player_update) + + if target_stream_id is not None: + await target_group.set_stream(target_stream_id) + + async def get_snapcast_media_stream( + self, + media: PlayerMedia, + filter_settings_owner: str | None = None, + existing_only: bool = False, + ) -> SnapcastMAStream | None: + """Get or create a Snapcast Music Assistant stream for the given media. - :param queue_id: The queue ID to stop the socket server for. + Determines a deterministic Snapcast stream name based on the media type + and source, and either returns an existing stream or creates a new one. + + Behavior: + - Announcement and generic media streams use a hashed name. + - Plugin and queue-backed sources reuse a stable stream name. + - Queue-backed streams may persist across playback sessions. + - If `existing_only` is True, no new stream will be created. + + Newly created streams are registered with the Snapcast server and fully + set up before being returned. + + Args: + media: Media item to stream. + filter_settings_owner: Optional player/entity ID used to resolve DSP filters. + existing_only: If True, only return an existing stream. + + Returns: + A ``SnapcastMAStream`` instance, or ``None`` if no stream exists and + `existing_only` is True. + """ + stream_name: str = "" + name_suffix: str = "" + queue_id: str | None = None + source_id: str | None = None + destroy_on_stop = True + + if media.media_type == MediaType.ANNOUNCEMENT: + stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6] + name_suffix = MASS_ANNOUNCEMENT_POSTFIX + elif media.media_type == MediaType.PLUGIN_SOURCE: + custom_data = media.custom_data or {} + plugin: str = media.title or custom_data.get("provider") or "" + player: str = f" {custom_data.get('player_id', '')}" + stream_name += f"{plugin} {player}" + source_id = custom_data.get("source_id") + elif media.source_id and media.source_id.startswith(UGP_PREFIX): + stream_name += media.source_id + elif media.source_id and media.queue_item_id: + stream_name += media.source_id + queue_id = media.source_id + source_id = media.source_id + destroy_on_stop = False + else: + stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6] + + stream_name = create_safe_string(stream_name, lowercase=False) + stream_name = f"{MASS_STREAM_PREFIX}{stream_name}{name_suffix}" + async with self._snapcast_ma_streams_lock: + if not (stream := self._snapcast_ma_streams.get(stream_name)): + if existing_only: + return None + + stream = SnapcastMAStream( + provider=self, + media=media, + stream_name=stream_name, + filter_settings_owner=filter_settings_owner, + source_id=source_id, + use_cntrl_script=bool(queue_id) and self.use_queue_control, + destroy_on_stop=destroy_on_stop, + ) + self._snapcast_ma_streams[stream_name] = stream + else: + stream.update_media(media) + await stream.setup() + return stream + + def get_snap_ma_stream(self, stream_name: str) -> SnapcastMAStream | None: + """Return an existing Music Assistant Snapcast stream by name. + + Args: + stream_name: Snapcast stream name. + + Returns: + The corresponding ``SnapcastMAStream`` instance, or ``None`` if not found. + """ + return self._snapcast_ma_streams.get(stream_name) + + async def delete_ma_stream(self, stream_name: str) -> None: + """Remove and destroy a Music Assistant Snapcast stream. + + The stream is removed from internal tracking and its resources are + destroyed asynchronously. Errors during destruction are logged but + otherwise ignored. + + Args: + stream_name: Snapcast stream name to delete. """ - if queue_id in self._socket_servers: - await self._socket_servers[queue_id].stop() - del self._socket_servers[queue_id] - self.logger.debug("Stopped socket server for queue %s", queue_id) + async with self._snapcast_ma_streams_lock: + stream = self._snapcast_ma_streams.pop(stream_name, None) + + if not stream: + return + + try: + await stream.destroy() + except Exception: + self.logger.exception("Failed to destroy stream session %s", stream_name) + + def update_stream_usage(self) -> None: + """Update usage state for all tracked Snapcast streams. + + Marks streams as "in use" if they are currently assigned to any Snapcast + group, and schedules unused streams for delayed shutdown. + + This method should be called whenever group or stream assignments change + on the Snapcast server. + """ + unused_streams = set(self._snapcast_ma_streams.keys()) + for grp in self._snapserver.groups: + stream_id = grp.stream + if stream_id in self._snapcast_ma_streams: + ma_stream = self._snapcast_ma_streams[stream_id] + ma_stream.set_in_use(True) + unused_streams.discard(stream_id) + + if not unused_streams: + break + + for stream_id in unused_streams: + self._snapcast_ma_streams[stream_id].set_in_use(False) + + def get_snap_client( + self, *, client_id: str | None = None, player_id: str | None = None + ) -> SnapclientProto | None: + """Return the snapclient for either given client_id or player_id.""" + if player_id is not None: + if client_id is not None and client_id != self._get_snapclient_id(client_id): + raise ValueError("provided client_id and player_id do not match") + client_id = self._get_snapclient_id(player_id) + + if client_id: + with suppress(KeyError): + return self._snapserver.client(client_id) + + return None + + def get_snap_player( + self, *, client_id: str | None = None, player_id: str | None = None + ) -> SnapCastPlayer | None: + """Return the MA SnapCastPlayer for either given client_id or player_id.""" + if client_id is not None: + if player_id is not None and player_id != self._get_ma_id(client_id): + raise ValueError("provided client_id and player_id do not match") + player_id = self._get_ma_id(client_id) + + if player_id is None: + return None + + if ma_player := self.mass.players.get(player_id): + assert isinstance(ma_player, SnapCastPlayer) # for type checking + return ma_player + + return None diff --git a/music_assistant/providers/snapcast/snap_cntrl_proto.py b/music_assistant/providers/snapcast/snap_cntrl_proto.py new file mode 100644 index 00000000..95933453 --- /dev/null +++ b/music_assistant/providers/snapcast/snap_cntrl_proto.py @@ -0,0 +1,124 @@ +# ruff: noqa: D100, D101, D102 + +from __future__ import annotations + +from collections.abc import Callable +from typing import Any, Protocol + +type RetVal = dict[str, Any] | None +type RetTuple = tuple[RetVal, RetVal] + +SnapclientCallback = Callable[["SnapclientProto"], Any] +SnapgroupCallback = Callable[["SnapgroupProto"], Any] +SnapstreamCallback = Callable[["SnapstreamProto"], Any] + + +class SnapgroupProto(Protocol): + @property + def identifier(self) -> str: ... + @property + def name(self) -> str: ... + @property + def clients(self) -> list[str]: ... + @property + def stream(self) -> str: ... + def callback(self) -> None: ... + def set_callback(self, func: SnapgroupCallback | None) -> None: ... + async def set_name(self, name: str) -> RetVal: ... + async def set_stream(self, stream_id: str) -> RetVal: ... + async def add_client(self, client_identifier: str) -> None: ... + + +class SnapclientProto(Protocol): + # read-only properties used by your code + @property + def identifier(self) -> str: ... + + @property + def group(self) -> SnapgroupProto | None: ... + + @property + def friendly_name(self) -> str: ... + + @property + def connected(self) -> bool: ... + + @property + def name(self) -> str: ... + + @property + def latency(self) -> int: ... + + @property + def muted(self) -> bool: ... + + @property + def volume(self) -> int: ... + + _client: dict[str, Any] + + def groups_available(self) -> list[SnapgroupProto]: ... + + async def set_muted(self, status: bool) -> None: ... + async def set_volume(self, percent: int, update_group: bool = True) -> None: ... + + def callback(self) -> None: ... + def set_callback(self, func: SnapclientCallback | None) -> None: ... + + +class SnapstreamProto(Protocol): + _stream: dict[str, Any] + + @property + def identifier(self) -> str: ... + @property + def friendly_name(self) -> str: ... + @property + def status(self) -> str: ... + @property + def path(self) -> str | None: ... + def callback(self) -> None: ... + def set_callback(self, func: SnapstreamCallback | None) -> None: ... + + +class SnapserverProto(Protocol): + # lifecycle + async def start(self) -> Any: ... + def stop(self) -> None: ... + + # collections (as in the external lib: list(...) views) + @property + def groups(self) -> list[SnapgroupProto]: ... + @property + def clients(self) -> list[SnapclientProto]: ... + @property + def streams(self) -> list[SnapstreamProto]: ... + + # accessors + def group(self, group_identifier: str) -> SnapgroupProto: ... + def client(self, client_identifier: str) -> SnapclientProto: ... + def stream(self, stream_identifier: str) -> SnapstreamProto: ... + + async def delete_client(self, identifier: str) -> RetTuple: ... + + async def status(self) -> tuple[Any, Any]: ... + + async def group_clients(self, identifier: str, clients: list[str]) -> RetVal: ... + async def group_name(self, identifier: str, name: str) -> RetVal: ... + async def group_stream(self, identifier: str, stream_id: str) -> RetVal: ... + + async def client_name(self, identifier: str, name: str) -> RetVal: ... + async def client_latency(self, identifier: str, latency: int) -> RetVal: ... + async def client_volume(self, identifier: str, volume: dict[str, Any]) -> RetVal: ... + + async def stream_add_stream(self, stream_uri: str) -> RetVal: ... + async def stream_remove_stream(self, identifier: str) -> RetVal: ... + + # state sync + def synchronize(self, status: dict[str, Any]) -> None: ... + + # callbacks (if you use them) + def set_on_update_callback(self, func: Callable[[], Any] | None) -> None: ... + def set_on_connect_callback(self, func: Callable[[], Any] | None) -> None: ... + def set_on_disconnect_callback(self, func: Callable[[Exception], Any] | None) -> None: ... + def set_new_client_callback(self, func: Callable[[SnapclientProto], Any] | None) -> None: ... -- 2.34.1