# and the music keeps playing uninterrupted.
"airplay",
"squeezelite",
+ "snapcast",
# TODO: Get this working with Sonos as well (need to handle range requests)
}
"""Constants for snapcast provider."""
import pathlib
-from enum import StrEnum
from music_assistant_models.enums import ContentType
from music_assistant_models.media_items.audio_format import AudioFormat
bit_depth=16,
channels=2,
)
-
-
-class SnapCastStreamType(StrEnum):
- """Enum for Snapcast Stream Type."""
-
- MUSIC = "MUSIC"
- ANNOUNCEMENT = "ANNOUNCEMENT"
--- /dev/null
+"""Music Assistant Snapcast source stream.
+
+This module implements a Music Assistant-managed Snapcast stream that is exposed to the
+Snapcast server as a TCP source. The stream is produced by running an FFmpeg pipeline
+which pulls audio from Music Assistant and pushes it to the Snapcast source URI.
+
+Optionally, a Unix socket server can be started to provide a control channel for a
+Snapcast control script (used by the built-in Snapcast server integration).
+"""
+
+from __future__ import annotations
+
+import asyncio
+import random
+import urllib.parse
+from contextlib import suppress
+from typing import TYPE_CHECKING, cast
+
+from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.helpers.ffmpeg import FFMpeg
+from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
+
+from .constants import (
+ CONTROL_SOCKET_PATH_TEMPLATE,
+ DEFAULT_SNAPCAST_FORMAT,
+)
+
+if TYPE_CHECKING:
+ from music_assistant_models.player import PlayerMedia
+
+ from .provider import SnapCastProvider
+ from .snap_cntrl_proto import SnapstreamProto
+
+
+class SnapcastMAStream:
+ """A Music Assistant-managed Snapcast stream.
+
+ The stream lifecycle is:
+ - setup: ensure required server resources exist (Snapcast source, optional socket server)
+ - start_stream: start the FFmpeg streaming task
+ - request_stop_stream / wait_for_stopped: stop streaming and await termination
+ - destroy: stop streaming, remove Snapcast source, and stop ancillary services
+
+ If `cntrl_queue_id` is provided, a Unix socket server is started to allow a Snapcast
+ control script to communicate with Music Assistant.
+ """
+
+ def __init__(
+ self,
+ provider: SnapCastProvider,
+ media: PlayerMedia,
+ stream_name: str,
+ source_id: str | None = None,
+ filter_settings_owner: str | None = None,
+ use_cntrl_script: bool = False,
+ destroy_on_stop: bool = False,
+ ) -> None:
+ """Initialize the stream.
+
+ Args:
+ provider: The Snapcast provider instance.
+ media: The media item to stream.
+ stream_name: Name used to register the stream on the Snapcast server.
+ cntrl_queue_id: If set, enables the control socket server used by the control script.
+ filter_settings_owner: Player/entity id used to fetch DSP/filter parameters.
+ destroy_on_stop: If true, delete this MA stream once streaming stops.
+ """
+ self.media = media
+ self.stream_name = stream_name
+ self.snap_stream: SnapstreamProto | None = None
+
+ self._provider = provider
+ self._logger = provider.logger
+ self._mass = provider.mass
+ self._source_id = source_id
+ self._use_cntrl_script = use_cntrl_script
+ self._cntrl_queue_id = source_id if use_cntrl_script else None
+ self._filter_settings_owner = filter_settings_owner
+ self._destroy_on_stop = destroy_on_stop
+
+ self._lifecycle_lock = asyncio.Lock()
+ self._destroyed = False
+ self._setup_done = False
+ self._is_streaming = False
+ self._restart_requested: bool = False
+ self._stop_requested: bool = False
+
+ self._socket_server: SnapcastSocketServer | None = None
+ self._socket_path: str | None = None
+ self._streamer_task: asyncio.Task[None] | None = None
+ self._stop_streamer_evt = asyncio.Event()
+ self._streamer_started_evt = asyncio.Event()
+ self._stop_timer: asyncio.Handle | None = None
+ self._stop_timer_started_at: float | None = None
+ self._filter_settings: list[str] | None = None
+
+ @property
+ def source_id(self) -> str | None:
+ """Return the source id this stream was created for."""
+ return self._source_id
+
+ @property
+ def stream_id(self) -> str | None:
+ """Return the Snapcast stream identifier, if registered."""
+ if self.snap_stream:
+ return self.snap_stream.identifier
+ return None
+
+ @property
+ def is_streaming(self) -> bool:
+ """Return True if the FFmpeg streaming task is currently running."""
+ return self._is_streaming
+
+ async def setup(self) -> None:
+ """Prepare the Snapcast stream resources.
+
+ Ensures a Snapcast source exists on the server. If `cntrl_queue_id` is set,
+ also starts the Unix socket server used by the control script.
+ """
+ async with self._lifecycle_lock:
+ if self._destroyed:
+ raise RuntimeError("Session is destroyed")
+ if self._setup_done:
+ return
+ if self._provider._snapserver is None:
+ raise RuntimeError("Snapserver needs to be setup first")
+
+ if self._cntrl_queue_id:
+ await self._start_socket_server()
+
+ await self._register_tcp_server_source()
+ self._setup_done = True
+
+ async def destroy(self) -> None:
+ """Stop streaming and tear down all resources.
+
+ This stops the streamer task (if running), removes the Snapcast source,
+ and stops the optional control socket server.
+ """
+ async with self._lifecycle_lock:
+ if self._destroyed:
+ return
+ self._destroyed = True
+
+ self.request_stop_stream()
+ await self.wait_for_stopped()
+ await self._remove_snap_source()
+ await self._stop_socket_server()
+
+ async def start_stream(self, allow_restart: bool = False) -> None:
+ """Start streaming the configured media to the Snapcast source.
+
+ Raises:
+ RuntimeError: If the streamer task is already running.
+ """
+ await self.setup()
+ async with self._lifecycle_lock:
+ if self._streamer_task and not self._streamer_task.done():
+ if not allow_restart:
+ raise RuntimeError("streamer already running")
+ self._restart_if_running()
+ return
+
+ self._stop_requested = False
+ self._restart_requested = False
+ self._stop_streamer_evt.clear()
+ self._streamer_started_evt.clear()
+ self._streamer_task = self._mass.create_task(self._streamer_task_impl())
+ self._streamer_task.add_done_callback(self._on_streamer_done)
+
+ async def wait_for_started(self, timeout_sec: float | None = None) -> None:
+ """Wait until the streamer task signals it has started.
+
+ Args:
+ timeout_sec: Optional timeout in seconds.
+ """
+ try:
+ await asyncio.wait_for(self._streamer_started_evt.wait(), timeout_sec)
+ except TimeoutError:
+ self._logger.warning(
+ "Timeout waiting for stream %s to start; Canceling...",
+ self.stream_name,
+ )
+
+ def update_media(self, media: PlayerMedia) -> None:
+ """Update the media to play and restart the stream if required."""
+ if media != self.media:
+ self.media = media
+ self._restart_if_running()
+
+ def update_filter_settings(self, from_player: str | None = None) -> None:
+ """Update the filter setting."""
+ take_from = from_player or self._filter_settings_owner
+ if not take_from:
+ raise RuntimeError("No player provided to read filter settings from.")
+ new_settings = get_player_filter_params(
+ self._mass,
+ take_from,
+ DEFAULT_SNAPCAST_FORMAT,
+ DEFAULT_SNAPCAST_FORMAT,
+ )
+ if from_player:
+ self._filter_settings_owner = from_player
+ if new_settings != self._filter_settings:
+ self._restart_if_running()
+
+ def request_stop_stream(self) -> None:
+ """Request the streamer task to stop.
+
+ This is cooperative: the streamer task will stop when it observes the stop event.
+ Any pending inactivity stop timer is canceled.
+ """
+ self._stop_requested = True
+ self._restart_requested = False # explicit stop cancels any pending restart
+ self._stop_streamer_evt.set()
+
+ self._stop_timer_started_at = None
+ if self._stop_timer:
+ self._stop_timer.cancel()
+
+ def set_in_use(self, in_use: bool) -> None:
+ """Mark the stream as in-use or idle.
+
+ When marked idle, a delayed stop is scheduled. When marked in-use, any pending
+ delayed stop is canceled.
+ """
+ if in_use:
+ self._stop_timer_started_at = None
+ if self._stop_timer:
+ self._stop_timer.cancel()
+ elif self._stop_timer_started_at is None:
+ self._stop_timer_started_at = self._mass.loop.time()
+ self._stop_timer = self._mass.loop.call_later(60.0, self.request_stop_stream)
+
+ async def wait_for_stopped(self, timeout_sec: float | None = None) -> None:
+ """Wait for the streamer task to finish.
+
+ If the task does not finish within the timeout, it is canceled and awaited.
+
+ Args:
+ timeout_sec: Optional timeout in seconds.
+ """
+ curr_task = self._streamer_task
+ if not curr_task:
+ return
+ try:
+ await asyncio.wait_for(curr_task, timeout_sec)
+ except asyncio.CancelledError:
+ self._logger.warning("Streamer task got canceled")
+ except TimeoutError:
+ self._logger.warning(
+ "Timeout waiting for stream %s to finish; Canceling...",
+ self.stream_name,
+ )
+ curr_task.cancel()
+ await asyncio.gather(curr_task, return_exceptions=True)
+
+ async def _streamer_task_impl(self) -> None:
+ """Streamer task implementation.
+
+ Runs FFmpeg to push audio to the Snapcast TCP source until FFmpeg exits or a stop
+ request is received. After exit, waits briefly for the Snapcast stream to report
+ an idle state.
+ """
+ stream_path = self._snap_get_stream_path()
+ if stream_path is None:
+ raise RuntimeError("The path to stream to is not set")
+
+ self._logger.debug("Start streaming to %s", stream_path)
+ self._stop_streamer_evt.clear()
+ self._streamer_started_evt.clear()
+ if self._filter_settings_owner:
+ self._filter_settings = get_player_filter_params(
+ self._mass,
+ self._filter_settings_owner,
+ DEFAULT_SNAPCAST_FORMAT,
+ DEFAULT_SNAPCAST_FORMAT,
+ )
+ audio_source = self._mass.streams.get_stream(self.media, DEFAULT_SNAPCAST_FORMAT)
+ try:
+ async with FFMpeg(
+ audio_input=audio_source,
+ input_format=DEFAULT_SNAPCAST_FORMAT,
+ output_format=DEFAULT_SNAPCAST_FORMAT,
+ filter_params=self._filter_settings or [],
+ audio_output=stream_path,
+ extra_input_args=["-y", "-re"],
+ ) as ffmpeg_proc:
+ wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait())
+ wait_stop = self._mass.create_task(self._stop_streamer_evt.wait())
+ self._streamer_started_evt.set()
+ self._is_streaming = True
+
+ done, pending = await asyncio.wait(
+ {wait_ffmpeg, wait_stop},
+ return_when=asyncio.FIRST_COMPLETED,
+ )
+
+ if wait_stop in done and wait_ffmpeg not in done:
+ self._logger.debug("Stopping stream %s requested.", self.stream_name)
+ wait_ffmpeg.cancel()
+ await asyncio.gather(wait_ffmpeg, return_exceptions=True)
+ return
+
+ await wait_ffmpeg
+ for t in pending:
+ t.cancel()
+ await asyncio.gather(*pending, return_exceptions=True)
+
+ finally:
+ self._is_streaming = False
+ self._logger.debug("Finished streaming to %s", stream_path)
+ # Wait a bit for snap stream to become idle
+ try:
+
+ async def wait_until_idle() -> None:
+ while True:
+ stream_is_idle = False
+ with suppress(KeyError):
+ snap_stream = self._provider._snapserver.stream(self.stream_name)
+ stream_is_idle = snap_stream.status == "idle"
+ if self._mass.closing or stream_is_idle:
+ break
+ await asyncio.sleep(0.25)
+
+ await asyncio.wait_for(wait_until_idle(), timeout=10.0)
+
+ except TimeoutError:
+ self._logger.warning(
+ "Timeout waiting for stream %s to become idle",
+ self.stream_name,
+ )
+
+ def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
+ """Handle streamer task completion and optional cleanup."""
+ restart = False
+ try:
+ t.result()
+ except asyncio.CancelledError:
+ self._logger.debug("Streamer task cancelled: %s", self.stream_name)
+ except Exception:
+ self._logger.exception("Streamer task failed")
+ finally:
+ restart = self._restart_requested and not self._destroyed
+
+ if self._streamer_task is t:
+ self._streamer_task = None
+
+ # reset per-run state
+ self._restart_requested = False
+ self._stop_requested = False
+ self._stop_streamer_evt.clear()
+ self._streamer_started_evt.clear()
+
+ if restart:
+ self._mass.create_task(self._restart_stream_locked())
+ elif self._destroy_on_stop:
+ self._mass.create_task(self._provider.delete_ma_stream(self.stream_name))
+
+ def _restart_if_running(self) -> None:
+ """Request a running stream to restart."""
+ t = self._streamer_task
+ if not t or t.done():
+ return
+
+ if self._stop_requested or self._stop_streamer_evt.is_set():
+ return
+
+ self._restart_requested = True
+ self._stop_requested = True
+ self._stop_streamer_evt.set()
+
+ self._stop_timer_started_at = None
+ if self._stop_timer:
+ self._stop_timer.cancel()
+
+ async def _restart_stream_locked(self) -> None:
+ """Restart the streamer under the lifecycle lock."""
+ async with self._lifecycle_lock:
+ if self._destroyed:
+ return
+ if self._streamer_task and not self._streamer_task.done():
+ return
+
+ # reset state and start a fresh run
+ self._stop_requested = False
+ self._restart_requested = False
+ self._stop_streamer_evt.clear()
+ self._streamer_started_evt.clear()
+
+ self._streamer_task = self._mass.create_task(self._streamer_task_impl())
+ self._streamer_task.add_done_callback(self._on_streamer_done)
+
+ async def _register_tcp_server_source(self) -> None:
+ """Create a Snapcast TCP source for this stream (or reuse an existing one)."""
+ # prefer to reuse existing stream if possible
+ if self.snap_stream:
+ return
+
+ # The control script is used only for music streams in the builtin server
+ extra_args = ""
+ if (cntrl_queue_id := self._cntrl_queue_id) is not None:
+ # Create socket server for control script communication
+ socket_path = self._socket_path
+ if socket_path is None:
+ raise RuntimeError("socket_path needs to be set if cntrl_queue_id is set")
+ extra_args = (
+ f"&controlscript={urllib.parse.quote_plus('control.py')}"
+ f"&controlscriptparams=--queueid={urllib.parse.quote_plus(cntrl_queue_id)}%20"
+ f"--socket={urllib.parse.quote_plus(socket_path)}%20"
+ f"--streamserver-ip={self._mass.streams.publish_ip}%20"
+ f"--streamserver-port={self._mass.streams.publish_port}"
+ )
+
+ attempts = 50
+ while attempts:
+ attempts -= 1
+ # pick a random port
+ port = random.randint(4953, 4953 + 200)
+ ## Do we need to add a time out here?
+ result = await self._provider._snapserver.stream_add_stream(
+ # NOTE: setting the sampleformat to something else
+ # (like 24 bits bit depth) does not seem to work at all!
+ f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
+ f"&idle_threshold={self._provider._snapcast_stream_idle_threshold}"
+ f"{extra_args}&name={self.stream_name}"
+ )
+ if result is None or "id" not in result:
+ # if the port is already taken, the result will be an error
+ self._logger.warning(result)
+ continue
+ ## Do we need to synchronize the snapserver repr first?
+ self.snap_stream = self._provider._snapserver.stream(result["id"])
+ self.snap_stream.set_callback(self._snap_on_stream_update)
+ return
+
+ if self._socket_server:
+ await self._stop_socket_server()
+
+ msg = "Unable to create stream - No free port found?"
+ raise RuntimeError(msg)
+
+ async def _remove_snap_source(self) -> None:
+ """Remove the Snapcast source created for this stream and detach groups."""
+ if self._mass.closing or self.snap_stream is None:
+ return
+
+ for snap_group in self._provider._snapserver.groups:
+ if snap_group.stream != self.snap_stream.identifier:
+ continue
+ self._logger.debug(f"Set stream of group {snap_group.name} to default.")
+ await snap_group.set_stream("default")
+
+ with suppress(KeyError, AttributeError):
+ snap_stream = self._provider._snapserver.stream(self.stream_name)
+ await self._provider._snapserver.stream_remove_stream(snap_stream.identifier)
+
+ if self._socket_server:
+ await self._stop_socket_server()
+ self._snap_on_stream_update()
+
+ return
+
+ def _snap_get_stream_path(self) -> str | None:
+ """Return the Snapcast TCP URI to stream to."""
+ if self.snap_stream is None:
+ return None
+
+ uri = self.snap_stream._stream.get("uri", {})
+ uri_host = uri.get("host", "")
+ stream_path = self.snap_stream.path or f"tcp://{uri_host}"
+ return stream_path.replace("0.0.0.0", self._provider._snapcast_server_host)
+
+ def _snap_on_stream_update(self, stream: SnapstreamProto | None = None) -> None:
+ """Handle Snapcast stream updates and trigger group member refresh."""
+ if self.snap_stream is None:
+ return
+
+ for snap_group in self._provider._snapserver.groups:
+ if snap_group.stream != self.snap_stream.identifier:
+ continue
+ self._provider.poke_group_members(snap_group)
+
+ async def _start_socket_server(self) -> str:
+ """Get or create a socket server for the given queue.
+
+ :return: The path to the Unix socket.
+ """
+ if self._socket_server:
+ return self._socket_server.socket_path
+
+ if self._cntrl_queue_id is None:
+ raise RuntimeError("Socket server require _cntrl_queue_id to be set")
+
+ socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=self._cntrl_queue_id)
+ socket_server = SnapcastSocketServer(
+ mass=self._mass,
+ queue_id=self._cntrl_queue_id,
+ socket_path=socket_path,
+ streamserver_ip=str(self._mass.streams.publish_ip),
+ streamserver_port=cast("int", self._mass.streams.publish_port),
+ )
+ await socket_server.start()
+ self._socket_server = socket_server
+ self._socket_path = socket_path
+ self._logger.debug(
+ "Created socket server for queue %s at %s", self._cntrl_queue_id, socket_path
+ )
+ return socket_path
+
+ async def _stop_socket_server(self) -> None:
+ """Stop and remove the socket server for the given queue."""
+ if not self._socket_server:
+ return
+
+ await self._socket_server.stop()
+ self._socket_server = None
+ self._logger.debug("Stopped socket server for queue %s", self._cntrl_queue_id)
"""Snapcast Player."""
+from __future__ import annotations
+
import asyncio
-import random
-import time
-import urllib.parse
from contextlib import suppress
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING, TypedDict, cast
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant_models.enums import PlaybackState, PlayerFeature
+from music_assistant_models.enums import MediaType, PlaybackState, PlayerFeature
from music_assistant_models.player import DeviceInfo, PlayerMedia
-from snapcast.control.client import Snapclient
-from snapcast.control.group import Snapgroup
-from snapcast.control.stream import Snapstream
-
-from music_assistant.constants import ATTR_ANNOUNCEMENT_IN_PROGRESS
-from music_assistant.helpers.audio import get_player_filter_params
-from music_assistant.helpers.compare import create_safe_string
-from music_assistant.helpers.ffmpeg import FFMpeg
-from music_assistant.models.player import Player
-from music_assistant.providers.snapcast.constants import (
- CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
- DEFAULT_SNAPCAST_FORMAT,
- MASS_ANNOUNCEMENT_POSTFIX,
- MASS_STREAM_PREFIX,
- SnapCastStreamType,
+from propcache import under_cached_property as cached_property
+
+from music_assistant.constants import (
+ ATTR_ANNOUNCEMENT_IN_PROGRESS,
+ CONF_ENTRY_HTTP_PROFILE_HIDDEN,
+ SYNCGROUP_PREFIX,
)
+from music_assistant.models.player import Player
+from music_assistant.providers.snapcast.constants import CONF_ENTRY_SAMPLE_RATES_SNAPCAST
+from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
+
from music_assistant.providers.snapcast.provider import SnapCastProvider
+ from music_assistant.providers.snapcast.snap_cntrl_proto import (
+ SnapclientProto,
+ SnapstreamProto,
+ )
+
+
+class TrackedPlayerState(TypedDict, total=False):
+ """Tracked state for the Snapcast MA player.
+
+ It is used for change detection and state synchronization, and may be
+ partially populated depending on which information is
+ currently available.
+
+ Keys prefixed with ``_attr_`` are exposed as player attributes, while the
+ remaining keys represent internal Snapcast grouping and connection state.
+ """
+
+ # Player attribute fields
+ _attr_name: str
+ _attr_volume_level: float
+ _attr_volume_muted: bool
+ _attr_available: bool
+
+ # snapclient fields
+ connected: bool
+ stream_id: str
+ stream_status: str | None
+ grp_name: str
+ grp_member_ids: list[str]
+ grp_member_avail: list[bool]
class SnapCastPlayer(Player):
def __init__(
self,
- provider: "SnapCastProvider",
+ provider: SnapCastProvider,
player_id: str,
- snap_client: Snapclient,
- snap_client_id: str,
+ snap_client: SnapclientProto,
) -> None:
"""Init."""
- self.provider: SnapCastProvider # type: ignore[misc]
self.snap_client = snap_client
- self.snap_client_id = snap_client_id
super().__init__(provider, player_id)
- self._stream_task: asyncio.Task[None] | None = None
+
+ self._snap_ma_stream: SnapcastMAStream | None = None
+
+ self._update_worker: asyncio.Task[None] | None = None
+ self._poke_evt = asyncio.Event()
+ self._state_update_lock = asyncio.Lock()
+ self._last_tracked_state: TrackedPlayerState | None = None
+
+ @property
+ def snap_provider(self) -> SnapCastProvider:
+ """Return the Snapcast provider instance."""
+ return cast("SnapCastProvider", self.provider)
@property
def requires_flow_mode(self) -> bool:
"""Return if the player requires flow mode."""
return True
- @property
+ @cached_property
def synced_to(self) -> str | None:
- """
- Return the id of the player this player is synced to (sync leader).
+ """Return the id of the player this player is synced to (sync leader)."""
+ grp_name = self.snap_group_name
+ if grp_name == self.player_id:
+ # is group leader
+ return None
- If this player is not synced to another player (or is the sync leader itself),
- this should return None.
- If it is part of a (permanent) group, this should also return None.
- """
- snap_group = self._get_snapgroup()
- assert snap_group is not None # for type checking
- master_id: str = self.provider._get_ma_id(snap_group.clients[0])
- if len(snap_group.clients) < 2 or self.player_id == master_id:
+ grp_player_ids = self._get_player_ids_of_curr_group()
+ if len(grp_player_ids) < 2 or grp_name not in grp_player_ids:
return None
- return master_id
+
+ if leader_player := self.mass.players.get(grp_name):
+ return grp_name if leader_player.available else None
+
+ return None
+
+ @cached_property
+ def group_members(self) -> list[str]:
+ """Return the group members of the player."""
+ if not self._attr_available:
+ return []
+
+ grp_name = self.snap_group_name
+ if grp_name != self.player_id:
+ # only group leaders can have members
+ return []
+
+ player_ids = self._get_player_ids_of_curr_group()
+ if self.player_id not in player_ids:
+ # should not happen, unless the current
+ # state repr is invalid
+ return []
+
+ player_ids.remove(self.player_id)
+ connected = [
+ player_id
+ for player_id in player_ids
+ if (client := self.snap_provider.get_snap_client(player_id=player_id))
+ and client.connected
+ ]
+ if connected:
+ return [self.player_id, *connected]
+
+ return []
+
+ @property
+ def playback_state(self) -> PlaybackState:
+ """Return the current playback state of the player."""
+ snap_stream = self._get_active_snapstream()
+ if snap_stream is None:
+ return PlaybackState.IDLE
+
+ if snap_stream.identifier == "default" or snap_stream.status == "idle":
+ return PlaybackState.IDLE
+
+ return PlaybackState.PLAYING
def setup(self) -> None:
"""Set up player."""
self._attr_name = self.snap_client.friendly_name
self._attr_available = self.snap_client.connected
+
+ host_dict = self.snap_client._client.get("host", {})
+ os, arch, ip = (host_dict.get(key, "") for key in ["os", "arch", "ip"])
self._attr_device_info = DeviceInfo(
- model=self.snap_client._client.get("host").get("os"),
- manufacturer=self.snap_client._client.get("host").get("arch"),
+ model=os,
+ manufacturer=arch,
)
- self._attr_device_info.ip_address = self.snap_client._client.get("host").get("ip")
+ self._attr_device_info.ip_address = ip
self._attr_supported_features = {
PlayerFeature.SET_MEMBERS,
PlayerFeature.VOLUME_SET,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.PLAY_ANNOUNCEMENT,
}
- self._attr_can_group_with = {self.provider.instance_id}
+ self._attr_can_group_with = {self.snap_provider.instance_id}
+ if not self._update_worker:
+ self._update_worker = self.mass.create_task(self._player_update_worker)
async def volume_set(self, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
+ # Use optimistic server state for now
+ # not guaranteed that the client respects it
await self.snap_client.set_volume(volume_level)
async def stop(self) -> None:
"""Send STOP command to given player."""
- # update the state first to avoid race conditions, if an active play_announcement
- # finishes the player.state should be IDLE.
- self._attr_playback_state = PlaybackState.IDLE
- self._attr_current_media = None
- self._set_childs_state()
-
- self.update_state()
-
- # we change the active stream only if music was playing
- if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
- snapgroup = self._get_snapgroup()
- assert snapgroup is not None # for type checking
- await snapgroup.set_stream("default")
-
- # but we always delete the music stream (whether it was active or not)
- await self._delete_stream(self._get_stream_name(SnapCastStreamType.MUSIC))
+ if ma_stream := self.active_snap_ma_stream:
+ ma_stream.request_stop_stream()
+ return
- if self._stream_task is not None:
- if not self._stream_task.done():
- self._stream_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._stream_task
- self._stream_task = None
+ self.poke_player_update()
async def volume_mute(self, muted: bool) -> None:
"""Send MUTE command to given player."""
- # Using optimistic value because the library does not return the response from the api
- await self.snap_client.set_muted(muted)
- self._attr_volume_muted = muted
- self.update_state()
+ # Use optimistic server state for now
+ # not guaranteed that the client respects it
+ # TODO: move this to the snapcast python library
+ vol = self.snap_client._client["config"]["volume"]
+ vol["muted"] = muted
+ res = await self.snap_provider._snapserver.client_volume(self.snap_client.identifier, vol)
+ if res and "muted" in res:
+ self.snap_client._client["config"]["volume"] = res
+ self.snap_client.callback()
async def set_members(
self,
player_ids_to_remove: list[str] | None = None,
) -> None:
"""Handle SET_MEMBERS command on the player."""
- group = self._get_snapgroup()
- assert group is not None # for type checking
- # handle client additions
- for player_id in player_ids_to_add or []:
- snapcast_id = self.provider._get_snapclient_id(player_id)
- if snapcast_id not in group.clients:
- await group.add_client(snapcast_id)
- if player_id not in self._attr_group_members:
- self._attr_group_members.append(player_id)
- # handle client removals
- for player_id in player_ids_to_remove or []:
- snapcast_id = self.provider._get_snapclient_id(player_id)
- if snapcast_id in group.clients:
- await group.remove_client(snapcast_id)
- if player_id in self._attr_group_members:
- self._attr_group_members.remove(player_id)
- # Set default stream and stop ungrouped players
- removed_snapclient = self.provider._snapserver.client(snapcast_id)
- await removed_snapclient.group.set_stream("default")
- if removed_player := self.mass.players.get(player_id):
- await removed_player.stop()
- self.update_state()
+ # get the group owned by this player (identified by the group name)
+ player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
+
+ if player_group is None:
+ return
+
+ player_group.set_callback(None)
+
+ curr_ma_player_ids = [
+ ma_id
+ for cli_id in player_group.clients
+ if (ma_id := self.snap_provider._get_ma_id(cli_id))
+ ]
+
+ curr_stream_id = player_group.stream
+ sync_group_player = None
+ if curr_ma_stream := self.snap_provider.get_snap_ma_stream(curr_stream_id):
+ media = curr_ma_stream.media
+ if media.media_type == MediaType.PLUGIN_SOURCE:
+ custom_data = media.custom_data or {}
+ assigned_player = custom_data.get("player_id", "")
+ if assigned_player.startswith(SYNCGROUP_PREFIX):
+ sync_group_player = self.mass.players.get(assigned_player)
+ else:
+ media_src_id = media.source_id or ""
+ if media_src_id.startswith(SYNCGROUP_PREFIX):
+ sync_group_player = self.mass.players.get(media_src_id)
+
+ if sync_group_player and self.player_id in (player_ids_to_remove or []):
+ # players in sync_group_player.group_members will be rejoined
+ # remove others first
+ for id_to_remove in player_ids_to_remove or []:
+ if id_to_remove == self.player_id:
+ continue
+ if (
+ id_to_remove in curr_ma_player_ids
+ and id_to_remove not in sync_group_player.group_members
+ ):
+ await self.snap_provider.isolate_player_to_dedicated_group(
+ id_to_remove, target_stream_id="default"
+ )
+
+ # split remaining group into individual groups,
+ # keeps the current stream, set this group to default stream
+ await self.snap_provider.isolate_player_to_dedicated_group(
+ target_player_id=self.player_id,
+ target_stream_id="default",
+ others_stream_id=curr_stream_id,
+ )
+ else:
+ for player_id in player_ids_to_remove or []:
+ if player_id not in curr_ma_player_ids:
+ continue
+ await self.snap_provider.isolate_player_to_dedicated_group(
+ player_id, target_stream_id="default"
+ )
+ curr_ma_player_ids.remove(player_id)
+
+ for ma_id in player_ids_to_add or []:
+ if (
+ snap_id := self.snap_provider._get_snapclient_id(ma_id)
+ ) and ma_id not in curr_ma_player_ids:
+ await player_group.add_client(snap_id)
+
+ # some caller require instant state updates before returning
+ async with self._state_update_lock:
+ if await self._process_snapcast_client_state():
+ self.update_state()
+
+ self.snap_provider._update_group_callbacks(poke=True)
async def play_media(self, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
msg = "A synced player cannot receive play commands directly"
raise RuntimeError(msg)
- # stop any existing streamtasks first
- if self._stream_task is not None:
- if not self._stream_task.done():
- self._stream_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._stream_task
- self._stream_task = None
+ ma_stream = await self.snap_provider.get_snapcast_media_stream(
+ media, filter_settings_owner=self.player_id
+ )
+
+ if ma_stream is None or ma_stream.stream_id is None:
+ return
+
+ self._snap_ma_stream = ma_stream
- # get stream or create new one
- stream_name = self._get_stream_name(SnapCastStreamType.MUSIC)
- stream = await self._get_or_create_stream(stream_name, media.source_id or self.player_id)
+ # e.g. DSP settings require a restart
+ await self._snap_ma_stream.start_stream(allow_restart=True)
# if no announcement is playing we activate the stream now, otherwise it
# will be activated by play_announcement when the announcement is over.
if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
- snap_group = self._get_snapgroup()
- assert snap_group is not None # for type checking
- await snap_group.set_stream(stream.identifier)
-
- self._attr_current_media = media
-
- # select audio source
- audio_source = self.mass.streams.get_stream(media, DEFAULT_SNAPCAST_FORMAT)
-
- async def _streamer() -> None:
- stream_path = self._get_stream_path(stream)
- self.logger.debug("Start streaming to %s", stream_path)
- async with FFMpeg(
- audio_input=audio_source,
- input_format=DEFAULT_SNAPCAST_FORMAT,
- output_format=DEFAULT_SNAPCAST_FORMAT,
- filter_params=get_player_filter_params(
- self.mass, self.player_id, DEFAULT_SNAPCAST_FORMAT, DEFAULT_SNAPCAST_FORMAT
- ),
- audio_output=stream_path,
- extra_input_args=["-y", "-re"],
- ) as ffmpeg_proc:
- self._attr_playback_state = PlaybackState.PLAYING
- self._attr_current_media = media
- self._attr_elapsed_time = 0
- self._attr_elapsed_time_last_updated = time.time()
- self.update_state()
-
- self._set_childs_state()
- await ffmpeg_proc.wait()
+ player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
+ assert player_group is not None # for type checking
+ await player_group.set_stream(ma_stream.stream_id)
- self.logger.debug("Finished streaming to %s", stream_path)
- # we need to wait a bit for the stream status to become idle
- # to ensure that all snapclients have consumed the audio
- while stream.status != "idle":
- await asyncio.sleep(0.25)
- self._attr_playback_state = PlaybackState.IDLE
- self._attr_elapsed_time = time.time() - self._attr_elapsed_time_last_updated
- self.update_state()
- self._set_childs_state()
-
- # start streaming the queue (pcm) audio in a background task
- self._stream_task = self.mass.create_task(_streamer())
+ self.poke_player_update()
async def play_announcement(
self, announcement: PlayerMedia, volume_level: int | None = None
) -> None:
"""Handle (provider native) playback of an announcement on given player."""
- # get stream or create new one
- stream_name = self._get_stream_name(SnapCastStreamType.ANNOUNCEMENT)
- stream = await self._get_or_create_stream(stream_name, None)
+ was_synced_to: str | None = self.synced_to
+ orig_volume_level: int | None = self.volume_level
+
+ prev_stream = self.active_snap_ma_stream
+
+ ma_stream = await self.snap_provider.get_snapcast_media_stream(
+ announcement, filter_settings_owner=self.player_id
+ )
+ player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
+
+ if ma_stream is None or ma_stream.stream_id is None or player_group is None:
+ return
- # always activate the stream (announcements have priority over music)
- snap_group = self._get_snapgroup()
- assert snap_group is not None # for type checking
- await snap_group.set_stream(stream.identifier)
+ await player_group.set_stream(ma_stream.stream_id)
- # Unfortunately snapcast sets a volume per client (not per stream), so we need a way to
- # set the announcement volume without affecting the music volume.
- # We go for the simplest solution: save the previous volume, change it, restore later
- # (with the downside that the change will be visible in the UI)
- orig_volume_level = self.volume_level # Note: might be None
+ if self.snap_provider._use_builtin_server:
+ await asyncio.sleep(self.snap_provider._snapcast_server_buffer_size / 1000.0)
if volume_level is not None:
await self.volume_set(volume_level)
- input_format = DEFAULT_SNAPCAST_FORMAT
- assert announcement.custom_data is not None # for type checking
- audio_source = self.mass.streams.get_announcement_stream(
- announcement.custom_data["announcement_url"],
- output_format=DEFAULT_SNAPCAST_FORMAT,
- pre_announce=announcement.custom_data["pre_announce"],
- pre_announce_url=announcement.custom_data["pre_announce_url"],
- )
+ await ma_stream.start_stream()
+ await ma_stream.wait_for_stopped()
- # stream the audio, wait for it to finish (play_announcement should return after the
- # announcement is over to avoid simultaneous announcements).
- stream_path = self._get_stream_path(stream)
- self.logger.debug("Start announcement streaming to %s", stream_path)
- async with FFMpeg(
- audio_input=audio_source,
- input_format=input_format,
- output_format=DEFAULT_SNAPCAST_FORMAT,
- filter_params=get_player_filter_params(
- self.mass, self.player_id, input_format, DEFAULT_SNAPCAST_FORMAT
- ),
- audio_output=stream_path,
- extra_input_args=["-y", "-re"],
- ) as ffmpeg_proc:
- await ffmpeg_proc.wait()
-
- self.logger.debug("Finished announcement streaming to %s", stream_path)
- # we need to wait a bit for the stream status to become idle
- # to ensure that all snapclients have consumed the audio
- while stream.status != "idle":
- await asyncio.sleep(0.25)
-
- # delete the announcement stream
- await self._delete_stream(stream_name)
-
- # restore volume, if we changed it above and it's still the same we set
- # (the user did not change it himself while the announcement was playing)
if self.volume_level == volume_level and orig_volume_level is not None:
await self.volume_set(orig_volume_level)
- # and restore the group to either the default or the music stream
- if self.playback_state == PlaybackState.IDLE:
- new_stream_name = "default"
+ if was_synced_to:
+ if (
+ leader_group := await self.snap_provider.ensure_player_owned_group(was_synced_to)
+ ) is None:
+ return
+ await leader_group.add_client(self.snap_client.identifier)
else:
- new_stream_name = self._get_stream_name(SnapCastStreamType.MUSIC)
- group = self._get_snapgroup()
- assert group is not None # for type checking
- await group.set_stream(new_stream_name)
+ await player_group.set_stream(
+ prev_stream.stream_id
+ if prev_stream and prev_stream.stream_id is not None
+ else "default"
+ )
async def get_config_entries(
self,
"""Player config."""
return [
CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
+ # we don't use the http server for streaming
+ CONF_ENTRY_HTTP_PROFILE_HIDDEN,
]
- def _handle_player_update(self, snap_client: Snapclient) -> None:
- """Process Snapcast update to Player controller.
-
- This is a callback function
+ def _handle_player_update(self, snap_client: SnapclientProto) -> None:
+ """Forward snap_client updates."""
+ self.poke_player_update()
+
+ def poke_player_update(self) -> None:
+ """Signal that a player state update should be processed."""
+ self._poke_evt.set()
+
+ async def _player_update_worker(self) -> None:
+ """Aggregate and process player state update requests."""
+ while True:
+ await self._poke_evt.wait()
+ self._poke_evt.clear()
+ while True:
+ call_update: bool = False
+ async with self._state_update_lock:
+ call_update = await self._process_snapcast_client_state()
+ if call_update:
+ self.update_state()
+ if self._poke_evt.is_set():
+ self._poke_evt.clear()
+ continue
+ break
+
+ async def _process_snapcast_client_state(self) -> bool:
+ """Process the latest Snapcast client state and apply changes to this player.
+
+ Returns:
+ True if changes were applied and a state update should be emitted via
+ ``update_state()``; False if no update is necessary (or if required data
+ is temporarily unavailable and the update should be retried later).
"""
- self._attr_name = self.snap_client.friendly_name
- self._attr_volume_level = self.snap_client.volume
- self._attr_volume_muted = self.snap_client.muted
- self._attr_available = self.snap_client.connected
+ snap_group = self.snap_client.group
+ if snap_group is None:
+ # some data syncing error, a client is always a group member
+ # retry again later, don't call update now
+ return False
+
+ stream_id = snap_group.stream
+ snap_stream: SnapstreamProto | None = None
+ with suppress(KeyError):
+ snap_stream = self.snap_provider._snapserver.stream(stream_id)
+
+ members = list(snap_group.clients) # snapshot
+
+ curr_state: TrackedPlayerState = {
+ "_attr_name": self.snap_client.friendly_name,
+ "_attr_volume_level": self.snap_client.volume,
+ "_attr_volume_muted": self.snap_client.muted,
+ "_attr_available": self.snap_client.connected,
+ "connected": self.snap_client.connected,
+ "stream_id": snap_group.stream,
+ "stream_status": snap_stream.status if snap_stream is not None else None,
+ "grp_name": snap_group.name,
+ "grp_member_ids": members,
+ "grp_member_avail": [
+ pl.available
+ for cl_id in members
+ if (pl_id := self.snap_provider._get_ma_id(cl_id))
+ and (pl := self.mass.players.get(pl_id))
+ ],
+ }
- # Note: when the active stream is a MASS stream the active_source is __not__ updated at all.
- # So it doesn't matter whether a MASS stream is for music or announcements.
- if stream := self._get_active_snapstream():
- if stream.identifier == "default":
- self._attr_active_source = None
- elif not stream.identifier.startswith(MASS_STREAM_PREFIX):
- # unknown source
- self._attr_active_source = stream.identifier
- else:
- self._attr_active_source = None
+ prev_state: TrackedPlayerState = (
+ self._last_tracked_state if self._last_tracked_state is not None else {}
+ )
+ self._last_tracked_state = curr_state
+
+ # change detection for simple attrs
+ changed_attrs = {
+ k: v for k, v in curr_state.items() if k.startswith("_attr_") and prev_state.get(k) != v
+ }
- self._group_childs()
+ prev_connected = prev_state.get("connected", False)
+ now_connected = curr_state.get("connected", False)
+ connection_changed = prev_connected != now_connected
- self.update_state()
+ prev_stream_id = prev_state.get("stream_id")
+ curr_stream_id = curr_state["stream_id"]
+ prev_stream_status = prev_state.get("stream_status")
+ curr_stream_status = curr_state.get("stream_status")
- def _get_stream_name(self, stream_type: SnapCastStreamType) -> str:
- """Return the name of the stream for the given player.
+ stream_changed = (
+ prev_stream_id != curr_stream_id or prev_stream_status != curr_stream_status
+ )
- Each player can have up to two concurrent streams, for music and announcements.
+ grouping_changed = any(
+ prev_state.get(k) != curr_state.get(k)
+ for k in ("grp_name", "grp_member_ids", "grp_member_avail")
+ )
- The stream name depends only on player_id (not queue_id) for two reasones:
- 1. Avoid issues when the same queue_id is simultaneously used by two players
- (eg in universal groups).
- 2. Easily identify which stream belongs to which player, for instance to be able to
- delete a music stream even when it is not active due to an announcement.
- """
- safe_name = create_safe_string(self.player_id, replace_space=True)
- stream_name = f"{MASS_STREAM_PREFIX}{safe_name}"
- if stream_type == SnapCastStreamType.ANNOUNCEMENT:
- stream_name += MASS_ANNOUNCEMENT_POSTFIX
- return stream_name
-
- async def _get_or_create_stream(self, stream_name: str, queue_id: str | None) -> Snapstream:
- """Create new stream on snapcast server (or return existing one)."""
- # prefer to reuse existing stream if possible
- if stream := self._get_snapstream(stream_name):
- return stream
- # The control script is used only for music streams in the builtin server
- # (queue_id is None only for announcement streams).
- extra_args = ""
- if (
- self.provider._use_builtin_server
- and queue_id
- and self.provider._controlscript_available
+ needs_processing = bool(
+ changed_attrs or grouping_changed or stream_changed or connection_changed
+ )
+ if not needs_processing:
+ return False
+
+ if connection_changed or grouping_changed:
+ self.snap_provider.poke_group_members(snap_group)
+
+ # help cleaning up unused streams
+ if curr_stream_id == "default" or (
+ (my_stream := self._snap_ma_stream)
+ and my_stream.stream_id in {prev_stream_id, curr_stream_id}
):
- # Create socket server for control script communication
- socket_path = await self.provider.get_or_create_socket_server(queue_id)
- extra_args = (
- f"&controlscript={urllib.parse.quote_plus('control.py')}"
- f"&controlscriptparams=--queueid={urllib.parse.quote_plus(queue_id)}%20"
- f"--socket={urllib.parse.quote_plus(socket_path)}%20"
- f"--streamserver-ip={self.mass.streams.publish_ip}%20"
- f"--streamserver-port={self.mass.streams.publish_port}"
- )
+ self.snap_provider.update_stream_usage()
- attempts = 50
- while attempts:
- attempts -= 1
- # pick a random port
- port = random.randint(4953, 4953 + 200)
- result = await self.provider._snapserver.stream_add_stream(
- # NOTE: setting the sampleformat to something else
- # (like 24 bits bit depth) does not seem to work at all!
- f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
- f"&idle_threshold={self.provider._snapcast_stream_idle_threshold}"
- f"{extra_args}&name={stream_name}"
- )
- if "id" not in result:
- # if the port is already taken, the result will be an error
- self.logger.warning(result)
- continue
- return self.provider._snapserver.stream(result["id"])
- msg = "Unable to create stream - No free port found?"
- raise RuntimeError(msg)
-
- def _get_snapstream(self, stream_name: str) -> Snapstream | None:
- """Get a stream by name."""
- with suppress(KeyError):
- return self.provider._snapserver.stream(stream_name)
+ # apply changed attrs
+ for key, value in changed_attrs.items():
+ setattr(self, key, value)
+
+ # finally notify state update once
+ return True
+
+ @property
+ def active_snap_ma_stream(self) -> SnapcastMAStream | None:
+ """Return the MA stream source of the active group."""
+ grp = self.snap_client.group
+ if grp is None or grp.stream is None:
+ return None
+
+ if grp.stream == "default":
+ return None
+
+ return self.snap_provider.get_snap_ma_stream(grp.stream)
+
+ @property
+ def snap_group_name(self) -> str:
+ """Return the name of the active group."""
+ snap_group = self.snap_client.group
+ if snap_group is None:
+ return ""
+ return snap_group.name
+
+ @cached_property
+ def _current_media(self) -> PlayerMedia | None:
+ """
+ Return the current media being played by the player.
+
+ Note that this is NOT the final current media of the player,
+ as it may be overridden by a active group/sync membership.
+ Hence it's marked as a private property.
+ The final current media can be retrieved by using the 'current_media' property.
+ """
+ if snap_ma_stream := self.active_snap_ma_stream:
+ return snap_ma_stream.media
return None
- def _get_stream_path(self, stream: Snapstream) -> str:
- stream_path = stream.path or f"tcp://{stream._stream['uri']['host']}"
- return stream_path.replace("0.0.0.0", self.provider._snapcast_server_host)
-
- async def _delete_stream(self, stream_name: str) -> None:
- if stream := self._get_snapstream(stream_name):
- with suppress(TypeError, KeyError, AttributeError):
- await self.provider._snapserver.stream_remove_stream(stream.identifier)
-
- def _get_snapgroup(self) -> Snapgroup | None:
- """Get snapcast group for given player_id."""
- return cast("Snapgroup | None", self.snap_client.group)
-
- def _set_childs_state(self) -> None:
- """Set the state of the child`s of the player."""
- for child_player_id in self.group_members:
- if child_player_id == self.player_id:
- continue
- if mass_child_player := self.mass.players.get(child_player_id):
- mass_child_player._attr_playback_state = self.playback_state
- mass_child_player.update_state()
-
- def _get_active_snapstream(self) -> Snapstream | None:
+ @property
+ def _active_source(self) -> str | None:
+ """
+ Return the (id of) the active source of the player.
+
+ Only required if the player supports PlayerFeature.SELECT_SOURCE.
+
+ Set to None if the player is not currently playing a source or
+ the player_id if the player is currently playing a MA queue.
+
+ Note that this is NOT the final active source of the player,
+ as it may be overridden by a active group/sync membership.
+ Hence it's marked as a private property.
+ The final active source can be retrieved by using the 'active_source' property.
+ """
+ grp = self.snap_client.group
+ if grp is None or grp.stream is None:
+ return None
+
+ if grp.stream == "default":
+ return None
+
+ if ma_stream := self.snap_provider.get_snap_ma_stream(grp.stream):
+ return ma_stream.source_id
+
+ # external snapcast stream
+ return grp.stream or None
+
+ def _get_active_snapstream(self) -> SnapstreamProto | None:
"""Get active stream for given player_id."""
- if group := self._get_snapgroup():
- return self._get_snapstream(group.stream)
+ if group := self.snap_client.group:
+ with suppress(KeyError):
+ return self.snap_provider._snapserver.stream(group.stream)
return None
- def _group_childs(self) -> None:
- """Return player_ids of the players synced to this player."""
- snap_group = self._get_snapgroup()
- assert snap_group is not None # for type checking
- self._attr_group_members.clear()
- if self.synced_to is not None:
- return
- self._attr_group_members.append(self.player_id)
- for snap_client_id in snap_group.clients:
- if (
- self.provider._get_ma_id(snap_client_id) != self.player_id
- and self.provider._snapserver.client(snap_client_id).connected
- ):
- self._attr_group_members.append(self.provider._get_ma_id(snap_client_id))
- self.update_state()
+ def _get_player_ids_of_curr_group(self) -> list[str]:
+ snap_group = self.snap_client.group
+ if snap_group is None:
+ return []
+ return [
+ ma_id
+ for client_id in snap_group.clients
+ if (ma_id := self.snap_provider._get_ma_id(client_id))
+ ]
+
+ def _get_players_of_curr_group(self) -> list[Player]:
+ return [
+ ma_player
+ for ma_id in self._get_player_ids_of_curr_group()
+ if (ma_player := self.mass.players.get(ma_id))
+ ]
"""SnapCastProvider."""
+from __future__ import annotations
+
import asyncio
+import hashlib
import logging
import re
import shutil
import socket
+from contextlib import suppress
from pathlib import Path
-from typing import cast
+from typing import TYPE_CHECKING, cast
from bidict import bidict
-from music_assistant_models.enums import PlaybackState
+from music_assistant_models.enums import MediaType, PlaybackState
from music_assistant_models.errors import SetupFailedError
-from snapcast.control import create_server
-from snapcast.control.client import Snapclient
-from snapcast.control.group import Snapgroup
-from snapcast.control.server import Snapserver
+from snapcast.control.server import CONTROL_PORT, Snapserver
from zeroconf import NonUniqueNameException
from zeroconf.asyncio import AsyncServiceInfo
+from music_assistant.helpers.compare import create_safe_string
from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.util import get_ip_pton
from music_assistant.models.player_provider import PlayerProvider
CONF_STREAM_IDLE_THRESHOLD,
CONF_USE_EXTERNAL_SERVER,
CONTROL_SCRIPT,
- CONTROL_SOCKET_PATH_TEMPLATE,
DEFAULT_SNAPSERVER_PORT,
+ MASS_ANNOUNCEMENT_POSTFIX,
+ MASS_STREAM_PREFIX,
SNAPWEB_DIR,
)
+from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
from music_assistant.providers.snapcast.player import SnapCastPlayer
-from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
+from music_assistant.providers.universal_group.constants import UGP_PREFIX
+
+if TYPE_CHECKING:
+ from music_assistant_models.player import PlayerMedia
+
+ from .snap_cntrl_proto import SnapclientProto, SnapgroupProto, SnapserverProto
+
+
+async def _create_cntrl_server(
+ loop: asyncio.AbstractEventLoop,
+ host: str,
+ port: int = CONTROL_PORT,
+ reconnect: bool = False,
+) -> SnapserverProto:
+ """Server factory."""
+ server = Snapserver(loop, host, port, reconnect)
+ await server.start()
+ return cast("SnapserverProto", server)
class SnapCastProvider(PlayerProvider):
"""SnapCastProvider."""
- _snapserver: Snapserver
+ _snapserver: SnapserverProto
_snapserver_runner: asyncio.Task[None] | None
_snapserver_started: asyncio.Event | None
_snapcast_server_host: str
_use_builtin_server: bool
_stop_called: bool
_controlscript_available: bool
- _socket_servers: dict[str, SnapcastSocketServer] # queue_id -> socket server
+ _snapcast_ma_streams: dict[str, SnapcastMAStream]
+ _snapcast_ma_streams_lock: asyncio.Lock
+
+ @property
+ def use_queue_control(self) -> bool:
+ """Return whether queue-based control scripts are available.
+
+ Indicates if the Snapcast control script has been successfully initialized
+ and can be used to control playback via a queue-specific control channel.
+ """
+ return self._controlscript_available
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER)
self._stop_called = False
self._controlscript_available = False
- self._socket_servers = {}
if self._use_builtin_server:
self._snapcast_server_host = "127.0.0.1"
self._snapcast_server_control_port = DEFAULT_SNAPSERVER_PORT
- self._snapcast_server_buffer_size = self.config.get_value(CONF_SERVER_BUFFER_SIZE)
+ self._snapcast_server_buffer_size = cast(
+ "int", self.config.get_value(CONF_SERVER_BUFFER_SIZE)
+ )
self._snapcast_server_chunk_ms = self.config.get_value(CONF_SERVER_CHUNK_MS)
self._snapcast_server_initial_volume = self.config.get_value(CONF_SERVER_INITIAL_VOLUME)
self._snapcast_server_send_to_muted = self.config.get_value(
self._snapcast_stream_idle_threshold = self.config.get_value(CONF_STREAM_IDLE_THRESHOLD)
self._ids_map = bidict({})
+ self._snapcast_ma_streams = {}
+ self._snapcast_ma_streams_lock = asyncio.Lock()
+
if self._use_builtin_server:
await self._start_builtin_server()
else:
self._snapserver_runner = None
self._snapserver_started = None
try:
- self._snapserver = await create_server(
+ self._snapserver = await _create_cntrl_server(
self.mass.loop,
self._snapcast_server_host,
port=self._snapcast_server_control_port,
if player.playback_state != PlaybackState.PLAYING:
continue
await player.stop()
+
+ for stream_name in list(self._snapcast_ma_streams):
+ await self.delete_ma_stream(stream_name)
+
self._snapserver.stop()
await self._stop_builtin_server()
# The snapserver doesn't always cleanup the control script processes
# properly. We do it explicitly when closing a socket server.
# Should be fixed on the server side, though.
- for socket_server in list(self._socket_servers.values()):
- await socket_server.stop()
- self._socket_servers.clear()
+ for stream_name in list(self._snapcast_ma_streams):
+ await self.delete_ma_stream(stream_name)
+ self._snapcast_ma_streams.clear()
raise
def _get_ma_id(self, snap_client_id: str) -> str:
return new_id
return self._get_ma_id(snap_client_id)
- def _handle_player_init(self, snap_client: Snapclient) -> SnapCastPlayer:
+ def _handle_player_init(self, snap_client: SnapclientProto) -> SnapCastPlayer:
"""Process Snapcast add to Player controller."""
player_id = self._generate_and_register_id(snap_client.identifier)
player = self.mass.players.get(player_id, raise_unavailable=False)
if not player:
- snap_client = cast(
- "Snapclient", self._snapserver.client(self._get_snapclient_id(player_id))
- )
+ snap_client = self._snapserver.client(self._get_snapclient_id(player_id))
player = SnapCastPlayer(
provider=self,
player_id=player_id,
snap_client=snap_client,
- snap_client_id=self._get_snapclient_id(player_id),
)
player.setup()
else:
if ma_player := self._handle_player_init(snap_client):
snap_client.set_callback(ma_player._handle_player_update)
for snap_client in self._snapserver.clients:
- if player := self.mass.players.get(self._get_ma_id(snap_client.identifier)):
- ma_player = cast("SnapCastPlayer", player)
- snap_client.set_callback(ma_player._handle_player_update)
- for snap_group in self._snapserver.groups:
- snap_group.set_callback(self._handle_group_update)
+ if player := self.get_snap_player(client_id=snap_client.identifier):
+ snap_client.set_callback(player._handle_player_update)
+ self._update_group_callbacks()
- def _handle_group_update(self, snap_group: Snapgroup) -> None:
+ def poke_group_members(self, snap_group: SnapgroupProto) -> None:
"""Process Snapcast group callback."""
- for snap_client in self._snapserver.clients:
- if ma_player := self.mass.players.get(self._get_ma_id(snap_client.identifier)):
- assert isinstance(ma_player, SnapCastPlayer) # for type checking
- ma_player._handle_player_update(snap_client)
+ for snap_client_id in snap_group.clients:
+ if ma_player := self.get_snap_player(client_id=snap_client_id):
+ ma_player.poke_player_update()
def _handle_disconnect(self, exc: Exception) -> None:
"""Handle disconnect callback from snapserver."""
if self._stop_called or self.mass.closing:
+ # prevent auto-reconnecting of snapcast controller
+ self._snapserver.stop()
# we're instructed to stop/exit, so no need to restart the connection
return
self.logger.info(
else:
self.logger.warning("Unable to remove snapclient %s: %s", player_id, error_msg)
- async def get_or_create_socket_server(self, queue_id: str) -> str:
- """Get or create a socket server for the given queue.
+ def _update_group_callbacks(self, poke: bool = False) -> None:
+ for grp in self._snapserver.groups:
+ grp.set_callback(self.poke_group_members)
+ if poke:
+ self.poke_group_members(grp)
+
+ async def ensure_player_owned_group(
+ self, ma_player_id: str, set_stream_id: str | None = None
+ ) -> SnapgroupProto | None:
+ """Ensure a Snapcast group is owned by the given player.
+
+ This method guarantees that the returned Snapcast group is *owned* by the
+ specified Music Assistant player, meaning the group name equals the
+ player's ID and the player is the group leader.
+
+ Behavior:
+ - If the player is already the leader of its current group, that group is
+ returned unchanged.
+ - If the player is a member of another group (but not the leader), the
+ player is removed from that group, which causes Snapcast to create a new
+ single-client group for the player.
+ - The resulting group is renamed to the player's ID.
- :param queue_id: The queue ID to create a socket server for.
- :return: The path to the Unix socket.
+ If `set_stream_id` is provided and a new group is created, the group's
+ stream is updated accordingly.
+
+ Args:
+ ma_player_id: Music Assistant player ID.
+ set_stream_id: Optional Snapcast stream ID to assign to the player's group.
+
+ Returns:
+ The Snapcast group owned by the player, or ``None`` if the player is not
+ currently part of any group.
"""
- if queue_id in self._socket_servers:
- return self._socket_servers[queue_id].socket_path
-
- socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=queue_id)
- socket_server = SnapcastSocketServer(
- mass=self.mass,
- queue_id=queue_id,
- socket_path=socket_path,
- streamserver_ip=str(self.mass.streams.publish_ip),
- streamserver_port=cast("int", self.mass.streams.publish_port),
+ player_client = self.get_snap_client(player_id=ma_player_id)
+ if player_client is None:
+ return None
+
+ curr_group = player_client.group
+
+ if curr_group is None:
+ return None
+
+ if curr_group.name == ma_player_id:
+ return curr_group
+
+ group_members = list(curr_group.clients)
+ if len(group_members) > 1 and curr_group.name:
+ # player is member of other player group, remove it, which results in a new group
+ group_members.remove(player_client.identifier)
+ res = await self._snapserver.group_clients(curr_group.identifier, group_members)
+ if not (isinstance(res, dict) and "server" in res):
+ raise RuntimeError("Couldn't remove client from group")
+ self._snapserver.synchronize(res)
+ curr_group = player_client.group
+ if curr_group is None:
+ return None
+ if set_stream_id:
+ await curr_group.set_stream(set_stream_id)
+
+ await curr_group.set_name(ma_player_id)
+ return curr_group
+
+ async def isolate_player_to_dedicated_group(
+ self,
+ target_player_id: str,
+ target_stream_id: str | None = None,
+ others_stream_id: str | None = "default",
+ ) -> None:
+ """Isolate a player into a dedicated Snapcast group.
+
+ Ensures that the target player ends up in a group where it is the sole
+ member and group leader.
+
+ Behavior:
+ - The target player is first ensured to own its group.
+ - All other members of that group are removed.
+ - Each removed player is placed into its own dedicated group.
+ - Removed players' groups are optionally assigned `others_stream_id`.
+ - The target group is optionally assigned `target_stream_id`.
+
+ Callbacks for affected clients and groups are temporarily disabled during
+ the operation to avoid intermediate state updates.
+
+ Args:
+ target_player_id: Music Assistant player ID to isolate.
+ target_stream_id: Optional stream ID to assign to the target player's group.
+ others_stream_id: Stream ID assigned to newly created groups for removed players.
+ """
+ this_client_id = self._get_snapclient_id(target_player_id)
+ target_group = await self.ensure_player_owned_group(
+ target_player_id, set_stream_id=target_stream_id
)
- await socket_server.start()
- self._socket_servers[queue_id] = socket_server
- self.logger.debug("Created socket server for queue %s at %s", queue_id, socket_path)
- return socket_path
- async def stop_socket_server(self, queue_id: str) -> None:
- """Stop and remove the socket server for the given queue.
+ if target_group is None:
+ return
+
+ target_group.set_callback(None)
+ group_members = list(target_group.clients)
+ group_members.remove(this_client_id)
+ for client_id in group_members:
+ client = self._snapserver.client(client_id)
+ client.set_callback(None)
+ if group_members:
+ res = await self._snapserver.group_clients(target_group.identifier, [this_client_id])
+ if not (isinstance(res, dict) and "server" in res):
+ raise RuntimeError("Couldn't remove client from group")
+ self._snapserver.synchronize(res)
+ for client_id in group_members:
+ ma_player_id = self._get_ma_id(client_id)
+ if ma_player := cast("SnapCastPlayer", self.mass.players.get(ma_player_id)):
+ client = self._snapserver.client(client_id)
+ if client is not None:
+ if client.group is not None:
+ await client.group.set_name(ma_player_id)
+ if others_stream_id:
+ await client.group.set_stream(others_stream_id)
+ client.set_callback(ma_player._handle_player_update)
+
+ if target_stream_id is not None:
+ await target_group.set_stream(target_stream_id)
+
+ async def get_snapcast_media_stream(
+ self,
+ media: PlayerMedia,
+ filter_settings_owner: str | None = None,
+ existing_only: bool = False,
+ ) -> SnapcastMAStream | None:
+ """Get or create a Snapcast Music Assistant stream for the given media.
- :param queue_id: The queue ID to stop the socket server for.
+ Determines a deterministic Snapcast stream name based on the media type
+ and source, and either returns an existing stream or creates a new one.
+
+ Behavior:
+ - Announcement and generic media streams use a hashed name.
+ - Plugin and queue-backed sources reuse a stable stream name.
+ - Queue-backed streams may persist across playback sessions.
+ - If `existing_only` is True, no new stream will be created.
+
+ Newly created streams are registered with the Snapcast server and fully
+ set up before being returned.
+
+ Args:
+ media: Media item to stream.
+ filter_settings_owner: Optional player/entity ID used to resolve DSP filters.
+ existing_only: If True, only return an existing stream.
+
+ Returns:
+ A ``SnapcastMAStream`` instance, or ``None`` if no stream exists and
+ `existing_only` is True.
+ """
+ stream_name: str = ""
+ name_suffix: str = ""
+ queue_id: str | None = None
+ source_id: str | None = None
+ destroy_on_stop = True
+
+ if media.media_type == MediaType.ANNOUNCEMENT:
+ stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6]
+ name_suffix = MASS_ANNOUNCEMENT_POSTFIX
+ elif media.media_type == MediaType.PLUGIN_SOURCE:
+ custom_data = media.custom_data or {}
+ plugin: str = media.title or custom_data.get("provider") or ""
+ player: str = f" {custom_data.get('player_id', '')}"
+ stream_name += f"{plugin} {player}"
+ source_id = custom_data.get("source_id")
+ elif media.source_id and media.source_id.startswith(UGP_PREFIX):
+ stream_name += media.source_id
+ elif media.source_id and media.queue_item_id:
+ stream_name += media.source_id
+ queue_id = media.source_id
+ source_id = media.source_id
+ destroy_on_stop = False
+ else:
+ stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6]
+
+ stream_name = create_safe_string(stream_name, lowercase=False)
+ stream_name = f"{MASS_STREAM_PREFIX}{stream_name}{name_suffix}"
+ async with self._snapcast_ma_streams_lock:
+ if not (stream := self._snapcast_ma_streams.get(stream_name)):
+ if existing_only:
+ return None
+
+ stream = SnapcastMAStream(
+ provider=self,
+ media=media,
+ stream_name=stream_name,
+ filter_settings_owner=filter_settings_owner,
+ source_id=source_id,
+ use_cntrl_script=bool(queue_id) and self.use_queue_control,
+ destroy_on_stop=destroy_on_stop,
+ )
+ self._snapcast_ma_streams[stream_name] = stream
+ else:
+ stream.update_media(media)
+ await stream.setup()
+ return stream
+
+ def get_snap_ma_stream(self, stream_name: str) -> SnapcastMAStream | None:
+ """Return an existing Music Assistant Snapcast stream by name.
+
+ Args:
+ stream_name: Snapcast stream name.
+
+ Returns:
+ The corresponding ``SnapcastMAStream`` instance, or ``None`` if not found.
+ """
+ return self._snapcast_ma_streams.get(stream_name)
+
+ async def delete_ma_stream(self, stream_name: str) -> None:
+ """Remove and destroy a Music Assistant Snapcast stream.
+
+ The stream is removed from internal tracking and its resources are
+ destroyed asynchronously. Errors during destruction are logged but
+ otherwise ignored.
+
+ Args:
+ stream_name: Snapcast stream name to delete.
"""
- if queue_id in self._socket_servers:
- await self._socket_servers[queue_id].stop()
- del self._socket_servers[queue_id]
- self.logger.debug("Stopped socket server for queue %s", queue_id)
+ async with self._snapcast_ma_streams_lock:
+ stream = self._snapcast_ma_streams.pop(stream_name, None)
+
+ if not stream:
+ return
+
+ try:
+ await stream.destroy()
+ except Exception:
+ self.logger.exception("Failed to destroy stream session %s", stream_name)
+
+ def update_stream_usage(self) -> None:
+ """Update usage state for all tracked Snapcast streams.
+
+ Marks streams as "in use" if they are currently assigned to any Snapcast
+ group, and schedules unused streams for delayed shutdown.
+
+ This method should be called whenever group or stream assignments change
+ on the Snapcast server.
+ """
+ unused_streams = set(self._snapcast_ma_streams.keys())
+ for grp in self._snapserver.groups:
+ stream_id = grp.stream
+ if stream_id in self._snapcast_ma_streams:
+ ma_stream = self._snapcast_ma_streams[stream_id]
+ ma_stream.set_in_use(True)
+ unused_streams.discard(stream_id)
+
+ if not unused_streams:
+ break
+
+ for stream_id in unused_streams:
+ self._snapcast_ma_streams[stream_id].set_in_use(False)
+
+ def get_snap_client(
+ self, *, client_id: str | None = None, player_id: str | None = None
+ ) -> SnapclientProto | None:
+ """Return the snapclient for either given client_id or player_id."""
+ if player_id is not None:
+ if client_id is not None and client_id != self._get_snapclient_id(client_id):
+ raise ValueError("provided client_id and player_id do not match")
+ client_id = self._get_snapclient_id(player_id)
+
+ if client_id:
+ with suppress(KeyError):
+ return self._snapserver.client(client_id)
+
+ return None
+
+ def get_snap_player(
+ self, *, client_id: str | None = None, player_id: str | None = None
+ ) -> SnapCastPlayer | None:
+ """Return the MA SnapCastPlayer for either given client_id or player_id."""
+ if client_id is not None:
+ if player_id is not None and player_id != self._get_ma_id(client_id):
+ raise ValueError("provided client_id and player_id do not match")
+ player_id = self._get_ma_id(client_id)
+
+ if player_id is None:
+ return None
+
+ if ma_player := self.mass.players.get(player_id):
+ assert isinstance(ma_player, SnapCastPlayer) # for type checking
+ return ma_player
+
+ return None
--- /dev/null
+# ruff: noqa: D100, D101, D102
+
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import Any, Protocol
+
+type RetVal = dict[str, Any] | None
+type RetTuple = tuple[RetVal, RetVal]
+
+SnapclientCallback = Callable[["SnapclientProto"], Any]
+SnapgroupCallback = Callable[["SnapgroupProto"], Any]
+SnapstreamCallback = Callable[["SnapstreamProto"], Any]
+
+
+class SnapgroupProto(Protocol):
+ @property
+ def identifier(self) -> str: ...
+ @property
+ def name(self) -> str: ...
+ @property
+ def clients(self) -> list[str]: ...
+ @property
+ def stream(self) -> str: ...
+ def callback(self) -> None: ...
+ def set_callback(self, func: SnapgroupCallback | None) -> None: ...
+ async def set_name(self, name: str) -> RetVal: ...
+ async def set_stream(self, stream_id: str) -> RetVal: ...
+ async def add_client(self, client_identifier: str) -> None: ...
+
+
+class SnapclientProto(Protocol):
+ # read-only properties used by your code
+ @property
+ def identifier(self) -> str: ...
+
+ @property
+ def group(self) -> SnapgroupProto | None: ...
+
+ @property
+ def friendly_name(self) -> str: ...
+
+ @property
+ def connected(self) -> bool: ...
+
+ @property
+ def name(self) -> str: ...
+
+ @property
+ def latency(self) -> int: ...
+
+ @property
+ def muted(self) -> bool: ...
+
+ @property
+ def volume(self) -> int: ...
+
+ _client: dict[str, Any]
+
+ def groups_available(self) -> list[SnapgroupProto]: ...
+
+ async def set_muted(self, status: bool) -> None: ...
+ async def set_volume(self, percent: int, update_group: bool = True) -> None: ...
+
+ def callback(self) -> None: ...
+ def set_callback(self, func: SnapclientCallback | None) -> None: ...
+
+
+class SnapstreamProto(Protocol):
+ _stream: dict[str, Any]
+
+ @property
+ def identifier(self) -> str: ...
+ @property
+ def friendly_name(self) -> str: ...
+ @property
+ def status(self) -> str: ...
+ @property
+ def path(self) -> str | None: ...
+ def callback(self) -> None: ...
+ def set_callback(self, func: SnapstreamCallback | None) -> None: ...
+
+
+class SnapserverProto(Protocol):
+ # lifecycle
+ async def start(self) -> Any: ...
+ def stop(self) -> None: ...
+
+ # collections (as in the external lib: list(...) views)
+ @property
+ def groups(self) -> list[SnapgroupProto]: ...
+ @property
+ def clients(self) -> list[SnapclientProto]: ...
+ @property
+ def streams(self) -> list[SnapstreamProto]: ...
+
+ # accessors
+ def group(self, group_identifier: str) -> SnapgroupProto: ...
+ def client(self, client_identifier: str) -> SnapclientProto: ...
+ def stream(self, stream_identifier: str) -> SnapstreamProto: ...
+
+ async def delete_client(self, identifier: str) -> RetTuple: ...
+
+ async def status(self) -> tuple[Any, Any]: ...
+
+ async def group_clients(self, identifier: str, clients: list[str]) -> RetVal: ...
+ async def group_name(self, identifier: str, name: str) -> RetVal: ...
+ async def group_stream(self, identifier: str, stream_id: str) -> RetVal: ...
+
+ async def client_name(self, identifier: str, name: str) -> RetVal: ...
+ async def client_latency(self, identifier: str, latency: int) -> RetVal: ...
+ async def client_volume(self, identifier: str, volume: dict[str, Any]) -> RetVal: ...
+
+ async def stream_add_stream(self, stream_uri: str) -> RetVal: ...
+ async def stream_remove_stream(self, identifier: str) -> RetVal: ...
+
+ # state sync
+ def synchronize(self, status: dict[str, Any]) -> None: ...
+
+ # callbacks (if you use them)
+ def set_on_update_callback(self, func: Callable[[], Any] | None) -> None: ...
+ def set_on_connect_callback(self, func: Callable[[], Any] | None) -> None: ...
+ def set_on_disconnect_callback(self, func: Callable[[Exception], Any] | None) -> None: ...
+ def set_new_client_callback(self, func: Callable[[SnapclientProto], Any] | None) -> None: ...