Snapcast: Fix player availability issues and align state with server truth (#3104)
authorMischa Siekmann <45062894+gnumpi@users.noreply.github.com>
Sun, 8 Feb 2026 19:18:50 +0000 (20:18 +0100)
committerGitHub <noreply@github.com>
Sun, 8 Feb 2026 19:18:50 +0000 (20:18 +0100)
music_assistant/controllers/players/sync_groups.py
music_assistant/providers/snapcast/constants.py
music_assistant/providers/snapcast/ma_stream.py [new file with mode: 0644]
music_assistant/providers/snapcast/player.py
music_assistant/providers/snapcast/provider.py
music_assistant/providers/snapcast/snap_cntrl_proto.py [new file with mode: 0644]

index 9da87d0e7e3cd6323edd5aab0b34f46defb74592..11b4d5d121faa4f2a04cb5850b420313dec06465 100644 (file)
@@ -53,6 +53,7 @@ SUPPORT_DYNAMIC_LEADER = {
     # and the music keeps playing uninterrupted.
     "airplay",
     "squeezelite",
+    "snapcast",
     # TODO: Get this working with Sonos as well (need to handle range requests)
 }
 
index 7e5f5945d1334bb397293fc0e8ad20f9536f01cf..4764dcadb03e36632a32ccf7fd52e11cc9d1a60f 100644 (file)
@@ -1,7 +1,6 @@
 """Constants for snapcast provider."""
 
 import pathlib
-from enum import StrEnum
 
 from music_assistant_models.enums import ContentType
 from music_assistant_models.media_items.audio_format import AudioFormat
@@ -60,10 +59,3 @@ DEFAULT_SNAPCAST_PCM_FORMAT = AudioFormat(
     bit_depth=16,
     channels=2,
 )
-
-
-class SnapCastStreamType(StrEnum):
-    """Enum for Snapcast Stream Type."""
-
-    MUSIC = "MUSIC"
-    ANNOUNCEMENT = "ANNOUNCEMENT"
diff --git a/music_assistant/providers/snapcast/ma_stream.py b/music_assistant/providers/snapcast/ma_stream.py
new file mode 100644 (file)
index 0000000..fe90c70
--- /dev/null
@@ -0,0 +1,518 @@
+"""Music Assistant Snapcast source stream.
+
+This module implements a Music Assistant-managed Snapcast stream that is exposed to the
+Snapcast server as a TCP source. The stream is produced by running an FFmpeg pipeline
+which pulls audio from Music Assistant and pushes it to the Snapcast source URI.
+
+Optionally, a Unix socket server can be started to provide a control channel for a
+Snapcast control script (used by the built-in Snapcast server integration).
+"""
+
+from __future__ import annotations
+
+import asyncio
+import random
+import urllib.parse
+from contextlib import suppress
+from typing import TYPE_CHECKING, cast
+
+from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.helpers.ffmpeg import FFMpeg
+from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
+
+from .constants import (
+    CONTROL_SOCKET_PATH_TEMPLATE,
+    DEFAULT_SNAPCAST_FORMAT,
+)
+
+if TYPE_CHECKING:
+    from music_assistant_models.player import PlayerMedia
+
+    from .provider import SnapCastProvider
+    from .snap_cntrl_proto import SnapstreamProto
+
+
+class SnapcastMAStream:
+    """A Music Assistant-managed Snapcast stream.
+
+    The stream lifecycle is:
+    - setup: ensure required server resources exist (Snapcast source, optional socket server)
+    - start_stream: start the FFmpeg streaming task
+    - request_stop_stream / wait_for_stopped: stop streaming and await termination
+    - destroy: stop streaming, remove Snapcast source, and stop ancillary services
+
+    If `cntrl_queue_id` is provided, a Unix socket server is started to allow a Snapcast
+    control script to communicate with Music Assistant.
+    """
+
+    def __init__(
+        self,
+        provider: SnapCastProvider,
+        media: PlayerMedia,
+        stream_name: str,
+        source_id: str | None = None,
+        filter_settings_owner: str | None = None,
+        use_cntrl_script: bool = False,
+        destroy_on_stop: bool = False,
+    ) -> None:
+        """Initialize the stream.
+
+        Args:
+            provider: The Snapcast provider instance.
+            media: The media item to stream.
+            stream_name: Name used to register the stream on the Snapcast server.
+            cntrl_queue_id: If set, enables the control socket server used by the control script.
+            filter_settings_owner: Player/entity id used to fetch DSP/filter parameters.
+            destroy_on_stop: If true, delete this MA stream once streaming stops.
+        """
+        self.media = media
+        self.stream_name = stream_name
+        self.snap_stream: SnapstreamProto | None = None
+
+        self._provider = provider
+        self._logger = provider.logger
+        self._mass = provider.mass
+        self._source_id = source_id
+        self._use_cntrl_script = use_cntrl_script
+        self._cntrl_queue_id = source_id if use_cntrl_script else None
+        self._filter_settings_owner = filter_settings_owner
+        self._destroy_on_stop = destroy_on_stop
+
+        self._lifecycle_lock = asyncio.Lock()
+        self._destroyed = False
+        self._setup_done = False
+        self._is_streaming = False
+        self._restart_requested: bool = False
+        self._stop_requested: bool = False
+
+        self._socket_server: SnapcastSocketServer | None = None
+        self._socket_path: str | None = None
+        self._streamer_task: asyncio.Task[None] | None = None
+        self._stop_streamer_evt = asyncio.Event()
+        self._streamer_started_evt = asyncio.Event()
+        self._stop_timer: asyncio.Handle | None = None
+        self._stop_timer_started_at: float | None = None
+        self._filter_settings: list[str] | None = None
+
+    @property
+    def source_id(self) -> str | None:
+        """Return the source id this stream was created for."""
+        return self._source_id
+
+    @property
+    def stream_id(self) -> str | None:
+        """Return the Snapcast stream identifier, if registered."""
+        if self.snap_stream:
+            return self.snap_stream.identifier
+        return None
+
+    @property
+    def is_streaming(self) -> bool:
+        """Return True if the FFmpeg streaming task is currently running."""
+        return self._is_streaming
+
+    async def setup(self) -> None:
+        """Prepare the Snapcast stream resources.
+
+        Ensures a Snapcast source exists on the server. If `cntrl_queue_id` is set,
+        also starts the Unix socket server used by the control script.
+        """
+        async with self._lifecycle_lock:
+            if self._destroyed:
+                raise RuntimeError("Session is destroyed")
+            if self._setup_done:
+                return
+            if self._provider._snapserver is None:
+                raise RuntimeError("Snapserver needs to be setup first")
+
+            if self._cntrl_queue_id:
+                await self._start_socket_server()
+
+            await self._register_tcp_server_source()
+            self._setup_done = True
+
+    async def destroy(self) -> None:
+        """Stop streaming and tear down all resources.
+
+        This stops the streamer task (if running), removes the Snapcast source,
+        and stops the optional control socket server.
+        """
+        async with self._lifecycle_lock:
+            if self._destroyed:
+                return
+            self._destroyed = True
+
+        self.request_stop_stream()
+        await self.wait_for_stopped()
+        await self._remove_snap_source()
+        await self._stop_socket_server()
+
+    async def start_stream(self, allow_restart: bool = False) -> None:
+        """Start streaming the configured media to the Snapcast source.
+
+        Raises:
+            RuntimeError: If the streamer task is already running.
+        """
+        await self.setup()
+        async with self._lifecycle_lock:
+            if self._streamer_task and not self._streamer_task.done():
+                if not allow_restart:
+                    raise RuntimeError("streamer already running")
+                self._restart_if_running()
+                return
+
+            self._stop_requested = False
+            self._restart_requested = False
+            self._stop_streamer_evt.clear()
+            self._streamer_started_evt.clear()
+            self._streamer_task = self._mass.create_task(self._streamer_task_impl())
+            self._streamer_task.add_done_callback(self._on_streamer_done)
+
+    async def wait_for_started(self, timeout_sec: float | None = None) -> None:
+        """Wait until the streamer task signals it has started.
+
+        Args:
+            timeout_sec: Optional timeout in seconds.
+        """
+        try:
+            await asyncio.wait_for(self._streamer_started_evt.wait(), timeout_sec)
+        except TimeoutError:
+            self._logger.warning(
+                "Timeout waiting for stream %s to start; Canceling...",
+                self.stream_name,
+            )
+
+    def update_media(self, media: PlayerMedia) -> None:
+        """Update the media to play and restart the stream if required."""
+        if media != self.media:
+            self.media = media
+            self._restart_if_running()
+
+    def update_filter_settings(self, from_player: str | None = None) -> None:
+        """Update the filter setting."""
+        take_from = from_player or self._filter_settings_owner
+        if not take_from:
+            raise RuntimeError("No player provided to read filter settings from.")
+        new_settings = get_player_filter_params(
+            self._mass,
+            take_from,
+            DEFAULT_SNAPCAST_FORMAT,
+            DEFAULT_SNAPCAST_FORMAT,
+        )
+        if from_player:
+            self._filter_settings_owner = from_player
+        if new_settings != self._filter_settings:
+            self._restart_if_running()
+
+    def request_stop_stream(self) -> None:
+        """Request the streamer task to stop.
+
+        This is cooperative: the streamer task will stop when it observes the stop event.
+        Any pending inactivity stop timer is canceled.
+        """
+        self._stop_requested = True
+        self._restart_requested = False  # explicit stop cancels any pending restart
+        self._stop_streamer_evt.set()
+
+        self._stop_timer_started_at = None
+        if self._stop_timer:
+            self._stop_timer.cancel()
+
+    def set_in_use(self, in_use: bool) -> None:
+        """Mark the stream as in-use or idle.
+
+        When marked idle, a delayed stop is scheduled. When marked in-use, any pending
+        delayed stop is canceled.
+        """
+        if in_use:
+            self._stop_timer_started_at = None
+            if self._stop_timer:
+                self._stop_timer.cancel()
+        elif self._stop_timer_started_at is None:
+            self._stop_timer_started_at = self._mass.loop.time()
+            self._stop_timer = self._mass.loop.call_later(60.0, self.request_stop_stream)
+
+    async def wait_for_stopped(self, timeout_sec: float | None = None) -> None:
+        """Wait for the streamer task to finish.
+
+        If the task does not finish within the timeout, it is canceled and awaited.
+
+        Args:
+            timeout_sec: Optional timeout in seconds.
+        """
+        curr_task = self._streamer_task
+        if not curr_task:
+            return
+        try:
+            await asyncio.wait_for(curr_task, timeout_sec)
+        except asyncio.CancelledError:
+            self._logger.warning("Streamer task got canceled")
+        except TimeoutError:
+            self._logger.warning(
+                "Timeout waiting for stream %s to finish; Canceling...",
+                self.stream_name,
+            )
+            curr_task.cancel()
+            await asyncio.gather(curr_task, return_exceptions=True)
+
+    async def _streamer_task_impl(self) -> None:
+        """Streamer task implementation.
+
+        Runs FFmpeg to push audio to the Snapcast TCP source until FFmpeg exits or a stop
+        request is received. After exit, waits briefly for the Snapcast stream to report
+        an idle state.
+        """
+        stream_path = self._snap_get_stream_path()
+        if stream_path is None:
+            raise RuntimeError("The path to stream to is not set")
+
+        self._logger.debug("Start streaming to %s", stream_path)
+        self._stop_streamer_evt.clear()
+        self._streamer_started_evt.clear()
+        if self._filter_settings_owner:
+            self._filter_settings = get_player_filter_params(
+                self._mass,
+                self._filter_settings_owner,
+                DEFAULT_SNAPCAST_FORMAT,
+                DEFAULT_SNAPCAST_FORMAT,
+            )
+        audio_source = self._mass.streams.get_stream(self.media, DEFAULT_SNAPCAST_FORMAT)
+        try:
+            async with FFMpeg(
+                audio_input=audio_source,
+                input_format=DEFAULT_SNAPCAST_FORMAT,
+                output_format=DEFAULT_SNAPCAST_FORMAT,
+                filter_params=self._filter_settings or [],
+                audio_output=stream_path,
+                extra_input_args=["-y", "-re"],
+            ) as ffmpeg_proc:
+                wait_ffmpeg = self._mass.create_task(ffmpeg_proc.wait())
+                wait_stop = self._mass.create_task(self._stop_streamer_evt.wait())
+                self._streamer_started_evt.set()
+                self._is_streaming = True
+
+                done, pending = await asyncio.wait(
+                    {wait_ffmpeg, wait_stop},
+                    return_when=asyncio.FIRST_COMPLETED,
+                )
+
+                if wait_stop in done and wait_ffmpeg not in done:
+                    self._logger.debug("Stopping stream %s requested.", self.stream_name)
+                    wait_ffmpeg.cancel()
+                    await asyncio.gather(wait_ffmpeg, return_exceptions=True)
+                    return
+
+                await wait_ffmpeg
+                for t in pending:
+                    t.cancel()
+                await asyncio.gather(*pending, return_exceptions=True)
+
+        finally:
+            self._is_streaming = False
+            self._logger.debug("Finished streaming to %s", stream_path)
+            # Wait a bit for snap stream to become idle
+            try:
+
+                async def wait_until_idle() -> None:
+                    while True:
+                        stream_is_idle = False
+                        with suppress(KeyError):
+                            snap_stream = self._provider._snapserver.stream(self.stream_name)
+                            stream_is_idle = snap_stream.status == "idle"
+                        if self._mass.closing or stream_is_idle:
+                            break
+                        await asyncio.sleep(0.25)
+
+                await asyncio.wait_for(wait_until_idle(), timeout=10.0)
+
+            except TimeoutError:
+                self._logger.warning(
+                    "Timeout waiting for stream %s to become idle",
+                    self.stream_name,
+                )
+
+    def _on_streamer_done(self, t: asyncio.Task[None]) -> None:
+        """Handle streamer task completion and optional cleanup."""
+        restart = False
+        try:
+            t.result()
+        except asyncio.CancelledError:
+            self._logger.debug("Streamer task cancelled: %s", self.stream_name)
+        except Exception:
+            self._logger.exception("Streamer task failed")
+        finally:
+            restart = self._restart_requested and not self._destroyed
+
+            if self._streamer_task is t:
+                self._streamer_task = None
+
+            # reset per-run state
+            self._restart_requested = False
+            self._stop_requested = False
+            self._stop_streamer_evt.clear()
+            self._streamer_started_evt.clear()
+
+        if restart:
+            self._mass.create_task(self._restart_stream_locked())
+        elif self._destroy_on_stop:
+            self._mass.create_task(self._provider.delete_ma_stream(self.stream_name))
+
+    def _restart_if_running(self) -> None:
+        """Request a running stream to restart."""
+        t = self._streamer_task
+        if not t or t.done():
+            return
+
+        if self._stop_requested or self._stop_streamer_evt.is_set():
+            return
+
+        self._restart_requested = True
+        self._stop_requested = True
+        self._stop_streamer_evt.set()
+
+        self._stop_timer_started_at = None
+        if self._stop_timer:
+            self._stop_timer.cancel()
+
+    async def _restart_stream_locked(self) -> None:
+        """Restart the streamer under the lifecycle lock."""
+        async with self._lifecycle_lock:
+            if self._destroyed:
+                return
+            if self._streamer_task and not self._streamer_task.done():
+                return
+
+            # reset state and start a fresh run
+            self._stop_requested = False
+            self._restart_requested = False
+            self._stop_streamer_evt.clear()
+            self._streamer_started_evt.clear()
+
+            self._streamer_task = self._mass.create_task(self._streamer_task_impl())
+            self._streamer_task.add_done_callback(self._on_streamer_done)
+
+    async def _register_tcp_server_source(self) -> None:
+        """Create a Snapcast TCP source for this stream (or reuse an existing one)."""
+        # prefer to reuse existing stream if possible
+        if self.snap_stream:
+            return
+
+        # The control script is used only for music streams in the builtin server
+        extra_args = ""
+        if (cntrl_queue_id := self._cntrl_queue_id) is not None:
+            # Create socket server for control script communication
+            socket_path = self._socket_path
+            if socket_path is None:
+                raise RuntimeError("socket_path needs to be set if cntrl_queue_id is set")
+            extra_args = (
+                f"&controlscript={urllib.parse.quote_plus('control.py')}"
+                f"&controlscriptparams=--queueid={urllib.parse.quote_plus(cntrl_queue_id)}%20"
+                f"--socket={urllib.parse.quote_plus(socket_path)}%20"
+                f"--streamserver-ip={self._mass.streams.publish_ip}%20"
+                f"--streamserver-port={self._mass.streams.publish_port}"
+            )
+
+        attempts = 50
+        while attempts:
+            attempts -= 1
+            # pick a random port
+            port = random.randint(4953, 4953 + 200)
+            ## Do we need to add a time out here?
+            result = await self._provider._snapserver.stream_add_stream(
+                # NOTE: setting the sampleformat to something else
+                # (like 24 bits bit depth) does not seem to work at all!
+                f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
+                f"&idle_threshold={self._provider._snapcast_stream_idle_threshold}"
+                f"{extra_args}&name={self.stream_name}"
+            )
+            if result is None or "id" not in result:
+                # if the port is already taken, the result will be an error
+                self._logger.warning(result)
+                continue
+            ## Do we need to synchronize the snapserver repr first?
+            self.snap_stream = self._provider._snapserver.stream(result["id"])
+            self.snap_stream.set_callback(self._snap_on_stream_update)
+            return
+
+        if self._socket_server:
+            await self._stop_socket_server()
+
+        msg = "Unable to create stream - No free port found?"
+        raise RuntimeError(msg)
+
+    async def _remove_snap_source(self) -> None:
+        """Remove the Snapcast source created for this stream and detach groups."""
+        if self._mass.closing or self.snap_stream is None:
+            return
+
+        for snap_group in self._provider._snapserver.groups:
+            if snap_group.stream != self.snap_stream.identifier:
+                continue
+            self._logger.debug(f"Set stream of group {snap_group.name} to default.")
+            await snap_group.set_stream("default")
+
+        with suppress(KeyError, AttributeError):
+            snap_stream = self._provider._snapserver.stream(self.stream_name)
+            await self._provider._snapserver.stream_remove_stream(snap_stream.identifier)
+
+        if self._socket_server:
+            await self._stop_socket_server()
+        self._snap_on_stream_update()
+
+        return
+
+    def _snap_get_stream_path(self) -> str | None:
+        """Return the Snapcast TCP URI to stream to."""
+        if self.snap_stream is None:
+            return None
+
+        uri = self.snap_stream._stream.get("uri", {})
+        uri_host = uri.get("host", "")
+        stream_path = self.snap_stream.path or f"tcp://{uri_host}"
+        return stream_path.replace("0.0.0.0", self._provider._snapcast_server_host)
+
+    def _snap_on_stream_update(self, stream: SnapstreamProto | None = None) -> None:
+        """Handle Snapcast stream updates and trigger group member refresh."""
+        if self.snap_stream is None:
+            return
+
+        for snap_group in self._provider._snapserver.groups:
+            if snap_group.stream != self.snap_stream.identifier:
+                continue
+            self._provider.poke_group_members(snap_group)
+
+    async def _start_socket_server(self) -> str:
+        """Get or create a socket server for the given queue.
+
+        :return: The path to the Unix socket.
+        """
+        if self._socket_server:
+            return self._socket_server.socket_path
+
+        if self._cntrl_queue_id is None:
+            raise RuntimeError("Socket server require _cntrl_queue_id to be set")
+
+        socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=self._cntrl_queue_id)
+        socket_server = SnapcastSocketServer(
+            mass=self._mass,
+            queue_id=self._cntrl_queue_id,
+            socket_path=socket_path,
+            streamserver_ip=str(self._mass.streams.publish_ip),
+            streamserver_port=cast("int", self._mass.streams.publish_port),
+        )
+        await socket_server.start()
+        self._socket_server = socket_server
+        self._socket_path = socket_path
+        self._logger.debug(
+            "Created socket server for queue %s at %s", self._cntrl_queue_id, socket_path
+        )
+        return socket_path
+
+    async def _stop_socket_server(self) -> None:
+        """Stop and remove the socket server for the given queue."""
+        if not self._socket_server:
+            return
+
+        await self._socket_server.stop()
+        self._socket_server = None
+        self._logger.debug("Stopped socket server for queue %s", self._cntrl_queue_id)
index 221a3f74c605231b9a113303a78d67cbc5226bc1..7e6e77163c02dd79cc78c7cc27226c9c1bdf372a 100644 (file)
@@ -1,34 +1,58 @@
 """Snapcast Player."""
 
+from __future__ import annotations
+
 import asyncio
-import random
-import time
-import urllib.parse
 from contextlib import suppress
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING, TypedDict, cast
 
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant_models.enums import PlaybackState, PlayerFeature
+from music_assistant_models.enums import MediaType, PlaybackState, PlayerFeature
 from music_assistant_models.player import DeviceInfo, PlayerMedia
-from snapcast.control.client import Snapclient
-from snapcast.control.group import Snapgroup
-from snapcast.control.stream import Snapstream
-
-from music_assistant.constants import ATTR_ANNOUNCEMENT_IN_PROGRESS
-from music_assistant.helpers.audio import get_player_filter_params
-from music_assistant.helpers.compare import create_safe_string
-from music_assistant.helpers.ffmpeg import FFMpeg
-from music_assistant.models.player import Player
-from music_assistant.providers.snapcast.constants import (
-    CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
-    DEFAULT_SNAPCAST_FORMAT,
-    MASS_ANNOUNCEMENT_POSTFIX,
-    MASS_STREAM_PREFIX,
-    SnapCastStreamType,
+from propcache import under_cached_property as cached_property
+
+from music_assistant.constants import (
+    ATTR_ANNOUNCEMENT_IN_PROGRESS,
+    CONF_ENTRY_HTTP_PROFILE_HIDDEN,
+    SYNCGROUP_PREFIX,
 )
+from music_assistant.models.player import Player
+from music_assistant.providers.snapcast.constants import CONF_ENTRY_SAMPLE_RATES_SNAPCAST
+from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
 
 if TYPE_CHECKING:
+    from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
+
     from music_assistant.providers.snapcast.provider import SnapCastProvider
+    from music_assistant.providers.snapcast.snap_cntrl_proto import (
+        SnapclientProto,
+        SnapstreamProto,
+    )
+
+
+class TrackedPlayerState(TypedDict, total=False):
+    """Tracked state for the Snapcast MA player.
+
+    It is used for change detection and state synchronization, and may be
+    partially populated depending on which information is
+    currently available.
+
+    Keys prefixed with ``_attr_`` are exposed as player attributes, while the
+    remaining keys represent internal Snapcast grouping and connection state.
+    """
+
+    # Player attribute fields
+    _attr_name: str
+    _attr_volume_level: float
+    _attr_volume_muted: bool
+    _attr_available: bool
+
+    # snapclient fields
+    connected: bool
+    stream_id: str
+    stream_status: str | None
+    grp_name: str
+    grp_member_ids: list[str]
+    grp_member_avail: list[bool]
 
 
 class SnapCastPlayer(Player):
@@ -36,92 +60,136 @@ class SnapCastPlayer(Player):
 
     def __init__(
         self,
-        provider: "SnapCastProvider",
+        provider: SnapCastProvider,
         player_id: str,
-        snap_client: Snapclient,
-        snap_client_id: str,
+        snap_client: SnapclientProto,
     ) -> None:
         """Init."""
-        self.provider: SnapCastProvider  # type: ignore[misc]
         self.snap_client = snap_client
-        self.snap_client_id = snap_client_id
         super().__init__(provider, player_id)
-        self._stream_task: asyncio.Task[None] | None = None
+
+        self._snap_ma_stream: SnapcastMAStream | None = None
+
+        self._update_worker: asyncio.Task[None] | None = None
+        self._poke_evt = asyncio.Event()
+        self._state_update_lock = asyncio.Lock()
+        self._last_tracked_state: TrackedPlayerState | None = None
+
+    @property
+    def snap_provider(self) -> SnapCastProvider:
+        """Return the Snapcast provider instance."""
+        return cast("SnapCastProvider", self.provider)
 
     @property
     def requires_flow_mode(self) -> bool:
         """Return if the player requires flow mode."""
         return True
 
-    @property
+    @cached_property
     def synced_to(self) -> str | None:
-        """
-        Return the id of the player this player is synced to (sync leader).
+        """Return the id of the player this player is synced to (sync leader)."""
+        grp_name = self.snap_group_name
+        if grp_name == self.player_id:
+            # is group leader
+            return None
 
-        If this player is not synced to another player (or is the sync leader itself),
-        this should return None.
-        If it is part of a (permanent) group, this should also return None.
-        """
-        snap_group = self._get_snapgroup()
-        assert snap_group is not None  # for type checking
-        master_id: str = self.provider._get_ma_id(snap_group.clients[0])
-        if len(snap_group.clients) < 2 or self.player_id == master_id:
+        grp_player_ids = self._get_player_ids_of_curr_group()
+        if len(grp_player_ids) < 2 or grp_name not in grp_player_ids:
             return None
-        return master_id
+
+        if leader_player := self.mass.players.get(grp_name):
+            return grp_name if leader_player.available else None
+
+        return None
+
+    @cached_property
+    def group_members(self) -> list[str]:
+        """Return the group members of the player."""
+        if not self._attr_available:
+            return []
+
+        grp_name = self.snap_group_name
+        if grp_name != self.player_id:
+            # only group leaders can have members
+            return []
+
+        player_ids = self._get_player_ids_of_curr_group()
+        if self.player_id not in player_ids:
+            # should not happen, unless the current
+            # state repr is invalid
+            return []
+
+        player_ids.remove(self.player_id)
+        connected = [
+            player_id
+            for player_id in player_ids
+            if (client := self.snap_provider.get_snap_client(player_id=player_id))
+            and client.connected
+        ]
+        if connected:
+            return [self.player_id, *connected]
+
+        return []
+
+    @property
+    def playback_state(self) -> PlaybackState:
+        """Return the current playback state of the player."""
+        snap_stream = self._get_active_snapstream()
+        if snap_stream is None:
+            return PlaybackState.IDLE
+
+        if snap_stream.identifier == "default" or snap_stream.status == "idle":
+            return PlaybackState.IDLE
+
+        return PlaybackState.PLAYING
 
     def setup(self) -> None:
         """Set up player."""
         self._attr_name = self.snap_client.friendly_name
         self._attr_available = self.snap_client.connected
+
+        host_dict = self.snap_client._client.get("host", {})
+        os, arch, ip = (host_dict.get(key, "") for key in ["os", "arch", "ip"])
         self._attr_device_info = DeviceInfo(
-            model=self.snap_client._client.get("host").get("os"),
-            manufacturer=self.snap_client._client.get("host").get("arch"),
+            model=os,
+            manufacturer=arch,
         )
-        self._attr_device_info.ip_address = self.snap_client._client.get("host").get("ip")
+        self._attr_device_info.ip_address = ip
         self._attr_supported_features = {
             PlayerFeature.SET_MEMBERS,
             PlayerFeature.VOLUME_SET,
             PlayerFeature.VOLUME_MUTE,
             PlayerFeature.PLAY_ANNOUNCEMENT,
         }
-        self._attr_can_group_with = {self.provider.instance_id}
+        self._attr_can_group_with = {self.snap_provider.instance_id}
+        if not self._update_worker:
+            self._update_worker = self.mass.create_task(self._player_update_worker)
 
     async def volume_set(self, volume_level: int) -> None:
         """Send VOLUME_SET command to given player."""
+        # Use optimistic server state for now
+        # not guaranteed that the client respects it
         await self.snap_client.set_volume(volume_level)
 
     async def stop(self) -> None:
         """Send STOP command to given player."""
-        # update the state first to avoid race conditions, if an active play_announcement
-        # finishes the player.state should be IDLE.
-        self._attr_playback_state = PlaybackState.IDLE
-        self._attr_current_media = None
-        self._set_childs_state()
-
-        self.update_state()
-
-        # we change the active stream only if music was playing
-        if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
-            snapgroup = self._get_snapgroup()
-            assert snapgroup is not None  # for type checking
-            await snapgroup.set_stream("default")
-
-        # but we always delete the music stream (whether it was active or not)
-        await self._delete_stream(self._get_stream_name(SnapCastStreamType.MUSIC))
+        if ma_stream := self.active_snap_ma_stream:
+            ma_stream.request_stop_stream()
+            return
 
-        if self._stream_task is not None:
-            if not self._stream_task.done():
-                self._stream_task.cancel()
-                with suppress(asyncio.CancelledError):
-                    await self._stream_task
-            self._stream_task = None
+        self.poke_player_update()
 
     async def volume_mute(self, muted: bool) -> None:
         """Send MUTE command to given player."""
-        # Using optimistic value because the library does not return the response from the api
-        await self.snap_client.set_muted(muted)
-        self._attr_volume_muted = muted
-        self.update_state()
+        # Use optimistic server state for now
+        # not guaranteed that the client respects it
+        # TODO: move this to the snapcast python library
+        vol = self.snap_client._client["config"]["volume"]
+        vol["muted"] = muted
+        res = await self.snap_provider._snapserver.client_volume(self.snap_client.identifier, vol)
+        if res and "muted" in res:
+            self.snap_client._client["config"]["volume"] = res
+            self.snap_client.callback()
 
     async def set_members(
         self,
@@ -129,28 +197,76 @@ class SnapCastPlayer(Player):
         player_ids_to_remove: list[str] | None = None,
     ) -> None:
         """Handle SET_MEMBERS command on the player."""
-        group = self._get_snapgroup()
-        assert group is not None  # for type checking
-        # handle client additions
-        for player_id in player_ids_to_add or []:
-            snapcast_id = self.provider._get_snapclient_id(player_id)
-            if snapcast_id not in group.clients:
-                await group.add_client(snapcast_id)
-                if player_id not in self._attr_group_members:
-                    self._attr_group_members.append(player_id)
-        # handle client removals
-        for player_id in player_ids_to_remove or []:
-            snapcast_id = self.provider._get_snapclient_id(player_id)
-            if snapcast_id in group.clients:
-                await group.remove_client(snapcast_id)
-                if player_id in self._attr_group_members:
-                    self._attr_group_members.remove(player_id)
-                # Set default stream and stop ungrouped players
-                removed_snapclient = self.provider._snapserver.client(snapcast_id)
-                await removed_snapclient.group.set_stream("default")
-                if removed_player := self.mass.players.get(player_id):
-                    await removed_player.stop()
-        self.update_state()
+        # get the group owned by this player (identified by the group name)
+        player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
+
+        if player_group is None:
+            return
+
+        player_group.set_callback(None)
+
+        curr_ma_player_ids = [
+            ma_id
+            for cli_id in player_group.clients
+            if (ma_id := self.snap_provider._get_ma_id(cli_id))
+        ]
+
+        curr_stream_id = player_group.stream
+        sync_group_player = None
+        if curr_ma_stream := self.snap_provider.get_snap_ma_stream(curr_stream_id):
+            media = curr_ma_stream.media
+            if media.media_type == MediaType.PLUGIN_SOURCE:
+                custom_data = media.custom_data or {}
+                assigned_player = custom_data.get("player_id", "")
+                if assigned_player.startswith(SYNCGROUP_PREFIX):
+                    sync_group_player = self.mass.players.get(assigned_player)
+            else:
+                media_src_id = media.source_id or ""
+                if media_src_id.startswith(SYNCGROUP_PREFIX):
+                    sync_group_player = self.mass.players.get(media_src_id)
+
+        if sync_group_player and self.player_id in (player_ids_to_remove or []):
+            # players in sync_group_player.group_members will be rejoined
+            # remove others first
+            for id_to_remove in player_ids_to_remove or []:
+                if id_to_remove == self.player_id:
+                    continue
+                if (
+                    id_to_remove in curr_ma_player_ids
+                    and id_to_remove not in sync_group_player.group_members
+                ):
+                    await self.snap_provider.isolate_player_to_dedicated_group(
+                        id_to_remove, target_stream_id="default"
+                    )
+
+            # split remaining group into individual groups,
+            # keeps the current stream, set this group to default stream
+            await self.snap_provider.isolate_player_to_dedicated_group(
+                target_player_id=self.player_id,
+                target_stream_id="default",
+                others_stream_id=curr_stream_id,
+            )
+        else:
+            for player_id in player_ids_to_remove or []:
+                if player_id not in curr_ma_player_ids:
+                    continue
+                await self.snap_provider.isolate_player_to_dedicated_group(
+                    player_id, target_stream_id="default"
+                )
+                curr_ma_player_ids.remove(player_id)
+
+        for ma_id in player_ids_to_add or []:
+            if (
+                snap_id := self.snap_provider._get_snapclient_id(ma_id)
+            ) and ma_id not in curr_ma_player_ids:
+                await player_group.add_client(snap_id)
+
+        # some caller require instant state updates before returning
+        async with self._state_update_lock:
+            if await self._process_snapcast_client_state():
+                self.update_state()
+
+        self.snap_provider._update_group_callbacks(poke=True)
 
     async def play_media(self, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA on given player."""
@@ -158,134 +274,70 @@ class SnapCastPlayer(Player):
             msg = "A synced player cannot receive play commands directly"
             raise RuntimeError(msg)
 
-        # stop any existing streamtasks first
-        if self._stream_task is not None:
-            if not self._stream_task.done():
-                self._stream_task.cancel()
-                with suppress(asyncio.CancelledError):
-                    await self._stream_task
-            self._stream_task = None
+        ma_stream = await self.snap_provider.get_snapcast_media_stream(
+            media, filter_settings_owner=self.player_id
+        )
+
+        if ma_stream is None or ma_stream.stream_id is None:
+            return
+
+        self._snap_ma_stream = ma_stream
 
-        # get stream or create new one
-        stream_name = self._get_stream_name(SnapCastStreamType.MUSIC)
-        stream = await self._get_or_create_stream(stream_name, media.source_id or self.player_id)
+        # e.g. DSP settings require a restart
+        await self._snap_ma_stream.start_stream(allow_restart=True)
 
         # if no announcement is playing we activate the stream now, otherwise it
         # will be activated by play_announcement when the announcement is over.
         if not self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
-            snap_group = self._get_snapgroup()
-            assert snap_group is not None  # for type checking
-            await snap_group.set_stream(stream.identifier)
-
-        self._attr_current_media = media
-
-        # select audio source
-        audio_source = self.mass.streams.get_stream(media, DEFAULT_SNAPCAST_FORMAT)
-
-        async def _streamer() -> None:
-            stream_path = self._get_stream_path(stream)
-            self.logger.debug("Start streaming to %s", stream_path)
-            async with FFMpeg(
-                audio_input=audio_source,
-                input_format=DEFAULT_SNAPCAST_FORMAT,
-                output_format=DEFAULT_SNAPCAST_FORMAT,
-                filter_params=get_player_filter_params(
-                    self.mass, self.player_id, DEFAULT_SNAPCAST_FORMAT, DEFAULT_SNAPCAST_FORMAT
-                ),
-                audio_output=stream_path,
-                extra_input_args=["-y", "-re"],
-            ) as ffmpeg_proc:
-                self._attr_playback_state = PlaybackState.PLAYING
-                self._attr_current_media = media
-                self._attr_elapsed_time = 0
-                self._attr_elapsed_time_last_updated = time.time()
-                self.update_state()
-
-                self._set_childs_state()
-                await ffmpeg_proc.wait()
+            player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
+            assert player_group is not None  # for type checking
+            await player_group.set_stream(ma_stream.stream_id)
 
-            self.logger.debug("Finished streaming to %s", stream_path)
-            # we need to wait a bit for the stream status to become idle
-            # to ensure that all snapclients have consumed the audio
-            while stream.status != "idle":
-                await asyncio.sleep(0.25)
-            self._attr_playback_state = PlaybackState.IDLE
-            self._attr_elapsed_time = time.time() - self._attr_elapsed_time_last_updated
-            self.update_state()
-            self._set_childs_state()
-
-        # start streaming the queue (pcm) audio in a background task
-        self._stream_task = self.mass.create_task(_streamer())
+        self.poke_player_update()
 
     async def play_announcement(
         self, announcement: PlayerMedia, volume_level: int | None = None
     ) -> None:
         """Handle (provider native) playback of an announcement on given player."""
-        # get stream or create new one
-        stream_name = self._get_stream_name(SnapCastStreamType.ANNOUNCEMENT)
-        stream = await self._get_or_create_stream(stream_name, None)
+        was_synced_to: str | None = self.synced_to
+        orig_volume_level: int | None = self.volume_level
+
+        prev_stream = self.active_snap_ma_stream
+
+        ma_stream = await self.snap_provider.get_snapcast_media_stream(
+            announcement, filter_settings_owner=self.player_id
+        )
+        player_group = await self.snap_provider.ensure_player_owned_group(self.player_id)
+
+        if ma_stream is None or ma_stream.stream_id is None or player_group is None:
+            return
 
-        # always activate the stream (announcements have priority over music)
-        snap_group = self._get_snapgroup()
-        assert snap_group is not None  # for type checking
-        await snap_group.set_stream(stream.identifier)
+        await player_group.set_stream(ma_stream.stream_id)
 
-        # Unfortunately snapcast sets a volume per client (not per stream), so we need a way to
-        # set the announcement volume without affecting the music volume.
-        # We go for the simplest solution: save the previous volume, change it, restore later
-        # (with the downside that the change will be visible in the UI)
-        orig_volume_level = self.volume_level  # Note: might be None
+        if self.snap_provider._use_builtin_server:
+            await asyncio.sleep(self.snap_provider._snapcast_server_buffer_size / 1000.0)
 
         if volume_level is not None:
             await self.volume_set(volume_level)
 
-        input_format = DEFAULT_SNAPCAST_FORMAT
-        assert announcement.custom_data is not None  # for type checking
-        audio_source = self.mass.streams.get_announcement_stream(
-            announcement.custom_data["announcement_url"],
-            output_format=DEFAULT_SNAPCAST_FORMAT,
-            pre_announce=announcement.custom_data["pre_announce"],
-            pre_announce_url=announcement.custom_data["pre_announce_url"],
-        )
+        await ma_stream.start_stream()
+        await ma_stream.wait_for_stopped()
 
-        # stream the audio, wait for it to finish (play_announcement should return after the
-        # announcement is over to avoid simultaneous announcements).
-        stream_path = self._get_stream_path(stream)
-        self.logger.debug("Start announcement streaming to %s", stream_path)
-        async with FFMpeg(
-            audio_input=audio_source,
-            input_format=input_format,
-            output_format=DEFAULT_SNAPCAST_FORMAT,
-            filter_params=get_player_filter_params(
-                self.mass, self.player_id, input_format, DEFAULT_SNAPCAST_FORMAT
-            ),
-            audio_output=stream_path,
-            extra_input_args=["-y", "-re"],
-        ) as ffmpeg_proc:
-            await ffmpeg_proc.wait()
-
-        self.logger.debug("Finished announcement streaming to %s", stream_path)
-        # we need to wait a bit for the stream status to become idle
-        # to ensure that all snapclients have consumed the audio
-        while stream.status != "idle":
-            await asyncio.sleep(0.25)
-
-        # delete the announcement stream
-        await self._delete_stream(stream_name)
-
-        # restore volume, if we changed it above and it's still the same we set
-        # (the user did not change it himself while the announcement was playing)
         if self.volume_level == volume_level and orig_volume_level is not None:
             await self.volume_set(orig_volume_level)
 
-        # and restore the group to either the default or the music stream
-        if self.playback_state == PlaybackState.IDLE:
-            new_stream_name = "default"
+        if was_synced_to:
+            if (
+                leader_group := await self.snap_provider.ensure_player_owned_group(was_synced_to)
+            ) is None:
+                return
+            await leader_group.add_client(self.snap_client.identifier)
         else:
-            new_stream_name = self._get_stream_name(SnapCastStreamType.MUSIC)
-        group = self._get_snapgroup()
-        assert group is not None  # for type checking
-        await group.set_stream(new_stream_name)
+            await player_group.set_stream(
+                prev_stream.stream_id
+                if prev_stream and prev_stream.stream_id is not None
+                else "default"
+            )
 
     async def get_config_entries(
         self,
@@ -295,139 +347,206 @@ class SnapCastPlayer(Player):
         """Player config."""
         return [
             CONF_ENTRY_SAMPLE_RATES_SNAPCAST,
+            # we don't use the http server for streaming
+            CONF_ENTRY_HTTP_PROFILE_HIDDEN,
         ]
 
-    def _handle_player_update(self, snap_client: Snapclient) -> None:
-        """Process Snapcast update to Player controller.
-
-        This is a callback function
+    def _handle_player_update(self, snap_client: SnapclientProto) -> None:
+        """Forward snap_client updates."""
+        self.poke_player_update()
+
+    def poke_player_update(self) -> None:
+        """Signal that a player state update should be processed."""
+        self._poke_evt.set()
+
+    async def _player_update_worker(self) -> None:
+        """Aggregate and process player state update requests."""
+        while True:
+            await self._poke_evt.wait()
+            self._poke_evt.clear()
+            while True:
+                call_update: bool = False
+                async with self._state_update_lock:
+                    call_update = await self._process_snapcast_client_state()
+                if call_update:
+                    self.update_state()
+                if self._poke_evt.is_set():
+                    self._poke_evt.clear()
+                    continue
+                break
+
+    async def _process_snapcast_client_state(self) -> bool:
+        """Process the latest Snapcast client state and apply changes to this player.
+
+        Returns:
+        True if changes were applied and a state update should be emitted via
+        ``update_state()``; False if no update is necessary (or if required data
+        is temporarily unavailable and the update should be retried later).
         """
-        self._attr_name = self.snap_client.friendly_name
-        self._attr_volume_level = self.snap_client.volume
-        self._attr_volume_muted = self.snap_client.muted
-        self._attr_available = self.snap_client.connected
+        snap_group = self.snap_client.group
+        if snap_group is None:
+            # some data syncing error, a client is always a group member
+            # retry again later, don't call update now
+            return False
+
+        stream_id = snap_group.stream
+        snap_stream: SnapstreamProto | None = None
+        with suppress(KeyError):
+            snap_stream = self.snap_provider._snapserver.stream(stream_id)
+
+        members = list(snap_group.clients)  # snapshot
+
+        curr_state: TrackedPlayerState = {
+            "_attr_name": self.snap_client.friendly_name,
+            "_attr_volume_level": self.snap_client.volume,
+            "_attr_volume_muted": self.snap_client.muted,
+            "_attr_available": self.snap_client.connected,
+            "connected": self.snap_client.connected,
+            "stream_id": snap_group.stream,
+            "stream_status": snap_stream.status if snap_stream is not None else None,
+            "grp_name": snap_group.name,
+            "grp_member_ids": members,
+            "grp_member_avail": [
+                pl.available
+                for cl_id in members
+                if (pl_id := self.snap_provider._get_ma_id(cl_id))
+                and (pl := self.mass.players.get(pl_id))
+            ],
+        }
 
-        # Note: when the active stream is a MASS stream the active_source is __not__ updated at all.
-        # So it doesn't matter whether a MASS stream is for music or announcements.
-        if stream := self._get_active_snapstream():
-            if stream.identifier == "default":
-                self._attr_active_source = None
-            elif not stream.identifier.startswith(MASS_STREAM_PREFIX):
-                # unknown source
-                self._attr_active_source = stream.identifier
-        else:
-            self._attr_active_source = None
+        prev_state: TrackedPlayerState = (
+            self._last_tracked_state if self._last_tracked_state is not None else {}
+        )
+        self._last_tracked_state = curr_state
+
+        # change detection for simple attrs
+        changed_attrs = {
+            k: v for k, v in curr_state.items() if k.startswith("_attr_") and prev_state.get(k) != v
+        }
 
-        self._group_childs()
+        prev_connected = prev_state.get("connected", False)
+        now_connected = curr_state.get("connected", False)
+        connection_changed = prev_connected != now_connected
 
-        self.update_state()
+        prev_stream_id = prev_state.get("stream_id")
+        curr_stream_id = curr_state["stream_id"]
+        prev_stream_status = prev_state.get("stream_status")
+        curr_stream_status = curr_state.get("stream_status")
 
-    def _get_stream_name(self, stream_type: SnapCastStreamType) -> str:
-        """Return the name of the stream for the given player.
+        stream_changed = (
+            prev_stream_id != curr_stream_id or prev_stream_status != curr_stream_status
+        )
 
-        Each player can have up to two concurrent streams, for music and announcements.
+        grouping_changed = any(
+            prev_state.get(k) != curr_state.get(k)
+            for k in ("grp_name", "grp_member_ids", "grp_member_avail")
+        )
 
-        The stream name depends only on player_id (not queue_id) for two reasones:
-        1. Avoid issues when the same queue_id is simultaneously used by two players
-           (eg in universal groups).
-        2. Easily identify which stream belongs to which player, for instance to be able to
-           delete a music stream even when it is not active due to an announcement.
-        """
-        safe_name = create_safe_string(self.player_id, replace_space=True)
-        stream_name = f"{MASS_STREAM_PREFIX}{safe_name}"
-        if stream_type == SnapCastStreamType.ANNOUNCEMENT:
-            stream_name += MASS_ANNOUNCEMENT_POSTFIX
-        return stream_name
-
-    async def _get_or_create_stream(self, stream_name: str, queue_id: str | None) -> Snapstream:
-        """Create new stream on snapcast server (or return existing one)."""
-        # prefer to reuse existing stream if possible
-        if stream := self._get_snapstream(stream_name):
-            return stream
-        # The control script is used only for music streams in the builtin server
-        # (queue_id is None only for announcement streams).
-        extra_args = ""
-        if (
-            self.provider._use_builtin_server
-            and queue_id
-            and self.provider._controlscript_available
+        needs_processing = bool(
+            changed_attrs or grouping_changed or stream_changed or connection_changed
+        )
+        if not needs_processing:
+            return False
+
+        if connection_changed or grouping_changed:
+            self.snap_provider.poke_group_members(snap_group)
+
+        # help cleaning up unused streams
+        if curr_stream_id == "default" or (
+            (my_stream := self._snap_ma_stream)
+            and my_stream.stream_id in {prev_stream_id, curr_stream_id}
         ):
-            # Create socket server for control script communication
-            socket_path = await self.provider.get_or_create_socket_server(queue_id)
-            extra_args = (
-                f"&controlscript={urllib.parse.quote_plus('control.py')}"
-                f"&controlscriptparams=--queueid={urllib.parse.quote_plus(queue_id)}%20"
-                f"--socket={urllib.parse.quote_plus(socket_path)}%20"
-                f"--streamserver-ip={self.mass.streams.publish_ip}%20"
-                f"--streamserver-port={self.mass.streams.publish_port}"
-            )
+            self.snap_provider.update_stream_usage()
 
-        attempts = 50
-        while attempts:
-            attempts -= 1
-            # pick a random port
-            port = random.randint(4953, 4953 + 200)
-            result = await self.provider._snapserver.stream_add_stream(
-                # NOTE: setting the sampleformat to something else
-                # (like 24 bits bit depth) does not seem to work at all!
-                f"tcp://0.0.0.0:{port}?sampleformat=48000:16:2"
-                f"&idle_threshold={self.provider._snapcast_stream_idle_threshold}"
-                f"{extra_args}&name={stream_name}"
-            )
-            if "id" not in result:
-                # if the port is already taken, the result will be an error
-                self.logger.warning(result)
-                continue
-            return self.provider._snapserver.stream(result["id"])
-        msg = "Unable to create stream - No free port found?"
-        raise RuntimeError(msg)
-
-    def _get_snapstream(self, stream_name: str) -> Snapstream | None:
-        """Get a stream by name."""
-        with suppress(KeyError):
-            return self.provider._snapserver.stream(stream_name)
+        # apply changed attrs
+        for key, value in changed_attrs.items():
+            setattr(self, key, value)
+
+        # finally notify state update once
+        return True
+
+    @property
+    def active_snap_ma_stream(self) -> SnapcastMAStream | None:
+        """Return the MA stream source of the active group."""
+        grp = self.snap_client.group
+        if grp is None or grp.stream is None:
+            return None
+
+        if grp.stream == "default":
+            return None
+
+        return self.snap_provider.get_snap_ma_stream(grp.stream)
+
+    @property
+    def snap_group_name(self) -> str:
+        """Return the name of the active group."""
+        snap_group = self.snap_client.group
+        if snap_group is None:
+            return ""
+        return snap_group.name
+
+    @cached_property
+    def _current_media(self) -> PlayerMedia | None:
+        """
+        Return the current media being played by the player.
+
+        Note that this is NOT the final current media of the player,
+        as it may be overridden by a active group/sync membership.
+        Hence it's marked as a private property.
+        The final current media can be retrieved by using the 'current_media' property.
+        """
+        if snap_ma_stream := self.active_snap_ma_stream:
+            return snap_ma_stream.media
         return None
 
-    def _get_stream_path(self, stream: Snapstream) -> str:
-        stream_path = stream.path or f"tcp://{stream._stream['uri']['host']}"
-        return stream_path.replace("0.0.0.0", self.provider._snapcast_server_host)
-
-    async def _delete_stream(self, stream_name: str) -> None:
-        if stream := self._get_snapstream(stream_name):
-            with suppress(TypeError, KeyError, AttributeError):
-                await self.provider._snapserver.stream_remove_stream(stream.identifier)
-
-    def _get_snapgroup(self) -> Snapgroup | None:
-        """Get snapcast group for given player_id."""
-        return cast("Snapgroup | None", self.snap_client.group)
-
-    def _set_childs_state(self) -> None:
-        """Set the state of the child`s of the player."""
-        for child_player_id in self.group_members:
-            if child_player_id == self.player_id:
-                continue
-            if mass_child_player := self.mass.players.get(child_player_id):
-                mass_child_player._attr_playback_state = self.playback_state
-                mass_child_player.update_state()
-
-    def _get_active_snapstream(self) -> Snapstream | None:
+    @property
+    def _active_source(self) -> str | None:
+        """
+        Return the (id of) the active source of the player.
+
+        Only required if the player supports PlayerFeature.SELECT_SOURCE.
+
+        Set to None if the player is not currently playing a source or
+        the player_id if the player is currently playing a MA queue.
+
+        Note that this is NOT the final active source of the player,
+        as it may be overridden by a active group/sync membership.
+        Hence it's marked as a private property.
+        The final active source can be retrieved by using the 'active_source' property.
+        """
+        grp = self.snap_client.group
+        if grp is None or grp.stream is None:
+            return None
+
+        if grp.stream == "default":
+            return None
+
+        if ma_stream := self.snap_provider.get_snap_ma_stream(grp.stream):
+            return ma_stream.source_id
+
+        # external snapcast stream
+        return grp.stream or None
+
+    def _get_active_snapstream(self) -> SnapstreamProto | None:
         """Get active stream for given player_id."""
-        if group := self._get_snapgroup():
-            return self._get_snapstream(group.stream)
+        if group := self.snap_client.group:
+            with suppress(KeyError):
+                return self.snap_provider._snapserver.stream(group.stream)
         return None
 
-    def _group_childs(self) -> None:
-        """Return player_ids of the players synced to this player."""
-        snap_group = self._get_snapgroup()
-        assert snap_group is not None  # for type checking
-        self._attr_group_members.clear()
-        if self.synced_to is not None:
-            return
-        self._attr_group_members.append(self.player_id)
-        for snap_client_id in snap_group.clients:
-            if (
-                self.provider._get_ma_id(snap_client_id) != self.player_id
-                and self.provider._snapserver.client(snap_client_id).connected
-            ):
-                self._attr_group_members.append(self.provider._get_ma_id(snap_client_id))
-        self.update_state()
+    def _get_player_ids_of_curr_group(self) -> list[str]:
+        snap_group = self.snap_client.group
+        if snap_group is None:
+            return []
+        return [
+            ma_id
+            for client_id in snap_group.clients
+            if (ma_id := self.snap_provider._get_ma_id(client_id))
+        ]
+
+    def _get_players_of_curr_group(self) -> list[Player]:
+        return [
+            ma_player
+            for ma_id in self._get_player_ids_of_curr_group()
+            if (ma_player := self.mass.players.get(ma_id))
+        ]
index 6ef28602796e70d4763d6b974caaeb2c1bcd6768..f46f91953e1d76ee14844fb96fc0850a8e5b3193 100644 (file)
@@ -1,23 +1,25 @@
 """SnapCastProvider."""
 
+from __future__ import annotations
+
 import asyncio
+import hashlib
 import logging
 import re
 import shutil
 import socket
+from contextlib import suppress
 from pathlib import Path
-from typing import cast
+from typing import TYPE_CHECKING, cast
 
 from bidict import bidict
-from music_assistant_models.enums import PlaybackState
+from music_assistant_models.enums import MediaType, PlaybackState
 from music_assistant_models.errors import SetupFailedError
-from snapcast.control import create_server
-from snapcast.control.client import Snapclient
-from snapcast.control.group import Snapgroup
-from snapcast.control.server import Snapserver
+from snapcast.control.server import CONTROL_PORT, Snapserver
 from zeroconf import NonUniqueNameException
 from zeroconf.asyncio import AsyncServiceInfo
 
+from music_assistant.helpers.compare import create_safe_string
 from music_assistant.helpers.process import AsyncProcess
 from music_assistant.helpers.util import get_ip_pton
 from music_assistant.models.player_provider import PlayerProvider
@@ -32,18 +34,37 @@ from music_assistant.providers.snapcast.constants import (
     CONF_STREAM_IDLE_THRESHOLD,
     CONF_USE_EXTERNAL_SERVER,
     CONTROL_SCRIPT,
-    CONTROL_SOCKET_PATH_TEMPLATE,
     DEFAULT_SNAPSERVER_PORT,
+    MASS_ANNOUNCEMENT_POSTFIX,
+    MASS_STREAM_PREFIX,
     SNAPWEB_DIR,
 )
+from music_assistant.providers.snapcast.ma_stream import SnapcastMAStream
 from music_assistant.providers.snapcast.player import SnapCastPlayer
-from music_assistant.providers.snapcast.socket_server import SnapcastSocketServer
+from music_assistant.providers.universal_group.constants import UGP_PREFIX
+
+if TYPE_CHECKING:
+    from music_assistant_models.player import PlayerMedia
+
+    from .snap_cntrl_proto import SnapclientProto, SnapgroupProto, SnapserverProto
+
+
+async def _create_cntrl_server(
+    loop: asyncio.AbstractEventLoop,
+    host: str,
+    port: int = CONTROL_PORT,
+    reconnect: bool = False,
+) -> SnapserverProto:
+    """Server factory."""
+    server = Snapserver(loop, host, port, reconnect)
+    await server.start()
+    return cast("SnapserverProto", server)
 
 
 class SnapCastProvider(PlayerProvider):
     """SnapCastProvider."""
 
-    _snapserver: Snapserver
+    _snapserver: SnapserverProto
     _snapserver_runner: asyncio.Task[None] | None
     _snapserver_started: asyncio.Event | None
     _snapcast_server_host: str
@@ -52,7 +73,17 @@ class SnapCastProvider(PlayerProvider):
     _use_builtin_server: bool
     _stop_called: bool
     _controlscript_available: bool
-    _socket_servers: dict[str, SnapcastSocketServer]  # queue_id -> socket server
+    _snapcast_ma_streams: dict[str, SnapcastMAStream]
+    _snapcast_ma_streams_lock: asyncio.Lock
+
+    @property
+    def use_queue_control(self) -> bool:
+        """Return whether queue-based control scripts are available.
+
+        Indicates if the Snapcast control script has been successfully initialized
+        and can be used to control playback via a queue-specific control channel.
+        """
+        return self._controlscript_available
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
@@ -61,11 +92,12 @@ class SnapCastProvider(PlayerProvider):
         self._use_builtin_server = not self.config.get_value(CONF_USE_EXTERNAL_SERVER)
         self._stop_called = False
         self._controlscript_available = False
-        self._socket_servers = {}
         if self._use_builtin_server:
             self._snapcast_server_host = "127.0.0.1"
             self._snapcast_server_control_port = DEFAULT_SNAPSERVER_PORT
-            self._snapcast_server_buffer_size = self.config.get_value(CONF_SERVER_BUFFER_SIZE)
+            self._snapcast_server_buffer_size = cast(
+                "int", self.config.get_value(CONF_SERVER_BUFFER_SIZE)
+            )
             self._snapcast_server_chunk_ms = self.config.get_value(CONF_SERVER_CHUNK_MS)
             self._snapcast_server_initial_volume = self.config.get_value(CONF_SERVER_INITIAL_VOLUME)
             self._snapcast_server_send_to_muted = self.config.get_value(
@@ -82,13 +114,16 @@ class SnapCastProvider(PlayerProvider):
         self._snapcast_stream_idle_threshold = self.config.get_value(CONF_STREAM_IDLE_THRESHOLD)
         self._ids_map = bidict({})
 
+        self._snapcast_ma_streams = {}
+        self._snapcast_ma_streams_lock = asyncio.Lock()
+
         if self._use_builtin_server:
             await self._start_builtin_server()
         else:
             self._snapserver_runner = None
             self._snapserver_started = None
         try:
-            self._snapserver = await create_server(
+            self._snapserver = await _create_cntrl_server(
                 self.mass.loop,
                 self._snapcast_server_host,
                 port=self._snapcast_server_control_port,
@@ -123,6 +158,10 @@ class SnapCastProvider(PlayerProvider):
             if player.playback_state != PlaybackState.PLAYING:
                 continue
             await player.stop()
+
+        for stream_name in list(self._snapcast_ma_streams):
+            await self.delete_ma_stream(stream_name)
+
         self._snapserver.stop()
         await self._stop_builtin_server()
 
@@ -252,9 +291,9 @@ class SnapCastProvider(PlayerProvider):
                 # The snapserver doesn't always cleanup the control script processes
                 # properly. We do it explicitly when closing a socket server.
                 # Should be fixed on the server side, though.
-                for socket_server in list(self._socket_servers.values()):
-                    await socket_server.stop()
-                self._socket_servers.clear()
+                for stream_name in list(self._snapcast_ma_streams):
+                    await self.delete_ma_stream(stream_name)
+                self._snapcast_ma_streams.clear()
                 raise
 
     def _get_ma_id(self, snap_client_id: str) -> str:
@@ -277,19 +316,16 @@ class SnapCastProvider(PlayerProvider):
             return new_id
         return self._get_ma_id(snap_client_id)
 
-    def _handle_player_init(self, snap_client: Snapclient) -> SnapCastPlayer:
+    def _handle_player_init(self, snap_client: SnapclientProto) -> SnapCastPlayer:
         """Process Snapcast add to Player controller."""
         player_id = self._generate_and_register_id(snap_client.identifier)
         player = self.mass.players.get(player_id, raise_unavailable=False)
         if not player:
-            snap_client = cast(
-                "Snapclient", self._snapserver.client(self._get_snapclient_id(player_id))
-            )
+            snap_client = self._snapserver.client(self._get_snapclient_id(player_id))
             player = SnapCastPlayer(
                 provider=self,
                 player_id=player_id,
                 snap_client=snap_client,
-                snap_client_id=self._get_snapclient_id(player_id),
             )
             player.setup()
         else:
@@ -310,22 +346,21 @@ class SnapCastProvider(PlayerProvider):
             if ma_player := self._handle_player_init(snap_client):
                 snap_client.set_callback(ma_player._handle_player_update)
         for snap_client in self._snapserver.clients:
-            if player := self.mass.players.get(self._get_ma_id(snap_client.identifier)):
-                ma_player = cast("SnapCastPlayer", player)
-                snap_client.set_callback(ma_player._handle_player_update)
-        for snap_group in self._snapserver.groups:
-            snap_group.set_callback(self._handle_group_update)
+            if player := self.get_snap_player(client_id=snap_client.identifier):
+                snap_client.set_callback(player._handle_player_update)
+        self._update_group_callbacks()
 
-    def _handle_group_update(self, snap_group: Snapgroup) -> None:
+    def poke_group_members(self, snap_group: SnapgroupProto) -> None:
         """Process Snapcast group callback."""
-        for snap_client in self._snapserver.clients:
-            if ma_player := self.mass.players.get(self._get_ma_id(snap_client.identifier)):
-                assert isinstance(ma_player, SnapCastPlayer)  # for type checking
-                ma_player._handle_player_update(snap_client)
+        for snap_client_id in snap_group.clients:
+            if ma_player := self.get_snap_player(client_id=snap_client_id):
+                ma_player.poke_player_update()
 
     def _handle_disconnect(self, exc: Exception) -> None:
         """Handle disconnect callback from snapserver."""
         if self._stop_called or self.mass.closing:
+            # prevent auto-reconnecting of snapcast controller
+            self._snapserver.stop()
             # we're instructed to stop/exit, so no need to restart the connection
             return
         self.logger.info(
@@ -345,34 +380,288 @@ class SnapCastProvider(PlayerProvider):
         else:
             self.logger.warning("Unable to remove snapclient %s: %s", player_id, error_msg)
 
-    async def get_or_create_socket_server(self, queue_id: str) -> str:
-        """Get or create a socket server for the given queue.
+    def _update_group_callbacks(self, poke: bool = False) -> None:
+        for grp in self._snapserver.groups:
+            grp.set_callback(self.poke_group_members)
+            if poke:
+                self.poke_group_members(grp)
+
+    async def ensure_player_owned_group(
+        self, ma_player_id: str, set_stream_id: str | None = None
+    ) -> SnapgroupProto | None:
+        """Ensure a Snapcast group is owned by the given player.
+
+        This method guarantees that the returned Snapcast group is *owned* by the
+        specified Music Assistant player, meaning the group name equals the
+        player's ID and the player is the group leader.
+
+        Behavior:
+        - If the player is already the leader of its current group, that group is
+        returned unchanged.
+        - If the player is a member of another group (but not the leader), the
+        player is removed from that group, which causes Snapcast to create a new
+        single-client group for the player.
+        - The resulting group is renamed to the player's ID.
 
-        :param queue_id: The queue ID to create a socket server for.
-        :return: The path to the Unix socket.
+        If `set_stream_id` is provided and a new group is created, the group's
+        stream is updated accordingly.
+
+        Args:
+            ma_player_id: Music Assistant player ID.
+            set_stream_id: Optional Snapcast stream ID to assign to the player's group.
+
+        Returns:
+            The Snapcast group owned by the player, or ``None`` if the player is not
+            currently part of any group.
         """
-        if queue_id in self._socket_servers:
-            return self._socket_servers[queue_id].socket_path
-
-        socket_path = CONTROL_SOCKET_PATH_TEMPLATE.format(queue_id=queue_id)
-        socket_server = SnapcastSocketServer(
-            mass=self.mass,
-            queue_id=queue_id,
-            socket_path=socket_path,
-            streamserver_ip=str(self.mass.streams.publish_ip),
-            streamserver_port=cast("int", self.mass.streams.publish_port),
+        player_client = self.get_snap_client(player_id=ma_player_id)
+        if player_client is None:
+            return None
+
+        curr_group = player_client.group
+
+        if curr_group is None:
+            return None
+
+        if curr_group.name == ma_player_id:
+            return curr_group
+
+        group_members = list(curr_group.clients)
+        if len(group_members) > 1 and curr_group.name:
+            # player is member of other player group, remove it, which results in a new group
+            group_members.remove(player_client.identifier)
+            res = await self._snapserver.group_clients(curr_group.identifier, group_members)
+            if not (isinstance(res, dict) and "server" in res):
+                raise RuntimeError("Couldn't remove client from group")
+            self._snapserver.synchronize(res)
+            curr_group = player_client.group
+            if curr_group is None:
+                return None
+            if set_stream_id:
+                await curr_group.set_stream(set_stream_id)
+
+        await curr_group.set_name(ma_player_id)
+        return curr_group
+
+    async def isolate_player_to_dedicated_group(
+        self,
+        target_player_id: str,
+        target_stream_id: str | None = None,
+        others_stream_id: str | None = "default",
+    ) -> None:
+        """Isolate a player into a dedicated Snapcast group.
+
+        Ensures that the target player ends up in a group where it is the sole
+        member and group leader.
+
+        Behavior:
+        - The target player is first ensured to own its group.
+        - All other members of that group are removed.
+        - Each removed player is placed into its own dedicated group.
+        - Removed players' groups are optionally assigned `others_stream_id`.
+        - The target group is optionally assigned `target_stream_id`.
+
+        Callbacks for affected clients and groups are temporarily disabled during
+        the operation to avoid intermediate state updates.
+
+        Args:
+            target_player_id: Music Assistant player ID to isolate.
+            target_stream_id: Optional stream ID to assign to the target player's group.
+            others_stream_id: Stream ID assigned to newly created groups for removed players.
+        """
+        this_client_id = self._get_snapclient_id(target_player_id)
+        target_group = await self.ensure_player_owned_group(
+            target_player_id, set_stream_id=target_stream_id
         )
-        await socket_server.start()
-        self._socket_servers[queue_id] = socket_server
-        self.logger.debug("Created socket server for queue %s at %s", queue_id, socket_path)
-        return socket_path
 
-    async def stop_socket_server(self, queue_id: str) -> None:
-        """Stop and remove the socket server for the given queue.
+        if target_group is None:
+            return
+
+        target_group.set_callback(None)
+        group_members = list(target_group.clients)
+        group_members.remove(this_client_id)
+        for client_id in group_members:
+            client = self._snapserver.client(client_id)
+            client.set_callback(None)
+        if group_members:
+            res = await self._snapserver.group_clients(target_group.identifier, [this_client_id])
+            if not (isinstance(res, dict) and "server" in res):
+                raise RuntimeError("Couldn't remove client from group")
+            self._snapserver.synchronize(res)
+            for client_id in group_members:
+                ma_player_id = self._get_ma_id(client_id)
+                if ma_player := cast("SnapCastPlayer", self.mass.players.get(ma_player_id)):
+                    client = self._snapserver.client(client_id)
+                    if client is not None:
+                        if client.group is not None:
+                            await client.group.set_name(ma_player_id)
+                            if others_stream_id:
+                                await client.group.set_stream(others_stream_id)
+                        client.set_callback(ma_player._handle_player_update)
+
+        if target_stream_id is not None:
+            await target_group.set_stream(target_stream_id)
+
+    async def get_snapcast_media_stream(
+        self,
+        media: PlayerMedia,
+        filter_settings_owner: str | None = None,
+        existing_only: bool = False,
+    ) -> SnapcastMAStream | None:
+        """Get or create a Snapcast Music Assistant stream for the given media.
 
-        :param queue_id: The queue ID to stop the socket server for.
+        Determines a deterministic Snapcast stream name based on the media type
+        and source, and either returns an existing stream or creates a new one.
+
+        Behavior:
+        - Announcement and generic media streams use a hashed name.
+        - Plugin and queue-backed sources reuse a stable stream name.
+        - Queue-backed streams may persist across playback sessions.
+        - If `existing_only` is True, no new stream will be created.
+
+        Newly created streams are registered with the Snapcast server and fully
+        set up before being returned.
+
+        Args:
+            media: Media item to stream.
+            filter_settings_owner: Optional player/entity ID used to resolve DSP filters.
+            existing_only: If True, only return an existing stream.
+
+        Returns:
+            A ``SnapcastMAStream`` instance, or ``None`` if no stream exists and
+            `existing_only` is True.
+        """
+        stream_name: str = ""
+        name_suffix: str = ""
+        queue_id: str | None = None
+        source_id: str | None = None
+        destroy_on_stop = True
+
+        if media.media_type == MediaType.ANNOUNCEMENT:
+            stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6]
+            name_suffix = MASS_ANNOUNCEMENT_POSTFIX
+        elif media.media_type == MediaType.PLUGIN_SOURCE:
+            custom_data = media.custom_data or {}
+            plugin: str = media.title or custom_data.get("provider") or ""
+            player: str = f" {custom_data.get('player_id', '')}"
+            stream_name += f"{plugin} {player}"
+            source_id = custom_data.get("source_id")
+        elif media.source_id and media.source_id.startswith(UGP_PREFIX):
+            stream_name += media.source_id
+        elif media.source_id and media.queue_item_id:
+            stream_name += media.source_id
+            queue_id = media.source_id
+            source_id = media.source_id
+            destroy_on_stop = False
+        else:
+            stream_name += hashlib.md5(media.uri.encode()).hexdigest()[:6]
+
+        stream_name = create_safe_string(stream_name, lowercase=False)
+        stream_name = f"{MASS_STREAM_PREFIX}{stream_name}{name_suffix}"
+        async with self._snapcast_ma_streams_lock:
+            if not (stream := self._snapcast_ma_streams.get(stream_name)):
+                if existing_only:
+                    return None
+
+                stream = SnapcastMAStream(
+                    provider=self,
+                    media=media,
+                    stream_name=stream_name,
+                    filter_settings_owner=filter_settings_owner,
+                    source_id=source_id,
+                    use_cntrl_script=bool(queue_id) and self.use_queue_control,
+                    destroy_on_stop=destroy_on_stop,
+                )
+                self._snapcast_ma_streams[stream_name] = stream
+            else:
+                stream.update_media(media)
+        await stream.setup()
+        return stream
+
+    def get_snap_ma_stream(self, stream_name: str) -> SnapcastMAStream | None:
+        """Return an existing Music Assistant Snapcast stream by name.
+
+        Args:
+            stream_name: Snapcast stream name.
+
+        Returns:
+            The corresponding ``SnapcastMAStream`` instance, or ``None`` if not found.
+        """
+        return self._snapcast_ma_streams.get(stream_name)
+
+    async def delete_ma_stream(self, stream_name: str) -> None:
+        """Remove and destroy a Music Assistant Snapcast stream.
+
+        The stream is removed from internal tracking and its resources are
+        destroyed asynchronously. Errors during destruction are logged but
+        otherwise ignored.
+
+        Args:
+            stream_name: Snapcast stream name to delete.
         """
-        if queue_id in self._socket_servers:
-            await self._socket_servers[queue_id].stop()
-            del self._socket_servers[queue_id]
-            self.logger.debug("Stopped socket server for queue %s", queue_id)
+        async with self._snapcast_ma_streams_lock:
+            stream = self._snapcast_ma_streams.pop(stream_name, None)
+
+        if not stream:
+            return
+
+        try:
+            await stream.destroy()
+        except Exception:
+            self.logger.exception("Failed to destroy stream session %s", stream_name)
+
+    def update_stream_usage(self) -> None:
+        """Update usage state for all tracked Snapcast streams.
+
+        Marks streams as "in use" if they are currently assigned to any Snapcast
+        group, and schedules unused streams for delayed shutdown.
+
+        This method should be called whenever group or stream assignments change
+        on the Snapcast server.
+        """
+        unused_streams = set(self._snapcast_ma_streams.keys())
+        for grp in self._snapserver.groups:
+            stream_id = grp.stream
+            if stream_id in self._snapcast_ma_streams:
+                ma_stream = self._snapcast_ma_streams[stream_id]
+                ma_stream.set_in_use(True)
+                unused_streams.discard(stream_id)
+
+            if not unused_streams:
+                break
+
+        for stream_id in unused_streams:
+            self._snapcast_ma_streams[stream_id].set_in_use(False)
+
+    def get_snap_client(
+        self, *, client_id: str | None = None, player_id: str | None = None
+    ) -> SnapclientProto | None:
+        """Return the snapclient for either given client_id or player_id."""
+        if player_id is not None:
+            if client_id is not None and client_id != self._get_snapclient_id(client_id):
+                raise ValueError("provided client_id and player_id do not match")
+            client_id = self._get_snapclient_id(player_id)
+
+        if client_id:
+            with suppress(KeyError):
+                return self._snapserver.client(client_id)
+
+        return None
+
+    def get_snap_player(
+        self, *, client_id: str | None = None, player_id: str | None = None
+    ) -> SnapCastPlayer | None:
+        """Return the MA SnapCastPlayer for either given client_id or player_id."""
+        if client_id is not None:
+            if player_id is not None and player_id != self._get_ma_id(client_id):
+                raise ValueError("provided client_id and player_id do not match")
+            player_id = self._get_ma_id(client_id)
+
+        if player_id is None:
+            return None
+
+        if ma_player := self.mass.players.get(player_id):
+            assert isinstance(ma_player, SnapCastPlayer)  # for type checking
+            return ma_player
+
+        return None
diff --git a/music_assistant/providers/snapcast/snap_cntrl_proto.py b/music_assistant/providers/snapcast/snap_cntrl_proto.py
new file mode 100644 (file)
index 0000000..9593345
--- /dev/null
@@ -0,0 +1,124 @@
+# ruff: noqa: D100, D101, D102
+
+from __future__ import annotations
+
+from collections.abc import Callable
+from typing import Any, Protocol
+
+type RetVal = dict[str, Any] | None
+type RetTuple = tuple[RetVal, RetVal]
+
+SnapclientCallback = Callable[["SnapclientProto"], Any]
+SnapgroupCallback = Callable[["SnapgroupProto"], Any]
+SnapstreamCallback = Callable[["SnapstreamProto"], Any]
+
+
+class SnapgroupProto(Protocol):
+    @property
+    def identifier(self) -> str: ...
+    @property
+    def name(self) -> str: ...
+    @property
+    def clients(self) -> list[str]: ...
+    @property
+    def stream(self) -> str: ...
+    def callback(self) -> None: ...
+    def set_callback(self, func: SnapgroupCallback | None) -> None: ...
+    async def set_name(self, name: str) -> RetVal: ...
+    async def set_stream(self, stream_id: str) -> RetVal: ...
+    async def add_client(self, client_identifier: str) -> None: ...
+
+
+class SnapclientProto(Protocol):
+    # read-only properties used by your code
+    @property
+    def identifier(self) -> str: ...
+
+    @property
+    def group(self) -> SnapgroupProto | None: ...
+
+    @property
+    def friendly_name(self) -> str: ...
+
+    @property
+    def connected(self) -> bool: ...
+
+    @property
+    def name(self) -> str: ...
+
+    @property
+    def latency(self) -> int: ...
+
+    @property
+    def muted(self) -> bool: ...
+
+    @property
+    def volume(self) -> int: ...
+
+    _client: dict[str, Any]
+
+    def groups_available(self) -> list[SnapgroupProto]: ...
+
+    async def set_muted(self, status: bool) -> None: ...
+    async def set_volume(self, percent: int, update_group: bool = True) -> None: ...
+
+    def callback(self) -> None: ...
+    def set_callback(self, func: SnapclientCallback | None) -> None: ...
+
+
+class SnapstreamProto(Protocol):
+    _stream: dict[str, Any]
+
+    @property
+    def identifier(self) -> str: ...
+    @property
+    def friendly_name(self) -> str: ...
+    @property
+    def status(self) -> str: ...
+    @property
+    def path(self) -> str | None: ...
+    def callback(self) -> None: ...
+    def set_callback(self, func: SnapstreamCallback | None) -> None: ...
+
+
+class SnapserverProto(Protocol):
+    # lifecycle
+    async def start(self) -> Any: ...
+    def stop(self) -> None: ...
+
+    # collections (as in the external lib: list(...) views)
+    @property
+    def groups(self) -> list[SnapgroupProto]: ...
+    @property
+    def clients(self) -> list[SnapclientProto]: ...
+    @property
+    def streams(self) -> list[SnapstreamProto]: ...
+
+    # accessors
+    def group(self, group_identifier: str) -> SnapgroupProto: ...
+    def client(self, client_identifier: str) -> SnapclientProto: ...
+    def stream(self, stream_identifier: str) -> SnapstreamProto: ...
+
+    async def delete_client(self, identifier: str) -> RetTuple: ...
+
+    async def status(self) -> tuple[Any, Any]: ...
+
+    async def group_clients(self, identifier: str, clients: list[str]) -> RetVal: ...
+    async def group_name(self, identifier: str, name: str) -> RetVal: ...
+    async def group_stream(self, identifier: str, stream_id: str) -> RetVal: ...
+
+    async def client_name(self, identifier: str, name: str) -> RetVal: ...
+    async def client_latency(self, identifier: str, latency: int) -> RetVal: ...
+    async def client_volume(self, identifier: str, volume: dict[str, Any]) -> RetVal: ...
+
+    async def stream_add_stream(self, stream_uri: str) -> RetVal: ...
+    async def stream_remove_stream(self, identifier: str) -> RetVal: ...
+
+    # state sync
+    def synchronize(self, status: dict[str, Any]) -> None: ...
+
+    # callbacks (if you use them)
+    def set_on_update_callback(self, func: Callable[[], Any] | None) -> None: ...
+    def set_on_connect_callback(self, func: Callable[[], Any] | None) -> None: ...
+    def set_on_disconnect_callback(self, func: Callable[[Exception], Any] | None) -> None: ...
+    def set_new_client_callback(self, func: Callable[[SnapclientProto], Any] | None) -> None: ...