Update Sendspin provider to version 4.0 with many improvements (#3158)
authorMaxim Raznatovski <nda.mr43@gmail.com>
Tue, 17 Feb 2026 21:46:52 +0000 (22:46 +0100)
committerGitHub <noreply@github.com>
Tue, 17 Feb 2026 21:46:52 +0000 (22:46 +0100)
music_assistant/providers/sendspin/README.md
music_assistant/providers/sendspin/__init__.py
music_assistant/providers/sendspin/manifest.json
music_assistant/providers/sendspin/playback.py [new file with mode: 0644]
music_assistant/providers/sendspin/player.py
music_assistant/providers/sendspin/provider.py
music_assistant/providers/sendspin/timed_client_stream.py [deleted file]
requirements_all.txt

index 24bd15845cb05f59f4b5daa44f84dfbee3649096..7dd50b929d533280612cf2b755b0b49b7185b8d8 100644 (file)
@@ -198,7 +198,7 @@ Sendspin players support:
 |------|-------------|
 | `provider.py` | Main provider class, handles WebRTC signaling and server lifecycle |
 | `player.py` | Player implementation with playback, grouping, and metadata handling |
-| `timed_client_stream.py` | Multi-client audio stream distribution with timing |
+| `playback.py` | Playback pipeline with DSP channel processing and timed frame commits |
 | `__init__.py` | Provider setup and configuration |
 | `manifest.json` | Provider metadata |
 
index 31686d48721d5b2a9ae1391014e41d4f23928ff1..2108f1b58e9279c85c1b29ebf37f8bd5c2caa5d8 100644 (file)
@@ -8,7 +8,7 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from .provider import SendspinProvider
+from music_assistant.providers.sendspin.provider import SendspinProvider
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
index c25c835005333d451940ab09b6b81421f5e6ca4f..1e945ecbb6f9127baccd908768ec11441196f13c 100644 (file)
@@ -7,7 +7,7 @@
   "documentation": "https://music-assistant.io/player-support/sendspin/",
   "codeowners": ["@music-assistant"],
   "credits": ["[Sendspin](https://sendspin-audio.com)"],
-  "requirements": ["aiosendspin==3.0.0", "av==16.1.0"],
+  "requirements": ["aiosendspin==4.0.1", "av==16.1.0"],
   "builtin": true,
   "allow_disable": false
 }
diff --git a/music_assistant/providers/sendspin/playback.py b/music_assistant/providers/sendspin/playback.py
new file mode 100644 (file)
index 0000000..a8ff2e8
--- /dev/null
@@ -0,0 +1,1154 @@
+"""Playback session coordinator for Sendspin players."""
+
+from __future__ import annotations
+
+import asyncio
+import time
+from collections import deque
+from collections.abc import Iterator
+from contextlib import suppress
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any
+from uuid import UUID, uuid4
+
+from aiosendspin.server.audio import AudioFormat as SendspinAudioFormat
+from aiosendspin.server.push_stream import MAIN_CHANNEL, PushStream
+from music_assistant_models.enums import ContentType
+from music_assistant_models.media_items.audio_format import AudioFormat
+
+from music_assistant.constants import CONF_OUTPUT_CHANNELS
+from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.helpers.ffmpeg import FFMpeg
+from music_assistant.models.player import PlayerMedia
+
+if TYPE_CHECKING:
+    from .player import SendspinPlayer
+
+
+# Same sample format expressed in both MA and Sendspin type systems.
+_PCM_FORMAT = AudioFormat(
+    content_type=ContentType.PCM_S32LE,
+    sample_rate=48000,
+    bit_depth=32,
+    channels=2,
+)
+_SENDSPIN_PCM_FORMAT = SendspinAudioFormat(
+    sample_rate=48000,
+    bit_depth=32,
+    channels=2,
+)
+# Max PCM slice fed to the producer per iteration.
+_PRODUCER_SLICE_US = 100_000
+# Max pending chunks between producer and committer before the producer blocks.
+_PRODUCER_BACKLOG_SIZE = 64
+# Backpressure threshold: push stream sleeps when buffered audio exceeds this.
+_PRODUCER_BUFFER_LIMIT_US = 30_000_000
+# Start join promotion once catchup processor lag is within this window of the history tail.
+_JOIN_PROMOTE_ARM_WINDOW_US = 2_000_000
+# Accept catchup output within this margin of the promotion target.
+_JOIN_PROMOTE_TOLERANCE_US = 50_000
+# Abort join catchup if promotion hasn't completed within this.
+_JOIN_PROMOTION_TIMEOUT_S = 15.0
+# Retain committed history this far behind real-time for late-join backfill.
+# This pre-history also warms up ffmpeg's internal filter buffers so the DSP
+# output has settled by the time the member's channel goes live.
+_HISTORY_KEEP_PAST_US = 1_000_000
+
+
+class _BufferedFfmpegProcessor:
+    """FFmpeg wrapper with small output carry-over buffer and duration-based reads."""
+
+    def __init__(self, ffmpeg: FFMpeg, audio_format: AudioFormat) -> None:
+        self._ffmpeg = ffmpeg
+        self._output_buffer = bytearray()
+        bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
+        self._sample_rate = int(audio_format.sample_rate)
+        self._frame_size = bytes_per_sample * int(audio_format.channels)
+        self._bytes_per_second = self._sample_rate * self._frame_size
+        # ~25ms worth of audio per read syscall.
+        self._read_quantum_bytes = max(1, int(self._bytes_per_second * 0.025))
+        self._produced_output_us = 0
+
+    async def start(self) -> None:
+        await self._ffmpeg.start()
+
+    async def close(self) -> None:
+        await self._ffmpeg.close()
+
+    async def push(self, pcm: bytes) -> None:
+        await self._ffmpeg.write(pcm)
+
+    @property
+    def produced_output_us(self) -> int:
+        """Return cumulative output duration currently drained from ffmpeg."""
+        return self._produced_output_us
+
+    async def read_duration_us(self, duration_us: int) -> bytes:
+        """Block-read exactly `duration_us` worth of processed PCM from ffmpeg."""
+        target_bytes = self._target_bytes_for_duration_us(duration_us)
+        if target_bytes == 0:
+            return b""
+
+        while len(self._output_buffer) < target_bytes:
+            missing = target_bytes - len(self._output_buffer)
+            read_size = max(self._read_quantum_bytes, missing)
+            chunk = await self._ffmpeg.readexactly(read_size)
+            self._output_buffer.extend(chunk)
+
+        out = bytes(self._output_buffer[:target_bytes])
+        del self._output_buffer[:target_bytes]
+        return out
+
+    async def drain_available(self) -> int:
+        """Non-blocking drain of ffmpeg stdout into internal buffer.
+
+        Returns cumulative produced output duration in microseconds.
+        """
+        while True:
+            try:
+                # 1ms timeout: non-blocking check for available data.
+                chunk = await asyncio.wait_for(
+                    self._ffmpeg.read(self._read_quantum_bytes),
+                    timeout=0.001,
+                )
+            except TimeoutError:
+                break
+            if not chunk:
+                break
+            self._output_buffer.extend(chunk)
+            self._produced_output_us += self._duration_us_for_bytes(len(chunk))
+            if len(chunk) < self._read_quantum_bytes:
+                break
+        return self._produced_output_us
+
+    async def drain_forever(self) -> None:
+        """Continuously drain ffmpeg stdout into internal buffer until EOF."""
+        while True:
+            chunk = await self._ffmpeg.read(self._read_quantum_bytes)
+            if not chunk:
+                break
+            self._output_buffer.extend(chunk)
+            self._produced_output_us += self._duration_us_for_bytes(len(chunk))
+
+    def pop_duration_us(self, duration_us: int) -> bytes | None:
+        """Pop exactly `duration_us` from already buffered output, or None if insufficient."""
+        target_bytes = self._target_bytes_for_duration_us(duration_us)
+        if target_bytes == 0:
+            return b""
+        if len(self._output_buffer) < target_bytes:
+            return None
+        out = bytes(self._output_buffer[:target_bytes])
+        del self._output_buffer[:target_bytes]
+        return out
+
+    def buffered_duration_us(self) -> int:
+        """Return buffered output duration currently available for immediate pop."""
+        return self._duration_us_for_bytes(len(self._output_buffer))
+
+    def pop_duration_us_or_pad(self, duration_us: int, pad_tolerance_us: int) -> bytes | None:
+        """Pop target duration; if short within tolerance, pad tail with silence."""
+        target_bytes = self._target_bytes_for_duration_us(duration_us)
+        if target_bytes == 0:
+            return b""
+        available = len(self._output_buffer)
+        if available >= target_bytes:
+            out = bytes(self._output_buffer[:target_bytes])
+            del self._output_buffer[:target_bytes]
+            return out
+        short_bytes = target_bytes - available
+        short_us = self._duration_us_for_bytes(short_bytes)
+        if short_us > max(0, pad_tolerance_us):
+            return None
+        out = bytes(self._output_buffer)
+        self._output_buffer.clear()
+        return out + (b"\x00" * short_bytes)
+
+    def _duration_us_for_bytes(self, byte_count: int) -> int:
+        if byte_count <= 0 or self._sample_rate <= 0 or self._frame_size <= 0:
+            return 0
+        frames = byte_count // self._frame_size
+        if frames <= 0:
+            return 0
+        return int((frames * 1_000_000) / self._sample_rate)
+
+    def _target_bytes_for_duration_us(self, duration_us: int) -> int:
+        """Convert duration to frame-aligned PCM byte count."""
+        if duration_us <= 0 or self._sample_rate <= 0 or self._frame_size <= 0:
+            return 0
+        samples = max(0, int((duration_us * self._sample_rate + 500_000) / 1_000_000))
+        return samples * self._frame_size
+
+
+@dataclass(slots=True)
+class _HistoryChunk:
+    start_time_us: int
+    duration_us: int
+    pcm: bytes
+
+
+@dataclass(slots=True)
+class _PendingChunk:
+    pcm: bytes
+    duration_us: int
+
+
+@dataclass(slots=True)
+class _JoinCatchupState:
+    """Per-member state for a join-catchup processor replaying history through DSP.
+
+    The processor is fed historical + live PCM via ``input_queue``.  Once its
+    output catches up to the live stream (within tolerance), it is promoted to
+    the member's live pipeline.  See ``_inject_ready_join_historical`` for the
+    full promotion lifecycle.
+    """
+
+    processor: _BufferedFfmpegProcessor
+    input_queue: asyncio.Queue[bytes | None]
+    writer_task: asyncio.Task[None]
+    drainer_task: asyncio.Task[None]
+    snapshot_task: asyncio.Task[None] | None = None
+    # Timeline position of the first history chunk fed into the processor.
+    first_history_start_us: int | None = None
+    # Timeline position up to which PCM has been enqueued into the processor.
+    fed_until_us: int | None = None
+    # End of the history snapshot taken when catchup started.
+    history_end_us: int | None = None
+    # Locked target: once set, promotion fires when output reaches this point.
+    promotion_target_end_us: int | None = None
+    # Monotonic time when promotion was armed, used for timeout detection.
+    promotion_armed_monotonic_s: float | None = None
+    write_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
+
+
+@dataclass(slots=True)
+class _PipelineConfig:
+    requires_transform: bool
+    output_channels: str
+    filter_params: tuple[str, ...]
+
+    @property
+    def signature(self) -> tuple[bool, str, tuple[str, ...]]:
+        return (self.requires_transform, self.output_channels, self.filter_params)
+
+
+@dataclass(slots=True)
+class _MemberPipeline:
+    player_id: str
+    channel_id: UUID
+    config: _PipelineConfig
+    processor: _BufferedFfmpegProcessor | None = None
+    ready: bool = False
+
+
+class SendspinPlaybackSession:
+    """Coordinates playback for a Sendspin player group leader.
+
+    The push stream supports multi-channel audio: members that need per-player
+    DSP (EQ, channel mixing, output routing) each get a dedicated ffmpeg
+    processor and a separate channel. Members without DSP share MAIN_CHANNEL
+    and receive the raw PCM directly.
+
+    Playback runs as two concurrent coroutines inside ``_run_playback``:
+
+    * **Producer** -- reads PCM from the MA stream, slices it into fixed-size
+      chunks, queues them, and writes each slice into per-member ffmpeg
+      processors (transform push) in parallel.
+    * **Consumer** -- dequeues chunks, reads the corresponding transformed
+      output from each processor (transform read), prepares all channels on
+      the push stream, commits audio, and applies backpressure via
+      ``sleep_to_limit_buffer``.
+
+    When a new member joins mid-playback, a *join-catchup* processor replays
+    committed history through the member's DSP chain so it can be promoted
+    to the live pipeline without an audible gap.
+    """
+
+    def __init__(self, player: SendspinPlayer) -> None:
+        """Initialize session coordinator bound to the owning player."""
+        self.player = player
+        self.playback_task: asyncio.Task[None] | None = None
+        self.pending_join_members: set[str] = set()
+        self._state_lock = asyncio.Lock()
+        self._members: set[str] = set()
+        self._member_pipelines: dict[str, _MemberPipeline] = {}
+        self._push_stream: PushStream | None = None
+        self._playback_running = False
+        self._timeline_start_us: int | None = None
+        self._first_commit_monotonic_us: int | None = None
+        self._produced_audio_us = 0
+        self._history: deque[_HistoryChunk] = deque()
+        self._join_catchup: dict[str, _JoinCatchupState] = {}
+        self._pipeline_config_cache: dict[str, _PipelineConfig] = {}
+        self._preassigned_channels: dict[str, UUID] = {}
+        self._mapping_dirty = True
+
+    # -- Helpers ---------------------------------------------------------------
+
+    def _attach_task_exception_logger(self, task: asyncio.Task[Any], name: str) -> None:
+        """Log unhandled exception from background task when it finishes."""
+
+        def _done_callback(done_task: asyncio.Task[Any]) -> None:
+            if done_task.cancelled():
+                return
+            with suppress(Exception):
+                exc = done_task.exception()
+                if exc is not None:
+                    self.player.logger.exception(
+                        "Background task failed: %s",
+                        name,
+                        exc_info=exc,
+                    )
+
+        task.add_done_callback(_done_callback)
+
+    def _get_join_readiness(self) -> tuple[bool, str | None]:
+        """Check whether live join DSP preparation can be performed right now."""
+        if self._playback_running and self._push_stream is not None:
+            return (True, None)
+        return (False, "no active stream context")
+
+    # -- Snapshot helper -------------------------------------------------------
+
+    async def _snapshot_active_pipelines(
+        self,
+    ) -> tuple[set[str], tuple[tuple[str, _MemberPipeline], ...]]:
+        """Return (join_pending_ids, active_pipelines) under lock."""
+        async with self._state_lock:
+            members = self._members
+            return set(self._join_catchup), tuple(
+                (mid, p) for mid, p in self._member_pipelines.items() if mid in members
+            )
+
+    # -- Public API ------------------------------------------------------------
+
+    async def cancel(self, reason: str) -> None:
+        """Cancel and await the active playback task, if any."""
+        task = self.playback_task
+        if task is None:
+            return
+        if task.done():
+            if self.playback_task is task:
+                self.playback_task = None
+            return
+        self.player.logger.debug("Cancelling playback task (%s)", reason)
+        task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await task
+        if self.playback_task is task:
+            self.playback_task = None
+
+    async def start(self, media: PlayerMedia, restart: bool = False) -> None:
+        """Start background playback for `media`."""
+        active_task = self.playback_task
+        if active_task is not None and not active_task.done():
+            if not restart:
+                raise RuntimeError("playback already active")
+            await self.cancel("restart requested")
+        self.playback_task = asyncio.create_task(self._run_playback(media))
+
+    async def close(self) -> None:
+        """Stop playback and release all managed resources."""
+        await self.cancel("session close")
+        self.pending_join_members.clear()
+        async with self._state_lock:
+            self._members.clear()
+            self._mapping_dirty = True
+        await self._clear_member_pipelines()
+        await self._clear_join_catchup()
+        async with self._state_lock:
+            self._history.clear()
+            self._produced_audio_us = 0
+            self._timeline_start_us = None
+            self._first_commit_monotonic_us = None
+            self._pipeline_config_cache.clear()
+            self._preassigned_channels.clear()
+
+    async def add_member(self, player_id: str) -> None:
+        """Add a member to the group with DSP-aware lifecycle handling."""
+        async with self._state_lock:
+            if player_id in self._members:
+                self.pending_join_members.discard(player_id)
+                return
+            # Force a fresh channel identity for every new join cycle.
+            self._preassigned_channels[player_id] = uuid4()
+        self.pending_join_members.add(player_id)
+        try:
+            await self._start_join_catchup(player_id)
+            async with self._state_lock:
+                self._members.add(player_id)
+                self._mapping_dirty = True
+        except Exception:
+            await self._release_player_channel(player_id)
+            raise
+        finally:
+            self.pending_join_members.discard(player_id)
+
+    async def remove_member(self, player_id: str) -> None:
+        """Remove a member from the group and clean up per-member playback state."""
+        self.pending_join_members.discard(player_id)
+        async with self._state_lock:
+            self._members.discard(player_id)
+            self._mapping_dirty = True
+            self._pipeline_config_cache.pop(player_id, None)
+            self._preassigned_channels.pop(player_id, None)
+        await self._stop_join_catchup(player_id)
+        await self._release_player_channel(player_id)
+
+    async def sync_members(self, member_ids: set[str]) -> None:
+        """Reconcile session members to exactly the provided set."""
+        async with self._state_lock:
+            current_members = set(self._members)
+        for player_id in current_members - member_ids:
+            await self.remove_member(player_id)
+        for player_id in member_ids - current_members:
+            await self.add_member(player_id)
+
+    # -- Join catchup ----------------------------------------------------------
+
+    async def _start_join_catchup(self, player_id: str) -> None:
+        """Start dedicated join catchup processor fed from committed history."""
+        async with self._state_lock:
+            playback_active = self._playback_running and self._push_stream is not None
+        if not playback_active:
+            return
+
+        pipeline = await self._sync_member_pipeline(player_id)
+        if not pipeline.config.requires_transform:
+            return
+
+        await self._stop_join_catchup(player_id)
+
+        ffmpeg_obj = self._create_member_ffmpeg(pipeline.config.filter_params)
+        processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT)
+        await processor.start()
+        # Bounded queue sized to hold the full buffer duration with some headroom.
+        queue_size = (_PRODUCER_BUFFER_LIMIT_US // _PRODUCER_SLICE_US) + _PRODUCER_BACKLOG_SIZE
+        input_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=queue_size)
+
+        async with self._state_lock:
+            history_snapshot = list(self._history)
+        if not history_snapshot:
+            await processor.close()
+            return
+        history_end_us = history_snapshot[-1].start_time_us + history_snapshot[-1].duration_us
+
+        async def _writer() -> None:
+            while True:
+                chunk = await input_queue.get()
+                if chunk is None:
+                    return
+                await processor.push(chunk)
+
+        async def _drainer() -> None:
+            await processor.drain_forever()
+
+        writer_task = asyncio.create_task(_writer())
+        drainer_task = asyncio.create_task(_drainer())
+        self._attach_task_exception_logger(writer_task, f"join_writer:{player_id}")
+        self._attach_task_exception_logger(drainer_task, f"join_drainer:{player_id}")
+
+        state = _JoinCatchupState(
+            processor=processor,
+            input_queue=input_queue,
+            writer_task=writer_task,
+            drainer_task=drainer_task,
+            history_end_us=history_end_us,
+        )
+        async with self._state_lock:
+            self._join_catchup[player_id] = state
+
+        async with self._state_lock:
+            current = self._join_catchup.get(player_id)
+            if current is not None and current.processor is processor:
+                current.snapshot_task = asyncio.create_task(
+                    self._feed_join_history(player_id, processor, history_snapshot)
+                )
+                self._attach_task_exception_logger(
+                    current.snapshot_task, f"join_snapshot:{player_id}"
+                )
+
+    async def _feed_join_history(
+        self,
+        player_id: str,
+        processor: _BufferedFfmpegProcessor,
+        history_snapshot: list[_HistoryChunk],
+    ) -> None:
+        """Feed historical PCM into a join-catchup processor."""
+        async with self._state_lock:
+            state = self._join_catchup.get(player_id)
+        if state is None or state.processor is not processor:
+            return
+        async with state.write_lock:
+            first_history_start_us: int | None = None
+            previous_end_us: int | None = None
+            for hist_chunk in history_snapshot:
+                if first_history_start_us is None:
+                    first_history_start_us = hist_chunk.start_time_us
+                    async with self._state_lock:
+                        current = self._join_catchup.get(player_id)
+                        if current is not None and current.processor is processor:
+                            current.first_history_start_us = first_history_start_us
+                            current.fed_until_us = first_history_start_us
+                if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us:
+                    gap_us = hist_chunk.start_time_us - previous_end_us
+                    silence = self._silence_for_duration_us(gap_us)
+                    if silence:
+                        await self._enqueue_join_pcm(state, silence)
+                await self._enqueue_join_pcm(state, hist_chunk.pcm)
+                previous_end_us = hist_chunk.start_time_us + hist_chunk.duration_us
+                async with self._state_lock:
+                    current = self._join_catchup.get(player_id)
+                    if current is not None and current.processor is processor:
+                        current.fed_until_us = previous_end_us
+
+    async def _stop_join_catchup(self, player_id: str) -> None:
+        """Stop and remove dedicated join catchup processor for one player."""
+        async with self._state_lock:
+            state = self._join_catchup.pop(player_id, None)
+        if state is None:
+            return
+        if state.snapshot_task is not None:
+            state.snapshot_task.cancel()
+            with suppress(asyncio.CancelledError, Exception):
+                await state.snapshot_task
+        state.writer_task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await state.writer_task
+        state.drainer_task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await state.drainer_task
+        with suppress(Exception):
+            await state.processor.close()
+
+    async def _promote_join_catchup_processor(
+        self,
+        player_id: str,
+        pipeline: _MemberPipeline,
+        target_end_us: int,
+    ) -> None:
+        """Promote join catchup processor to the member's live DSP processor."""
+        old_processor: _BufferedFfmpegProcessor | None = None
+        async with self._state_lock:
+            state = self._join_catchup.pop(player_id, None)
+            if state is None:
+                return
+            old_processor = pipeline.processor
+            pipeline.processor = state.processor
+        if state.snapshot_task is not None:
+            state.snapshot_task.cancel()
+            with suppress(asyncio.CancelledError, Exception):
+                await state.snapshot_task
+        # Let writer flush queued PCM before handoff; cancel if queue is full.
+        try:
+            state.input_queue.put_nowait(None)
+        except asyncio.QueueFull:
+            state.writer_task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await state.writer_task
+        state.drainer_task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await state.drainer_task
+        if old_processor is not None and old_processor is not state.processor:
+            with suppress(Exception):
+                await old_processor.close()
+
+    async def _clear_join_catchup(self) -> None:
+        """Stop and remove all dedicated join catchup processors."""
+        async with self._state_lock:
+            player_ids = list(self._join_catchup.keys())
+        for player_id in player_ids:
+            await self._stop_join_catchup(player_id)
+
+    async def _release_player_channel(self, player_id: str) -> None:
+        """Release per-member channel/DSP state for a removed member."""
+        async with self._state_lock:
+            pipeline = self._member_pipelines.pop(player_id, None)
+            self._preassigned_channels.pop(player_id, None)
+        if pipeline is None or pipeline.processor is None:
+            return
+        await self._close_member_ffmpeg(pipeline.processor)
+
+    # -- Playback pipeline -----------------------------------------------------
+
+    async def _run_playback(self, media: PlayerMedia) -> None:  # noqa: PLR0915
+        """Run the playback pipeline for a single media session.
+
+        Pulls PCM from the MA stream, feeds main + per-member DSP channels into the
+        Sendspin push stream, and commits audio continuously. Supports dynamic group
+        membership changes and late-join historical backfill while running.
+        """
+        push_stream = self._create_push_stream()
+        async with self._state_lock:
+            self._push_stream = push_stream
+            self._playback_running = True
+            self._history.clear()
+            self._produced_audio_us = 0
+            self._timeline_start_us = None
+            self._first_commit_monotonic_us = None
+            self._mapping_dirty = True
+        # Bounded queue between producer (stream reader) and consumer (committer).
+        pending_chunks: asyncio.Queue[_PendingChunk | None] = asyncio.Queue(
+            maxsize=_PRODUCER_BACKLOG_SIZE
+        )
+        # Shadow deque mirroring pending_chunks for join-catchup backlog peeking.
+        pending_backlog: deque[_PendingChunk] = deque()
+        pending_duration_us = 0
+
+        async def _produce_pending_chunks() -> None:
+            nonlocal pending_duration_us
+            audio_source = self.player.mass.streams.get_stream(media, _PCM_FORMAT)
+            async for chunk in audio_source:
+                if not chunk:
+                    continue
+                for slice_chunk in self._iter_pcm_slices(chunk, _PCM_FORMAT, _PRODUCER_SLICE_US):
+                    if not slice_chunk:
+                        continue
+                    duration_us = self._duration_us(slice_chunk, _PCM_FORMAT)
+                    if duration_us <= 0:
+                        continue
+                    await self._refresh_member_mappings()
+                    pending = _PendingChunk(pcm=slice_chunk, duration_us=duration_us)
+                    await pending_chunks.put(pending)
+                    pending_backlog.append(pending)
+                    pending_duration_us += duration_us
+                    join_pending_ids, pipelines = await self._snapshot_active_pipelines()
+                    transform_pipelines: list[_MemberPipeline] = []
+                    for member_id, pipeline in pipelines:
+                        if not pipeline.config.requires_transform:
+                            continue
+                        if member_id in join_pending_ids:
+                            continue
+                        transform_pipelines.append(pipeline)
+                    results = await asyncio.gather(
+                        *(
+                            self._transform_member_chunk(pipeline, slice_chunk)
+                            for pipeline in transform_pipelines
+                        ),
+                        return_exceptions=True,
+                    )
+                    for pipeline, result in zip(transform_pipelines, results, strict=True):
+                        if isinstance(result, BaseException):
+                            self.player.logger.warning(
+                                "Transform push failed for channel %s: %s",
+                                pipeline.channel_id,
+                                result,
+                            )
+
+        async def _commit_pending_chunks() -> None:
+            nonlocal pending_duration_us
+            while True:
+                pending = await pending_chunks.get()
+                if pending is None:
+                    break
+                pending_backlog.popleft()
+                pending_duration_us = max(0, pending_duration_us - pending.duration_us)
+                await self._inject_ready_join_historical(push_stream, pending_backlog, pending.pcm)
+                push_stream.prepare_audio(
+                    pending.pcm, _SENDSPIN_PCM_FORMAT, channel_id=MAIN_CHANNEL
+                )
+                join_pending_ids, pipelines = await self._snapshot_active_pipelines()
+                transform_pipelines: list[_MemberPipeline] = []
+                for member_id, pipeline in pipelines:
+                    if not pipeline.config.requires_transform:
+                        continue
+                    if member_id in join_pending_ids:
+                        continue
+                    transform_pipelines.append(pipeline)
+                transformed_chunks = await asyncio.gather(
+                    *(
+                        self._read_member_chunk(pipeline, pending.duration_us)
+                        for pipeline in transform_pipelines
+                    ),
+                    return_exceptions=True,
+                )
+                for pipeline, transformed_chunk in zip(
+                    transform_pipelines, transformed_chunks, strict=True
+                ):
+                    if isinstance(transformed_chunk, BaseException):
+                        self.player.logger.warning(
+                            "Transform read failed for channel %s: %s",
+                            pipeline.channel_id,
+                            transformed_chunk,
+                        )
+                        continue
+                    if transformed_chunk is None:
+                        continue
+                    push_stream.prepare_audio(
+                        transformed_chunk,
+                        _SENDSPIN_PCM_FORMAT,
+                        channel_id=pipeline.channel_id,
+                    )
+                commit_start_us = await push_stream.commit_audio()
+                await push_stream.sleep_to_limit_buffer(_PRODUCER_BUFFER_LIMIT_US)
+                commit_now_us = int(time.monotonic_ns() / 1000)
+                committed_history_chunk = _HistoryChunk(
+                    start_time_us=int(commit_start_us),
+                    duration_us=pending.duration_us,
+                    pcm=pending.pcm,
+                )
+                async with self._state_lock:
+                    if self._timeline_start_us is None:
+                        self._timeline_start_us = int(commit_start_us)
+                    if self._first_commit_monotonic_us is None:
+                        self._first_commit_monotonic_us = commit_now_us
+                    self._history.append(committed_history_chunk)
+                    self._produced_audio_us += pending.duration_us
+                    self._prune_history_locked(commit_now_us)
+                await self._fanout_history_chunk_to_join_processors(committed_history_chunk)
+
+        commit_task = asyncio.create_task(_commit_pending_chunks())
+        self._attach_task_exception_logger(commit_task, "commit_pending_chunks")
+        producer_stopped_cleanly = False
+        try:
+            await _produce_pending_chunks()
+            producer_stopped_cleanly = True
+        finally:
+            if producer_stopped_cleanly and not commit_task.done():
+                # Producer finished normally; send a None sentinel so the
+                # consumer exits cleanly.  The queue may be full, so retry
+                # with a deadline before falling back to cancellation.
+                sentinel_sent = False
+                deadline = time.monotonic() + 1.0
+                while not sentinel_sent and not commit_task.done():
+                    try:
+                        pending_chunks.put_nowait(None)
+                        sentinel_sent = True
+                    except asyncio.QueueFull:
+                        if time.monotonic() >= deadline:
+                            break
+                        await asyncio.sleep(0.01)
+                if sentinel_sent:
+                    with suppress(asyncio.CancelledError, Exception):
+                        await commit_task
+                else:
+                    commit_task.cancel()
+                    with suppress(asyncio.CancelledError, Exception):
+                        await commit_task
+            else:
+                commit_task.cancel()
+                with suppress(asyncio.CancelledError, Exception):
+                    await commit_task
+            with suppress(Exception):
+                self._stop_push_stream()
+            await self._clear_join_catchup()
+            await self._clear_member_pipelines()
+            async with self._state_lock:
+                self._push_stream = None
+                self._playback_running = False
+                self._timeline_start_us = None
+                self._first_commit_monotonic_us = None
+                self._produced_audio_us = 0
+                self._history.clear()
+
+    # -- Join injection --------------------------------------------------------
+
+    async def _inject_ready_join_historical(
+        self,
+        push_stream: PushStream,
+        pending_backlog: deque[_PendingChunk],
+        current_pcm: bytes,
+    ) -> bool:
+        """Inject join-catchup historical audio once processor output reaches history end.
+
+        Join promotion lifecycle:
+        1. A catchup processor is fed historical PCM and new commits in parallel.
+        2. Once the processor's output lag falls within _JOIN_PROMOTE_ARM_WINDOW_US
+           of the history tail, promotion is "armed" and a target end timestamp is locked.
+        3. Once output reaches the target (within _JOIN_PROMOTE_TOLERANCE_US), the
+           catchup processor is promoted to the member's live DSP pipeline.
+        4. If promotion doesn't complete within _JOIN_PROMOTION_TIMEOUT_S, it's aborted.
+        """
+        injected_any = False
+        async with self._state_lock:
+            items = list(self._join_catchup.items())
+        for player_id, state in items:
+            produced_output_us = state.processor.produced_output_us
+            async with self._state_lock:
+                current = self._join_catchup.get(player_id)
+                if current is None or current.processor is not state.processor:
+                    continue
+                first_history_start_us = current.first_history_start_us
+                fed_until_us = current.fed_until_us
+                history_end_us = current.history_end_us
+                promotion_target_end_us = current.promotion_target_end_us
+                promotion_armed_monotonic_s = current.promotion_armed_monotonic_s
+            if first_history_start_us is None or fed_until_us is None or history_end_us is None:
+                continue
+            max_ready_end_us = min(
+                fed_until_us,
+                first_history_start_us + max(0, produced_output_us),
+            )
+            if promotion_target_end_us is None:
+                lag_to_tail_us = history_end_us - max_ready_end_us
+                if lag_to_tail_us > _JOIN_PROMOTE_ARM_WINDOW_US:
+                    continue
+                async with self._state_lock:
+                    current = self._join_catchup.get(player_id)
+                    if current is None or current.processor is not state.processor:
+                        continue
+                    if current.promotion_target_end_us is None:
+                        current.promotion_target_end_us = history_end_us
+                        current.promotion_armed_monotonic_s = time.monotonic()
+                    promotion_target_end_us = current.promotion_target_end_us
+                    promotion_armed_monotonic_s = current.promotion_armed_monotonic_s
+            target_end_us = promotion_target_end_us
+            if (
+                promotion_armed_monotonic_s is not None
+                and time.monotonic() - promotion_armed_monotonic_s > _JOIN_PROMOTION_TIMEOUT_S
+            ):
+                self.player.logger.error(
+                    "Join promotion timed out for %s after %.1fs; dropping join catchup",
+                    player_id,
+                    _JOIN_PROMOTION_TIMEOUT_S,
+                )
+                await self._stop_join_catchup(player_id)
+                continue
+            if max_ready_end_us + _JOIN_PROMOTE_TOLERANCE_US < target_end_us:
+                continue
+            inject_duration_us = target_end_us - first_history_start_us
+            transformed_history = state.processor.pop_duration_us_or_pad(
+                inject_duration_us, _JOIN_PROMOTE_TOLERANCE_US
+            )
+            if transformed_history is None:
+                continue
+            pipeline = await self._sync_member_pipeline(player_id)
+            # Split the blob into slices so push_stream can yield between encodes.
+            frame_stride = (_SENDSPIN_PCM_FORMAT.bit_depth // 8) * _SENDSPIN_PCM_FORMAT.channels
+            slice_bytes = (
+                int(_SENDSPIN_PCM_FORMAT.sample_rate * _PRODUCER_SLICE_US / 1_000_000)
+                * frame_stride
+            )
+            for offset in range(0, len(transformed_history), slice_bytes):
+                push_stream.prepare_historical_audio(
+                    transformed_history[offset : offset + slice_bytes],
+                    _SENDSPIN_PCM_FORMAT,
+                    channel_id=pipeline.channel_id,
+                    start_time_us=first_history_start_us if offset == 0 else None,
+                )
+            await self._prefeed_pending_backlog_for_join(state, current_pcm, pending_backlog)
+            await self._promote_join_catchup_processor(player_id, pipeline, target_end_us)
+            injected_any = True
+        return injected_any
+
+    async def _prefeed_pending_backlog_for_join(
+        self,
+        state: _JoinCatchupState,
+        current_pcm: bytes,
+        pending_backlog: deque[_PendingChunk],
+    ) -> None:
+        """Push current chunk + queued pending chunks into join processor before promotion.
+
+        Between the last committed chunk and the next commit, there may be
+        chunks already queued by the producer that the catchup processor hasn't
+        seen yet.  Feeding them now avoids a gap in transformed audio after
+        promotion.
+        """
+        await self._enqueue_join_pcm(state, current_pcm)
+        for item in list(pending_backlog):
+            await self._enqueue_join_pcm(state, item.pcm)
+
+    async def _fanout_history_chunk_to_join_processors(self, hist_chunk: _HistoryChunk) -> None:
+        """Feed newly committed history chunk into all active join-catchup processors."""
+        async with self._state_lock:
+            items = list(self._join_catchup.items())
+        for player_id, state in items:
+            async with state.write_lock:
+                # Read current state under lock.
+                async with self._state_lock:
+                    current = self._join_catchup.get(player_id)
+                    if current is None or current.processor is not state.processor:
+                        continue
+                    previous_end_us = current.fed_until_us
+                    first_history_start_us = current.first_history_start_us
+                # Initialize first_history_start_us if this is the first chunk.
+                if first_history_start_us is None:
+                    first_history_start_us = hist_chunk.start_time_us
+                    previous_end_us = first_history_start_us
+                # Fill timeline gaps with silence.
+                if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us:
+                    gap_us = hist_chunk.start_time_us - previous_end_us
+                    silence = self._silence_for_duration_us(gap_us)
+                    if silence:
+                        await self._enqueue_join_pcm(state, silence)
+                await self._enqueue_join_pcm(state, hist_chunk.pcm)
+                # Write updated state back under lock.
+                new_end_us = hist_chunk.start_time_us + hist_chunk.duration_us
+                async with self._state_lock:
+                    current = self._join_catchup.get(player_id)
+                    if current is not None and current.processor is state.processor:
+                        if current.first_history_start_us is None:
+                            current.first_history_start_us = first_history_start_us
+                        if current.fed_until_us is None:
+                            current.fed_until_us = first_history_start_us
+                        current.fed_until_us = new_end_us
+                        current.history_end_us = new_end_us
+
+    async def _enqueue_join_pcm(
+        self,
+        state: _JoinCatchupState,
+        pcm: bytes,
+    ) -> None:
+        """Enqueue PCM into a joining member writer queue.
+
+        Bails out immediately if the writer task is dead to avoid blocking
+        the commit loop on a queue with no consumer.
+        """
+        if state.writer_task.done():
+            return
+        try:
+            state.input_queue.put_nowait(pcm)
+        except asyncio.QueueFull:
+            if state.writer_task.done():
+                return
+            await state.input_queue.put(pcm)
+
+    # -- Member pipeline management --------------------------------------------
+
+    async def _refresh_member_mappings(self) -> None:
+        """Re-evaluate per-member channel mapping and DSP requirements."""
+        async with self._state_lock:
+            if not self._mapping_dirty:
+                return
+            member_ids = tuple(self._members)
+            self._mapping_dirty = False
+        for member_id in member_ids:
+            await self._sync_member_pipeline(member_id)
+
+    async def _sync_member_pipeline(self, player_id: str) -> _MemberPipeline:
+        """Create/update pipeline state for one member from current MA config."""
+        config = self._get_pipeline_config_cached(player_id)
+        release_processor: _BufferedFfmpegProcessor | None = None
+        start_processor: _BufferedFfmpegProcessor | None = None
+        async with self._state_lock:
+            current = self._member_pipelines.get(player_id)
+            if current is not None and current.config.signature == config.signature:
+                return current
+            if current and current.config.requires_transform:
+                channel_id = current.channel_id if config.requires_transform else MAIN_CHANNEL
+                release_processor = current.processor
+            elif config.requires_transform:
+                channel_id = self._get_or_create_preassigned_channel(player_id)
+            else:
+                channel_id = MAIN_CHANNEL
+                self._preassigned_channels.pop(player_id, None)
+            processor: _BufferedFfmpegProcessor | None = None
+            if config.requires_transform:
+                ffmpeg_obj = self._create_member_ffmpeg(config.filter_params)
+                processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT)
+                start_processor = processor
+            pipeline = _MemberPipeline(
+                player_id=player_id,
+                channel_id=channel_id,
+                config=config,
+                processor=processor,
+            )
+            self._member_pipelines[player_id] = pipeline
+        if start_processor is not None:
+            try:
+                await start_processor.start()
+            except Exception as err:
+                async with self._state_lock:
+                    if (
+                        self._member_pipelines.get(player_id) is not None
+                        and self._member_pipelines[player_id].processor is start_processor
+                    ):
+                        self._member_pipelines.pop(player_id, None)
+                with suppress(Exception):
+                    await self._close_member_ffmpeg(start_processor)
+                raise RuntimeError(f"Failed to start member DSP ffmpeg for {player_id}") from err
+        if release_processor is not None:
+            await self._close_member_ffmpeg(release_processor)
+        return pipeline
+
+    def _get_pipeline_config_cached(
+        self,
+        player_id: str,
+        *,
+        force_refresh: bool = False,
+    ) -> _PipelineConfig:
+        """Return cached pipeline config for a player, calculating on cache miss."""
+        if not force_refresh and (cached := self._pipeline_config_cache.get(player_id)) is not None:
+            return cached
+        config = self._read_pipeline_config(player_id)
+        self._pipeline_config_cache[player_id] = config
+        return config
+
+    def _read_pipeline_config(self, player_id: str) -> _PipelineConfig:
+        """Read MA config and determine if member needs a dedicated DSP channel."""
+        dsp_config = self.player.mass.config.get_player_dsp_config(player_id)
+        dsp_enabled = bool(dsp_config.enabled)
+        raw_output_channels = self.player.mass.config.get_raw_player_config_value(
+            player_id,
+            CONF_OUTPUT_CHANNELS,
+            "stereo",
+        )
+        output_channels = str(raw_output_channels or "stereo").strip().lower()
+        if output_channels not in {"stereo", "left", "right", "mono"}:
+            output_channels = "stereo"
+        try:
+            filter_params = tuple(
+                get_player_filter_params(
+                    self.player.mass,
+                    player_id,
+                    _PCM_FORMAT,
+                    _PCM_FORMAT,
+                )
+            )
+        except Exception:
+            filter_params = ()
+        custom_filter_graph = any(
+            param.strip() and not param.strip().startswith("alimiter=") for param in filter_params
+        )
+        requires_transform = dsp_enabled or output_channels != "stereo" or custom_filter_graph
+        return _PipelineConfig(
+            requires_transform=requires_transform,
+            output_channels=output_channels,
+            filter_params=filter_params,
+        )
+
+    def _get_or_create_preassigned_channel(self, player_id: str) -> UUID:
+        """Return stable dedicated channel id for transform-required player."""
+        if (channel_id := self._preassigned_channels.get(player_id)) is not None:
+            return channel_id
+        channel_id = uuid4()
+        self._preassigned_channels[player_id] = channel_id
+        return channel_id
+
+    # -- FFmpeg lifecycle ------------------------------------------------------
+
+    def _create_member_ffmpeg(self, filter_params: tuple[str, ...]) -> FFMpeg:
+        """Create per-member FFMpeg for DSP pipeline."""
+        return FFMpeg(
+            audio_input="-",
+            input_format=_PCM_FORMAT,
+            output_format=_PCM_FORMAT,
+            filter_params=list(filter_params),
+        )
+
+    async def _transform_member_chunk(self, pipeline: _MemberPipeline, chunk: bytes) -> None:
+        """Push one PCM chunk into a member DSP pipeline."""
+        processor = pipeline.processor
+        if processor is None:
+            return
+        await processor.push(chunk)
+
+    async def _read_member_chunk(
+        self,
+        pipeline: _MemberPipeline,
+        duration_us: int,
+    ) -> bytes | None:
+        """Read one transformed chunk from a member DSP pipeline."""
+        processor = pipeline.processor
+        if processor is None or duration_us <= 0:
+            return b""
+        transformed = await processor.read_duration_us(duration_us)
+        if not transformed:
+            return None
+        pipeline.ready = True
+        return bytes(transformed)
+
+    async def _close_member_ffmpeg(self, processor: _BufferedFfmpegProcessor) -> None:
+        """Close an ffmpeg processor, suppressing errors."""
+        with suppress(Exception):
+            await processor.close()
+
+    async def _clear_member_pipelines(self) -> None:
+        """Release all member pipeline resources."""
+        async with self._state_lock:
+            pipelines = list(self._member_pipelines.values())
+            self._member_pipelines.clear()
+        for pipeline in pipelines:
+            if pipeline.processor is not None:
+                await self._close_member_ffmpeg(pipeline.processor)
+
+    # -- Push stream -----------------------------------------------------------
+
+    def _create_push_stream(self) -> PushStream:
+        """Create PushStream with channel resolver for per-member routing."""
+        return self.player.api.group.start_stream(channel_resolver=self._resolve_channel_for_player)
+
+    def _stop_push_stream(self) -> None:
+        """Stop the active PushStream."""
+        self.player.api.group.stop_stream()
+
+    def _resolve_channel_for_player(self, player_id: str) -> UUID:
+        """Channel resolver callback for per-player routing."""
+        pipeline = self._member_pipelines.get(player_id)
+        if pipeline is not None:
+            return pipeline.channel_id
+        # The leader always receives MAIN_CHANNEL audio directly from the
+        # commit loop; only group members get per-player DSP channels.
+        if player_id == self.player.player_id:
+            return MAIN_CHANNEL
+        # Force a fresh config read for pending/unknown joiners so the very
+        # first resolution (triggered by add_client) uses up-to-date DSP settings.
+        force = player_id not in self._members
+        config = self._get_pipeline_config_cached(player_id, force_refresh=force)
+        if not config.requires_transform:
+            return MAIN_CHANNEL
+        return self._get_or_create_preassigned_channel(player_id)
+
+    # -- History ---------------------------------------------------------------
+
+    def _prune_history_locked(self, now_monotonic_us: int) -> None:
+        """Drop old history chunks that are fully in the past."""
+        if self._timeline_start_us is None or self._first_commit_monotonic_us is None:
+            return
+        elapsed_real_us = max(0, now_monotonic_us - self._first_commit_monotonic_us)
+        source_now_us = self._timeline_start_us + elapsed_real_us
+        cutoff_us = source_now_us - _HISTORY_KEEP_PAST_US
+        while self._history and (
+            self._history[0].start_time_us + self._history[0].duration_us <= cutoff_us
+        ):
+            self._history.popleft()
+
+    # -- PCM utilities ---------------------------------------------------------
+
+    @staticmethod
+    def _duration_us(audio: bytes, audio_format: AudioFormat) -> int:
+        """Compute chunk duration from PCM payload size."""
+        bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
+        bytes_per_second = (
+            int(audio_format.sample_rate) * bytes_per_sample * int(audio_format.channels)
+        )
+        if bytes_per_second <= 0:
+            return 0
+        return int((len(audio) / bytes_per_second) * 1_000_000)
+
+    @staticmethod
+    def _iter_pcm_slices(
+        audio: bytes, audio_format: AudioFormat, target_duration_us: int
+    ) -> Iterator[bytes]:
+        """Yield frame-aligned PCM slices up to target duration."""
+        if not audio:
+            return
+        bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
+        frame_size = bytes_per_sample * int(audio_format.channels)
+        if frame_size <= 0:
+            yield audio
+            return
+        samples_per_slice = max(
+            1, round((target_duration_us / 1_000_000) * int(audio_format.sample_rate))
+        )
+        slice_size = max(frame_size, samples_per_slice * frame_size)
+        offset = 0
+        audio_len = len(audio)
+        while offset < audio_len:
+            end = min(audio_len, offset + slice_size)
+            if end < audio_len:
+                aligned_end = end - (end % frame_size)
+                if aligned_end <= offset:
+                    aligned_end = min(audio_len, offset + frame_size)
+                end = aligned_end
+            yield audio[offset:end]
+            offset = end
+
+    @staticmethod
+    def _silence_for_duration_us(duration_us: int) -> bytes:
+        """Generate silent PCM with frame-aligned duration for the default format."""
+        if duration_us <= 0:
+            return b""
+        bytes_per_sample = max(1, int(_PCM_FORMAT.bit_depth // 8))
+        frame_size = bytes_per_sample * int(_PCM_FORMAT.channels)
+        samples = max(0, round((duration_us / 1_000_000) * int(_PCM_FORMAT.sample_rate)))
+        return b"\x00" * (samples * frame_size)
index 91987a35498768a9efc6157aabb5cf781c755c0a..4fdaffd554947a7e5d31f438f1981b093e5b3abe 100644 (file)
@@ -4,49 +4,65 @@ from __future__ import annotations
 
 import asyncio
 import time
-from collections.abc import AsyncGenerator, Callable
+from collections.abc import Callable
 from io import BytesIO
 from typing import TYPE_CHECKING, cast
 
-from aiosendspin.models import MediaCommand
-from aiosendspin.models.types import ArtworkSource, PlaybackStateType
+from aiosendspin.models import AudioCodec, MediaCommand
+from aiosendspin.models.types import PlaybackStateType
 from aiosendspin.models.types import RepeatMode as SendspinRepeatMode
-from aiosendspin.server import AudioFormat as SendspinAudioFormat
 from aiosendspin.server import (
     ClientEvent,
-    GroupCommandEvent,
     GroupEvent,
-    GroupStateChangedEvent,
     SendspinGroup,
     VolumeChangedEvent,
 )
+from aiosendspin.server.audio import AudioFormat as SendspinAudioFormat
 from aiosendspin.server.client import DisconnectBehaviour
-from aiosendspin.server.events import ClientGroupChangedEvent
-from aiosendspin.server.group import (
+from aiosendspin.server.events import (
+    ClientGroupChangedEvent,
     GroupDeletedEvent,
     GroupMemberAddedEvent,
     GroupMemberRemovedEvent,
+    GroupStateChangedEvent,
+)
+from aiosendspin.server.roles import (
+    ArtworkGroupRole,
+    ControllerEvent,
+    ControllerGroupRole,
+    ControllerNextEvent,
+    ControllerPauseEvent,
+    ControllerPlayEvent,
+    ControllerPreviousEvent,
+    ControllerRepeatEvent,
+    ControllerShuffleEvent,
+    ControllerStopEvent,
+    MetadataGroupRole,
 )
-from aiosendspin.server.metadata import Metadata
-from aiosendspin.server.stream import AudioCodec, MediaStream
+from aiosendspin.server.roles.metadata.state import Metadata
+from aiosendspin.server.roles.player.types import PlayerRoleProtocol
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
 from music_assistant_models.constants import PLAYER_CONTROL_NONE
 from music_assistant_models.enums import (
-    ContentType,
+    ConfigEntryType,
     ImageType,
     PlaybackState,
     PlayerFeature,
     PlayerType,
     RepeatMode,
 )
-from music_assistant_models.media_items import AudioFormat
 from music_assistant_models.player import DeviceInfo
 from PIL import Image
 
-from music_assistant.constants import CONF_OUTPUT_CHANNELS, CONF_OUTPUT_CODEC, INTERNAL_PCM_FORMAT
-from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.constants import (
+    CONF_ENTRY_HTTP_PROFILE_HIDDEN,
+    CONF_ENTRY_OUTPUT_CODEC_HIDDEN,
+    CONF_ENTRY_SAMPLE_RATES,
+)
 from music_assistant.models.player import Player, PlayerMedia
-
-from .timed_client_stream import TimedClientStream
+from music_assistant.providers.sendspin.playback import (
+    SendspinPlaybackSession,
+)
 
 # Supported group commands for Sendspin players
 SUPPORTED_GROUP_COMMANDS = [
@@ -62,111 +78,61 @@ SUPPORTED_GROUP_COMMANDS = [
     MediaCommand.UNSHUFFLE,
 ]
 
-if TYPE_CHECKING:
-    from aiosendspin.server.client import SendspinClient
-    from music_assistant_models.player_queue import PlayerQueue
-    from music_assistant_models.queue_item import QueueItem
-
-    from .provider import SendspinProvider
+# Config constants for Sendspin audio format
+CONF_PREFERRED_SENDSPIN_FORMAT = "preferred_sendspin_format"
+SENDSPIN_FORMAT_AUTOMATIC = "automatic"
 
 
-class MusicAssistantMediaStream(MediaStream):
-    """MediaStream implementation for Music Assistant with per-player DSP support."""
-
-    player_instance: SendspinPlayer
-    internal_format: AudioFormat
-    output_format: AudioFormat
-
-    def __init__(
-        self,
-        *,
-        main_channel_source: AsyncGenerator[bytes, None],
-        main_channel_format: SendspinAudioFormat,
-        player_instance: SendspinPlayer,
-        internal_format: AudioFormat,
-        output_format: AudioFormat,
-    ) -> None:
-        """
-        Initialise the media stream with audio source and format for main_channel().
-
-        Args:
-            main_channel_source: Audio source generator for the main channel.
-            main_channel_format: Audio format for the main channel (includes codec).
-            player_instance: The SendspinPlayer instance for accessing mass and streams.
-            internal_format: Internal processing format (float32 for headroom).
-            output_format: Output PCM format (16-bit for player output).
-        """
-        super().__init__(
-            main_channel_source=main_channel_source,
-            main_channel_format=main_channel_format,
-        )
-        self.player_instance = player_instance
-        self.internal_format = internal_format
-        self.output_format = output_format
-
-    async def player_channel(
-        self,
-        player_id: str,
-        preferred_format: SendspinAudioFormat | None = None,
-        position_us: int = 0,
-    ) -> tuple[AsyncGenerator[bytes, None], SendspinAudioFormat, int] | None:
-        """
-        Get a player-specific audio stream with per-player DSP.
+def format_to_option_value(fmt: SupportedAudioFormat) -> str:
+    """Convert SupportedAudioFormat to "codec:sample_rate:bit_depth:channels"."""
+    return f"{fmt.codec.value}:{fmt.sample_rate}:{fmt.bit_depth}:{fmt.channels}"
 
-        Args:
-            player_id: Identifier for the player requesting the stream.
-            preferred_format: The player's preferred native format for the stream.
-                The implementation may return a different format; the library
-                will handle any necessary conversion.
-            position_us: Position in microseconds relative to the main_stream start.
-                Used for late-joining players to sync with the main stream.
-
-        Returns:
-            A tuple of (audio generator, audio format, actual position in microseconds)
-            or None if unavailable. If None, the main_stream is used as fallback.
-        """
-        mass = self.player_instance.mass
-        multi_client_stream = self.player_instance.timed_client_stream
-        assert multi_client_stream is not None
 
-        dsp = mass.config.get_player_dsp_config(player_id)
-        output_channels = mass.config.get_raw_player_config_value(
-            player_id, CONF_OUTPUT_CHANNELS, "stereo"
-        )
-        if not dsp.enabled and output_channels == "stereo":
-            # DSP is disabled and output is stereo, use main_stream
-            return None
+def option_value_to_format(value: str) -> tuple[AudioCodec, SendspinAudioFormat] | None:
+    """Parse option value back to (AudioCodec, SendspinAudioFormat).
 
-        # Get per-player DSP filter parameters
-        filter_params = get_player_filter_params(
-            mass, player_id, self.internal_format, self.output_format
+    :param value: Option value in format "codec:sample_rate:bit_depth:channels".
+    :return: Tuple of (AudioCodec, SendspinAudioFormat) or None if parsing fails.
+    """
+    try:
+        codec_str, sample_rate_str, bit_depth_str, channels_str = value.split(":")
+        codec = AudioCodec(codec_str)
+        audio_format = SendspinAudioFormat(
+            sample_rate=int(sample_rate_str),
+            bit_depth=int(bit_depth_str),
+            channels=int(channels_str),
         )
+        return (codec, audio_format)
+    except (ValueError, KeyError):
+        return None
+
+
+def format_to_display_string(fmt: SupportedAudioFormat) -> str:
+    """Convert to display string like "FLAC 48kHz/24bit stereo"."""
+    codec_name = fmt.codec.name
+    sample_rate_khz = fmt.sample_rate / 1000
+    # Format sample rate: show as integer if whole number, otherwise one decimal
+    if sample_rate_khz == int(sample_rate_khz):
+        sample_rate_str = f"{int(sample_rate_khz)}kHz"
+    else:
+        sample_rate_str = f"{sample_rate_khz:.1f}kHz"
+    if fmt.channels == 2:
+        channels_str = "stereo"
+    elif fmt.channels == 1:
+        channels_str = "mono"
+    else:
+        channels_str = f"{fmt.channels}ch"
+    return f"{codec_name} {sample_rate_str}/{fmt.bit_depth}bit {channels_str}"
 
-        # Get the stream with position (in seconds)
-        stream_gen, actual_position = await multi_client_stream.get_stream(
-            output_format=self.output_format,
-            filter_params=filter_params,
-        )
 
-        # Convert position from seconds to microseconds for aiosendspin API
-        actual_position_us = int(actual_position * 1_000_000)
+if TYPE_CHECKING:
+    from aiosendspin.models.player import SupportedAudioFormat
+    from aiosendspin.server.client import SendspinClient
+    from music_assistant_models.config_entries import ConfigValueType
+    from music_assistant_models.player_queue import PlayerQueue
+    from music_assistant_models.queue_item import QueueItem
 
-        # Return actual position in microseconds relative to main_stream start
-        self.player_instance.logger.debug(
-            "Providing channel stream for player %s at position %d us",
-            player_id,
-            actual_position_us,
-        )
-        return (
-            stream_gen,
-            SendspinAudioFormat(
-                sample_rate=self.output_format.sample_rate,
-                bit_depth=self.output_format.bit_depth,
-                channels=self.output_format.channels,
-                codec=self._main_channel_format.codec,
-            ),
-            actual_position_us,
-        )
+    from .provider import SendspinProvider
 
 
 class SendspinPlayer(Player):
@@ -179,8 +145,7 @@ class SendspinPlayer(Player):
     unsub_group_event_cb: Callable[[], None]
     last_sent_artwork_url: str | None = None
     last_sent_artist_artwork_url: str | None = None
-    _playback_task: asyncio.Task[None] | None = None
-    timed_client_stream: TimedClientStream | None = None
+    playback_session: SendspinPlaybackSession
     is_web_player: bool = False
 
     @property
@@ -197,7 +162,10 @@ class SendspinPlayer(Player):
         self.api.disconnect_behaviour = DisconnectBehaviour.STOP
         self.unsub_event_cb = sendspin_client.add_event_listener(self.event_cb)
         self.unsub_group_event_cb = sendspin_client.group.add_event_listener(self.group_event_cb)
-        sendspin_client.group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
+        if controller_role := self._controller_role:
+            controller_role.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
+
+        self.playback_session = SendspinPlaybackSession(self)
 
         self.logger = self.provider.logger.getChild(player_id)
         # init some static variables
@@ -205,9 +173,9 @@ class SendspinPlayer(Player):
         self._attr_supported_features = {
             PlayerFeature.PLAY_MEDIA,
             PlayerFeature.SET_MEMBERS,
-            PlayerFeature.MULTI_DEVICE_DSP,
             PlayerFeature.VOLUME_SET,
             PlayerFeature.VOLUME_MUTE,
+            PlayerFeature.MULTI_DEVICE_DSP,
         }
         self._attr_can_group_with = {provider.instance_id}
         self._attr_power_control = PLAYER_CONTROL_NONE
@@ -219,9 +187,16 @@ class SendspinPlayer(Player):
             )
         else:
             self._attr_device_info = DeviceInfo()
-        if player_client := sendspin_client.player:
-            self._attr_volume_level = player_client.volume
-            self._attr_volume_muted = player_client.muted
+        if sendspin_client.info.player_support:
+            for role in sendspin_client.roles_by_family("player"):
+                volume = role.get_player_volume()
+                muted = role.get_player_muted()
+                if volume is not None:
+                    self._attr_volume_level = volume
+                if muted is not None:
+                    self._attr_volume_muted = muted
+                if volume is not None or muted is not None:
+                    break
         self._attr_available = True
         self.is_web_player = sendspin_client.name.startswith(
             "Web ("  # The regular Web Interface
@@ -231,9 +206,85 @@ class SendspinPlayer(Player):
         self._attr_expose_to_ha_by_default = not self.is_web_player
         self._attr_hidden_by_default = self.is_web_player
 
+    @property
+    def _artwork_role(self) -> ArtworkGroupRole | None:
+        """Get the ArtworkGroupRole for this player's group."""
+        role = self.api.group.group_role("artwork")
+        if isinstance(role, ArtworkGroupRole):
+            return role
+        return None
+
+    @property
+    def _metadata_role(self) -> MetadataGroupRole | None:
+        """Get the MetadataGroupRole for this player's group."""
+        role = self.api.group.group_role("metadata")
+        if isinstance(role, MetadataGroupRole):
+            return role
+        return None
+
+    @property
+    def _controller_role(self) -> ControllerGroupRole | None:
+        """Get the ControllerGroupRole for this player's group."""
+        role = self.api.group.group_role("controller")
+        if isinstance(role, ControllerGroupRole):
+            return role
+        return None
+
+    @property
+    def _player_role(self) -> PlayerRoleProtocol | None:
+        """Get the player role for this client (not group role)."""
+        for role in self.api.roles_by_family("player"):
+            if isinstance(role, PlayerRoleProtocol):
+                return role
+        return None
+
+    async def _handle_controller_event(self, event: ControllerEvent) -> None:
+        """Handle a controller event from the ControllerGroupRole."""
+        queue = self.mass.player_queues.get_active_queue(self.player_id)
+        match event:
+            case ControllerPlayEvent():
+                await self.mass.players.cmd_play(self.player_id)
+            case ControllerPauseEvent():
+                await self.mass.players.cmd_pause(self.player_id)
+            case ControllerStopEvent():
+                await self.mass.players.cmd_stop(self.player_id)
+            case ControllerNextEvent():
+                await self.mass.players.cmd_next_track(self.player_id)
+            case ControllerPreviousEvent():
+                await self.mass.players.cmd_previous_track(self.player_id)
+            case ControllerRepeatEvent(mode=mode) if queue:
+                match mode:
+                    case SendspinRepeatMode.OFF:
+                        self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF)
+                    case SendspinRepeatMode.ONE:
+                        self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE)
+                    case SendspinRepeatMode.ALL:
+                        self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL)
+            case ControllerShuffleEvent(shuffle=shuffle) if queue:
+                await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=shuffle)
+
+    async def _sync_membership_from_group(self, group: SendspinGroup) -> None:
+        """Sync MA/player + playback session membership from authoritative group state."""
+        # Ignore stale events from a group we no longer belong to.
+        if group is not self.api.group:
+            return
+        group_client_ids = [client.client_id for client in group.clients]
+        is_leader = bool(group_client_ids) and group_client_ids[0] == self.player_id
+        desired_group_members = group_client_ids if is_leader else []
+        desired_session_members = group_client_ids[1:] if is_leader else []
+        if self._attr_group_members != desired_group_members:
+            self._attr_group_members = desired_group_members
+            self.update_state()
+        # Only use STOP when we actually lead other members.
+        self.api.disconnect_behaviour = (
+            DisconnectBehaviour.STOP
+            if is_leader and len(desired_session_members) > 0
+            else DisconnectBehaviour.UNGROUP
+        )
+        await self.playback_session.sync_members(set(desired_session_members))
+
     def event_cb(self, client: SendspinClient, event: ClientEvent) -> None:
-        """Event callback registered to the sendspin server."""
-        self.logger.debug("Received PlayerEvent: %s", event)
+        """Event callback registered to the sendspin client."""
         match event:
             case VolumeChangedEvent(volume=volume, muted=muted):
                 self._attr_volume_level = volume
@@ -242,6 +293,10 @@ class SendspinPlayer(Player):
             case ClientGroupChangedEvent(new_group=new_group):
                 self.unsub_group_event_cb()
                 self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb)
+                if controller_role := self._controller_role:
+                    controller_role.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
+                # Cancel active playback - push stream belongs to the old group
+                self.mass.create_task(self.playback_session.cancel("group changed"))
                 # Sync playback state from the new group
                 match new_group.state:
                     case PlaybackStateType.PLAYING:
@@ -250,42 +305,14 @@ class SendspinPlayer(Player):
                         self._attr_playback_state = PlaybackState.PAUSED
                     case PlaybackStateType.STOPPED:
                         self._attr_playback_state = PlaybackState.IDLE
+                        self._attr_elapsed_time = 0
+                        self._attr_elapsed_time_last_updated = time.time()
                 # Update in case this is a newly created group
-                new_group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
                 # GroupMemberAddedEvent or GroupMemberRemovedEvent will be fired before this
                 # so group members are already up to date at this point
-                if self.synced_to is None:
-                    # We are the leader, stop on disconnect
-                    self.api.disconnect_behaviour = DisconnectBehaviour.STOP
-                else:
-                    self.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP
+                self.mass.create_task(self._sync_membership_from_group(new_group))
                 self.update_state()
 
-    async def _handle_group_command(self, command: MediaCommand) -> None:
-        """Handle a group command from aiosendspin."""
-        queue = self.mass.player_queues.get_active_queue(self.player_id)
-        match command:
-            case MediaCommand.PLAY:
-                await self.mass.players.cmd_play(self.player_id)
-            case MediaCommand.PAUSE:
-                await self.mass.players.cmd_pause(self.player_id)
-            case MediaCommand.STOP:
-                await self.mass.players.cmd_stop(self.player_id)
-            case MediaCommand.NEXT:
-                await self.mass.players.cmd_next_track(self.player_id)
-            case MediaCommand.PREVIOUS:
-                await self.mass.players.cmd_previous_track(self.player_id)
-            case MediaCommand.REPEAT_OFF if queue:
-                self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF)
-            case MediaCommand.REPEAT_ONE if queue:
-                self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE)
-            case MediaCommand.REPEAT_ALL if queue:
-                self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL)
-            case MediaCommand.SHUFFLE if queue:
-                await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=True)
-            case MediaCommand.UNSHUFFLE if queue:
-                await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=False)
-
     def group_event_cb(self, group: SendspinGroup, event: GroupEvent) -> None:
         """Event callback registered to the sendspin group this player belongs to."""
         if self.synced_to is not None:
@@ -294,14 +321,8 @@ class SendspinPlayer(Player):
             # - GroupStateChangedEvent: to update playback state when leader stops/disconnects
             if not isinstance(event, (GroupMemberRemovedEvent, GroupStateChangedEvent)):
                 return
-        self.logger.debug("Received GroupEvent: %s", event)
-
         match event:
-            case GroupCommandEvent(command=command):
-                self.logger.debug("Group command received: %s", command)
-                self.mass.create_task(self._handle_group_command(command))
             case GroupStateChangedEvent(state=state):
-                self.logger.debug("Group state changed to: %s", state)
                 match state:
                     case PlaybackStateType.PLAYING:
                         self._attr_playback_state = PlaybackState.PLAYING
@@ -311,38 +332,41 @@ class SendspinPlayer(Player):
                         self._attr_playback_state = PlaybackState.IDLE
                         self._attr_elapsed_time = 0
                         self._attr_elapsed_time_last_updated = time.time()
+                        if self.synced_to is None:
+                            self.mass.create_task(self.playback_session.cancel("group stopped"))
                 self.update_state()
             case GroupMemberAddedEvent(client_id=client_id):
-                self.logger.debug("Group member added: %s", client_id)
+                is_group_leader = (
+                    bool(group.clients) and group.clients[0].client_id == self.player_id
+                )
+                if is_group_leader and (
+                    not self._attr_group_members or self._attr_group_members[0] != self.player_id
+                ):
+                    self._attr_group_members = [self.player_id, *self._attr_group_members]
                 if client_id not in self._attr_group_members:
                     self._attr_group_members.append(client_id)
                     self.update_state()
+                self.mass.create_task(self.playback_session.add_member(client_id))
+                self.mass.create_task(self._sync_membership_from_group(group))
             case GroupMemberRemovedEvent(client_id=client_id):
-                self.logger.debug("Group member removed: %s", client_id)
-                self.mass.create_task(self._handle_member_removed(group, client_id))
+                self.mass.create_task(self.playback_session.remove_member(client_id))
+                self.mass.create_task(self._handle_group_member_removed(group, client_id))
+                self.mass.create_task(self._sync_membership_from_group(group))
             case GroupDeletedEvent():
                 pass
+            case ControllerEvent() as controller_event:
+                if self.synced_to is None:
+                    self.mass.create_task(self._handle_controller_event(controller_event))
 
-    async def _handle_member_removed(self, group: SendspinGroup, client_id: str) -> None:
-        """Handle group member removed event asynchronously."""
+    async def _handle_group_member_removed(self, group: SendspinGroup, client_id: str) -> None:
+        """Handle a group member being removed asynchronously."""
         if client_id == self.player_id:
-            if len(self._attr_group_members) > 0:
+            if len(group.clients) > 0:
                 # We were just removed as a leader:
                 # 1. stop playback on the old group
                 await group.stop()
-                # 2. clear our members (since we are now alone)
-                group_members = [
-                    member for member in self._attr_group_members if member != client_id
-                ]
+                # 2. clear our members (since we are now alone in a new group)
                 self._attr_group_members = []
-                # 3. assign new leader if there are members left
-                if len(group_members) > 0 and (
-                    new_leader := self.mass.players.get_player(group_members[0])
-                ):
-                    new_leader = cast("SendspinPlayer", new_leader)
-                    new_leader._attr_group_members = group_members[1:]
-                    new_leader.api.disconnect_behaviour = DisconnectBehaviour.STOP
-                    new_leader.update_state()
             self.update_state()
         elif client_id in self._attr_group_members:
             # Someone else left our group
@@ -351,26 +375,27 @@ class SendspinPlayer(Player):
 
     async def volume_set(self, volume_level: int) -> None:
         """Handle VOLUME_SET command on the player."""
-        if player_client := self.api.player:
-            player_client.set_volume(volume_level)
+        roles = self.api.roles_by_family("player")
+        for role in roles:
+            role.set_player_volume(volume_level)
 
     async def volume_mute(self, muted: bool) -> None:
         """Handle VOLUME MUTE command on the player."""
-        if player_client := self.api.player:
-            if muted:
-                player_client.mute()
-            else:
-                player_client.unmute()
+        roles = self.api.roles_by_family("player")
+        for role in roles:
+            role.set_player_mute(muted)
 
     async def stop(self) -> None:
         """Stop command."""
         self.logger.debug("Received STOP command on player %s", self.display_name)
-        # We don't care if we stopped the stream or it was already stopped
-        await self.api.group.stop()
-        # Clear the playback task reference (group.stop() handles stopping the stream)
-        self._playback_task = None
+        self.mark_stop_called()
         self._attr_current_media = None
+        self._attr_playback_state = PlaybackState.IDLE
+        self._attr_elapsed_time = 0
+        self._attr_elapsed_time_last_updated = time.time()
         self.update_state()
+        await self.playback_session.cancel("stop command")
+        await self.api.group.stop()
 
     async def play_media(self, media: PlayerMedia) -> None:
         """Play media command."""
@@ -385,83 +410,53 @@ class SendspinPlayer(Player):
         # playback_state will be set by the group state change event
 
         # Stop previous stream in case we were already playing something
+        await self.playback_session.cancel("new media requested")
         await self.api.group.stop()
-        # Run playback in background task to immediately return
-        self._playback_task = asyncio.create_task(self._run_playback(media))
+        await self.playback_session.start(media)
         self.update_state()
 
-    async def _run_playback(self, media: PlayerMedia) -> None:
-        """Run the actual playback in a background task."""
-        try:
-            # Use 32-bit for the main channel: aiosendspin converts per player as needed
-            pcm_format = AudioFormat(
-                content_type=ContentType.PCM_S32LE,
-                sample_rate=48000,
-                bit_depth=32,
-                channels=2,
-            )
-            flow_pcm_format = AudioFormat(
-                content_type=INTERNAL_PCM_FORMAT.content_type,
-                sample_rate=pcm_format.sample_rate,
-                bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
-                channels=pcm_format.channels,
-            )
-
-            output_codec = cast("str", self.config.get_value(CONF_OUTPUT_CODEC, "pcm"))
+    async def on_config_updated(self) -> None:
+        """Apply preferred format when config changes."""
+        await self._apply_preferred_format()
 
-            # Convert string codec to AudioCodec enum
-            audio_codec = AudioCodec(output_codec)
+    async def _apply_preferred_format(self) -> None:
+        """Read config and call set_preferred_format() if not automatic."""
+        player_role = self._player_role
+        if player_role is None:
+            return
 
-            # Get clean audio source in flow format (high quality internal format)
-            # Format conversion and per-player DSP will be applied via player_channel
-            audio_source = self.mass.streams.get_stream(media, flow_pcm_format)
+        config_value = cast(
+            "str",
+            self.config.get_value(CONF_PREFERRED_SENDSPIN_FORMAT, SENDSPIN_FORMAT_AUTOMATIC),
+        )
+        if config_value == SENDSPIN_FORMAT_AUTOMATIC:
+            # Automatic mode: don't set a preferred format, let client decide.
+            return
 
-            # Create TimedClientStream to wrap the clean audio source
-            # This distributes the audio to multiple subscribers without DSP
-            self.timed_client_stream = TimedClientStream(
-                audio_source=audio_source,
-                audio_format=flow_pcm_format,
+        parsed = option_value_to_format(config_value)
+        if parsed is None:
+            self.logger.warning(
+                "Invalid audio format config value '%s' for player %s",
+                config_value,
+                self.display_name,
             )
+            return
 
-            # Setup the main channel subscription
-            main_channel_gen, main_position = await self.timed_client_stream.get_stream(
-                output_format=pcm_format,
-                filter_params=None,  # TODO: this should probably still include the safety limiter
-            )
-            assert main_position == 0.0  # first subscriber, should be zero
-            media_stream = MusicAssistantMediaStream(
-                main_channel_source=main_channel_gen,
-                main_channel_format=SendspinAudioFormat(
-                    sample_rate=pcm_format.sample_rate,
-                    bit_depth=pcm_format.bit_depth,
-                    channels=pcm_format.channels,
-                    codec=audio_codec,
-                ),
-                player_instance=self,
-                internal_format=flow_pcm_format,
-                output_format=pcm_format,
+        codec, audio_format = parsed
+        if not player_role.set_preferred_format(audio_format, codec):
+            self.logger.warning(
+                "Failed to set preferred audio format %s %s for player %s",
+                codec.name,
+                audio_format,
+                self.display_name,
             )
 
-            stop_time = await self.api.group.play_media(media_stream)
-            await self.api.group.stop(stop_time)
-        except asyncio.CancelledError:
-            self.logger.debug("Playback cancelled for player %s", self.display_name)
-            raise
-        except Exception:
-            self.logger.exception("Error during playback for player %s", self.display_name)
-            raise
-        finally:
-            self.timed_client_stream = None
-
     async def set_members(
         self,
         player_ids_to_add: list[str] | None = None,
         player_ids_to_remove: list[str] | None = None,
     ) -> None:
         """Handle SET_MEMBERS command on the player."""
-        self.logger.debug(
-            "set_members called: adding %s, removing %s", player_ids_to_add, player_ids_to_remove
-        )
         for player_id in player_ids_to_remove or []:
             player = self.mass.players.get_player(player_id, True)
             player = cast("SendspinPlayer", player)  # For type checking
@@ -492,10 +487,11 @@ class SendspinPlayer(Player):
                 )
                 if image_data is not None:
                     image = await asyncio.to_thread(Image.open, BytesIO(image_data))
-                    await self.api.group.set_media_art(image, source=ArtworkSource.ALBUM)
-            else:
-                # Clear artwork if none available
-                await self.api.group.set_media_art(None, source=ArtworkSource.ALBUM)
+                    if (artwork_role := self._artwork_role) is not None:
+                        await artwork_role.set_album_artwork(image)
+            # Clear artwork if none available
+            elif (artwork_role := self._artwork_role) is not None:
+                await artwork_role.set_album_artwork(None)
 
         return artwork_url
 
@@ -526,10 +522,11 @@ class SendspinPlayer(Player):
                 )
                 if artist_image_data is not None:
                     artist_image = await asyncio.to_thread(Image.open, BytesIO(artist_image_data))
-                    await self.api.group.set_media_art(artist_image, source=ArtworkSource.ARTIST)
-            else:
-                # Clear artist artwork if none available
-                await self.api.group.set_media_art(None, source=ArtworkSource.ARTIST)
+                    if (artwork_role := self._artwork_role) is not None:
+                        await artwork_role.set_artist_artwork(artist_image)
+            # Clear artist artwork if none available
+            elif (artwork_role := self._artwork_role) is not None:
+                await artwork_role.set_artist_artwork(None)
 
     def _on_player_media_updated(self) -> None:
         """Handle callback when the current media of the player is updated."""
@@ -539,7 +536,8 @@ class SendspinPlayer(Player):
 
         if self.state.current_media is None:
             # Clear metadata when no media loaded
-            self.api.group.set_metadata(Metadata())
+            if (metadata_role := self._metadata_role) is not None:
+                metadata_role.set_metadata(Metadata())
             return
         self.mass.create_task(self.send_current_media_metadata())
 
@@ -576,11 +574,11 @@ class SendspinPlayer(Player):
         metadata = Metadata(
             title=current_media.title,
             artist=current_media.artist,
-            album_artist=None,  # TODO: extract from optional queue item
+            album_artist=None,
             album=current_media.album,
             artwork_url=current_media.image_url,
-            year=None,  # TODO: extract from optional queue item
-            track=None,  # TODO: extract from optional queue item
+            year=None,
+            track=None,
             track_duration=track_duration * 1000 if track_duration is not None else None,
             track_progress=int(current_media.corrected_elapsed_time * 1000)
             if current_media.corrected_elapsed_time
@@ -591,10 +589,59 @@ class SendspinPlayer(Player):
         )
 
         # Send metadata to the group
-        self.api.group.set_metadata(metadata)
+        if (metadata_role := self._metadata_role) is not None:
+            metadata_role.set_metadata(metadata)
+
+    async def get_config_entries(
+        self,
+        action: str | None = None,
+        values: dict[str, ConfigValueType] | None = None,
+    ) -> list[ConfigEntry]:
+        """Return all (provider/player specific) Config Entries for the player."""
+        default_entries = await super().get_config_entries(action=action, values=values)
+        entries = [
+            *default_entries,
+            CONF_ENTRY_OUTPUT_CODEC_HIDDEN,
+            CONF_ENTRY_HTTP_PROFILE_HIDDEN,
+            ConfigEntry.from_dict({**CONF_ENTRY_SAMPLE_RATES.to_dict(), "hidden": True}),
+        ]
+
+        # Build dynamic format options from player's supported formats
+        player_role = self._player_role
+        if player_role is not None:
+            supported_formats = player_role.get_supported_formats()
+            if supported_formats:
+                format_options = [
+                    ConfigValueOption(
+                        title="Automatic (let client decide)",
+                        value=SENDSPIN_FORMAT_AUTOMATIC,
+                    ),
+                ]
+                for fmt in supported_formats:
+                    format_options.append(
+                        ConfigValueOption(
+                            title=format_to_display_string(fmt),
+                            value=format_to_option_value(fmt),
+                        )
+                    )
+                entries.append(
+                    ConfigEntry(
+                        key=CONF_PREFERRED_SENDSPIN_FORMAT,
+                        type=ConfigEntryType.STRING,
+                        label="Preferred audio format",
+                        description="Select the audio format to use for playback on this player.",
+                        category="protocol_generic",
+                        default_value=SENDSPIN_FORMAT_AUTOMATIC,
+                        options=format_options,
+                        advanced=True,
+                    )
+                )
+
+        return entries
 
     async def on_unload(self) -> None:
         """Handle logic when the player is unloaded from the Player controller."""
+        await self.playback_session.close()
         await super().on_unload()
         self.unsub_event_cb()
         self.unsub_group_event_cb()
index 1866a53c46139b599de56494d19e94d144152eab..392cc46fce408b3377c01a89a64a4201ad01fdcc 100644 (file)
@@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, cast
 
 from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer
 from music_assistant_models.enums import ProviderFeature
+from music_assistant_models.errors import AlreadyRegisteredError
 
 from music_assistant.mass import MusicAssistant
 from music_assistant.models.player_provider import PlayerProvider
@@ -42,7 +43,6 @@ class SendspinProvider(PlayerProvider):
 
     def event_cb(self, server: SendspinServer, event: SendspinEvent) -> None:
         """Event callback registered to the sendspin server."""
-        self.logger.debug("Received SendspinEvent: %s", event)
         match event:
             case ClientAddedEvent(client_id):
                 self.mass.create_task(self._handle_client_added(client_id))
@@ -52,7 +52,7 @@ class SendspinProvider(PlayerProvider):
                 self.logger.error("Unknown sendspin event: %s", event)
 
     async def _handle_client_added(self, client_id: str) -> None:
-        """Handle client added event asynchronously."""
+        """Handle a new client connection asynchronously."""
         # Wait for any pending unregister to complete before registering
         # This prevents a race condition where a slow unregister removes
         # a newly registered player after a quick reconnect
@@ -63,6 +63,11 @@ class SendspinProvider(PlayerProvider):
         if self.server_api.get_client(client_id) is None:
             self.logger.debug("Client %s gone after waiting for pending unregister", client_id)
             return
+        if self.mass.players.get_player(client_id) is not None:
+            self.logger.debug(
+                "Client %s already registered, skipping duplicate add event", client_id
+            )
+            return
         player = SendspinPlayer(self, client_id)
         self.logger.debug("Client %s connected", client_id)
         if player.device_info.manufacturer == "ESPHome" and (
@@ -74,10 +79,15 @@ class SendspinProvider(PlayerProvider):
                 player._attr_name = (
                     hass_device["name_by_user"] or hass_device["name"] or player.name
                 )
-        await self.mass.players.register(player)
+        try:
+            await self.mass.players.register(player)
+        except AlreadyRegisteredError:
+            self.logger.debug("Client %s already registered while handling add event", client_id)
+            player.unsub_event_cb()
+            player.unsub_group_event_cb()
 
     async def _handle_client_removed(self, client_id: str) -> None:
-        """Handle client removed event asynchronously."""
+        """Handle a client disconnection asynchronously."""
         self.logger.debug("Client %s disconnected", client_id)
         unregister_event = asyncio.Event()
         self._pending_unregisters[client_id] = unregister_event
@@ -115,13 +125,16 @@ class SendspinProvider(PlayerProvider):
         """
         # Disconnect all clients before stopping the server
         clients = list(self.server_api.clients)
+        connected_clients = []
         disconnect_tasks = []
         for client in clients:
-            self.logger.debug("Disconnecting client %s", client.client_id)
-            disconnect_tasks.append(client.disconnect(retry_connection=False))
+            if client.connection is None:
+                continue
+            connected_clients.append(client)
+            disconnect_tasks.append(client.connection.disconnect(retry_connection=False))
         if disconnect_tasks:
             results = await asyncio.gather(*disconnect_tasks, return_exceptions=True)
-            for client, result in zip(clients, results, strict=True):
+            for client, result in zip(connected_clients, results, strict=True):
                 if isinstance(result, Exception):
                     self.logger.warning(
                         "Error disconnecting client %s: %s", client.client_id, result
diff --git a/music_assistant/providers/sendspin/timed_client_stream.py b/music_assistant/providers/sendspin/timed_client_stream.py
deleted file mode 100644 (file)
index bb0b962..0000000
+++ /dev/null
@@ -1,331 +0,0 @@
-"""
-Timestamped multi-client audio stream for position-aware playback.
-
-This module provides a multi-client streaming implementation optimized for
-aiosendspin's synchronized multi-room audio playback. Each audio chunk is
-timestamped, allowing late-joining players to start at the correct position
-for synchronized playback across multiple devices.
-"""
-
-import asyncio
-import logging
-from collections import deque
-from collections.abc import AsyncGenerator
-from contextlib import suppress
-from uuid import UUID, uuid4
-
-from music_assistant_models.media_items import AudioFormat
-
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
-
-LOGGER = logging.getLogger(__name__)
-
-# Minimum/target buffer retention time in seconds
-# This 10s buffer is currently required since:
-# - aiosendspin currently uses a fixed 5s buffer to allow up to ~4s of network interruption
-# - ~2s allows for ffmpeg processing time and some margin
-# - ~3s are currently needed internally by aiosendspin for initial buffering
-MIN_BUFFER_DURATION = 10.0
-# Maximum buffer duration before raising an error (safety mechanism)
-MAX_BUFFER_DURATION = MIN_BUFFER_DURATION + 5.0
-
-
-class TimedClientStream:
-    """Multi-client audio stream with timestamped chunks for synchronized playback."""
-
-    audio_source: AsyncGenerator[bytes, None]
-    """The source audio stream to read from."""
-    audio_format: AudioFormat
-    """The audio format of the source stream."""
-    chunk_buffer: deque[tuple[bytes, float]]
-    """Buffer storing chunks with their timestamps in seconds (chunk_data, timestamp_seconds)."""
-    subscriber_positions: dict[UUID, int]
-    """Subscriber positions: maps subscriber_id to position (index into chunk_buffer)."""
-    buffer_lock: asyncio.Lock
-    """Lock for buffer and shared state access."""
-    source_read_lock: asyncio.Lock
-    """Lock to serialize audio source reads."""
-    stream_ended: bool = False
-    """Track if stream has ended."""
-    current_position: float = 0.0
-    """Current position in seconds (from stream start)."""
-
-    def __init__(
-        self,
-        audio_source: AsyncGenerator[bytes, None],
-        audio_format: AudioFormat,
-    ) -> None:
-        """Initialize TimedClientStream."""
-        self.audio_source = audio_source
-        self.audio_format = audio_format
-        self.chunk_buffer = deque()
-        self.subscriber_positions = {}
-        self.buffer_lock = asyncio.Lock()
-        self.source_read_lock = asyncio.Lock()
-
-    def _get_bytes_per_second(self) -> int:
-        """Get bytes per second for the audio format."""
-        return (
-            self.audio_format.sample_rate
-            * self.audio_format.channels
-            * (self.audio_format.bit_depth // 8)
-        )
-
-    def _bytes_to_seconds(self, num_bytes: int) -> float:
-        """Convert bytes to seconds based on audio format."""
-        bytes_per_second = self._get_bytes_per_second()
-        if bytes_per_second == 0:
-            return 0.0
-        return num_bytes / bytes_per_second
-
-    def _get_buffer_duration(self) -> float:
-        """Calculate total duration of buffered chunks in seconds."""
-        if not self.chunk_buffer:
-            return 0.0
-        # Duration is from first chunk timestamp to current position
-        first_chunk_timestamp = self.chunk_buffer[0][1]
-        return self.current_position - first_chunk_timestamp
-
-    def _cleanup_old_chunks(self) -> None:
-        """Remove old chunks when all subscribers read them and min duration exceeded."""
-        # Find the oldest position still needed by any subscriber
-        if self.subscriber_positions:
-            min_position = min(self.subscriber_positions.values())
-        else:
-            min_position = len(self.chunk_buffer)
-
-        # Calculate target oldest timestamp
-        # This ensures buffer contains at least MIN_BUFFER_DURATION seconds of recent data
-        target_oldest = self.current_position - MIN_BUFFER_DURATION
-
-        # Remove old chunks that meet both conditions:
-        # 1. Before min_position (no subscriber needs them)
-        # 2. Older than target_oldest (outside minimum retention window)
-        chunks_removed = 0
-        while chunks_removed < min_position and self.chunk_buffer:
-            _chunk_bytes, chunk_timestamp = self.chunk_buffer[0]
-            if chunk_timestamp < target_oldest:
-                self.chunk_buffer.popleft()
-                chunks_removed += 1
-            else:
-                # Stop when we reach chunks we want to keep
-                break
-
-        # Adjust all subscriber positions to account for removed chunks
-        for sub_id in self.subscriber_positions:
-            self.subscriber_positions[sub_id] -= chunks_removed
-
-    async def _read_chunk_from_source(self) -> None:
-        """Read next chunk from audio source and add to buffer."""
-        try:
-            chunk = await anext(self.audio_source)
-            async with self.buffer_lock:
-                # Calculate timestamp for this chunk
-                chunk_timestamp = self.current_position
-                chunk_duration = self._bytes_to_seconds(len(chunk))
-
-                # Append chunk with its timestamp
-                self.chunk_buffer.append((chunk, chunk_timestamp))
-
-                # Update current position
-                self.current_position += chunk_duration
-
-                # Safety check: ensure buffer doesn't grow unbounded
-                if self._get_buffer_duration() > MAX_BUFFER_DURATION:
-                    msg = f"Buffer exceeded maximum duration ({MAX_BUFFER_DURATION}s)"
-                    raise RuntimeError(msg)
-        except StopAsyncIteration:
-            # Source exhausted, add EOF marker
-            async with self.buffer_lock:
-                self.chunk_buffer.append((b"", self.current_position))
-                self.stream_ended = True
-        except Exception:
-            # Source errored or was canceled, mark stream as ended
-            async with self.buffer_lock:
-                self.stream_ended = True
-            raise
-
-    async def _check_buffer(self, subscriber_id: UUID) -> bool | None:
-        """
-        Check if buffer has grown or stream ended.
-
-        REQUIRES: Caller must hold self.source_read_lock before calling.
-
-        Returns:
-            True if should continue reading loop (chunk found in buffer),
-            False if should break (stream ended),
-            None if should proceed to read from source.
-        """
-        async with self.buffer_lock:
-            position = self.subscriber_positions[subscriber_id]
-            if position < len(self.chunk_buffer):
-                # Another subscriber already read the chunk
-                return True
-            if self.stream_ended:
-                # Stream ended while waiting for source lock
-                return False
-        return None  # Continue to read from source
-
-    async def _get_chunk_from_buffer(self, subscriber_id: UUID) -> bytes | None:
-        """
-        Get next chunk from buffer for subscriber.
-
-        Returns:
-            Chunk bytes if available, None if no chunk available, or empty bytes for EOF.
-        """
-        async with self.buffer_lock:
-            position = self.subscriber_positions[subscriber_id]
-
-            # Check if we have a chunk at this position
-            if position < len(self.chunk_buffer):
-                # Chunk available in buffer
-                chunk_data, _ = self.chunk_buffer[position]
-
-                # Move to next position
-                self.subscriber_positions[subscriber_id] = position + 1
-
-                # Cleanup old chunks that no one needs
-                self._cleanup_old_chunks()
-                return chunk_data
-            if self.stream_ended:
-                # Stream ended and we've read all buffered chunks
-                return b""
-        return None
-
-    async def _cleanup_subscriber(self, subscriber_id: UUID) -> None:
-        """Clean up subscriber and close stream if no subscribers left."""
-        async with self.buffer_lock:
-            if subscriber_id in self.subscriber_positions:
-                del self.subscriber_positions[subscriber_id]
-
-            # If no subscribers left, close the stream
-            if not self.subscriber_positions and not self.stream_ended:
-                self.stream_ended = True
-                # Close the audio source generator to prevent resource leak
-                with suppress(Exception):
-                    await self.audio_source.aclose()
-
-    async def get_stream(
-        self,
-        output_format: AudioFormat,
-        filter_params: list[str] | None = None,
-    ) -> tuple[AsyncGenerator[bytes, None], float]:
-        """
-        Get (client specific encoded) ffmpeg stream.
-
-        Returns:
-            A tuple of (audio generator, actual position in seconds)
-        """
-        audio_gen, position = await self.subscribe_raw()
-
-        # Calculate frame size for alignment
-        # Frame size = channels * bytes_per_sample
-        bytes_per_frame = output_format.channels * (output_format.bit_depth // 8)
-
-        async def _stream_with_ffmpeg() -> AsyncGenerator[bytes, None]:
-            buffer = b""
-            try:
-                async for chunk in get_ffmpeg_stream(
-                    audio_input=audio_gen,
-                    input_format=self.audio_format,
-                    output_format=output_format,
-                    filter_params=filter_params,
-                ):
-                    buffer += chunk
-                    # Yield only complete frames
-                    aligned_size = (len(buffer) // bytes_per_frame) * bytes_per_frame
-                    if aligned_size > 0:
-                        yield buffer[:aligned_size]
-                        buffer = buffer[aligned_size:]
-                # Yield any remaining complete frames at end of stream
-                if buffer:
-                    aligned_size = (len(buffer) // bytes_per_frame) * bytes_per_frame
-                    if aligned_size > 0:
-                        yield buffer[:aligned_size]
-            finally:
-                # Ensure audio_gen cleanup runs immediately
-                with suppress(Exception):
-                    await audio_gen.aclose()
-
-        return _stream_with_ffmpeg(), position
-
-    async def _generate(self, subscriber_id: UUID) -> AsyncGenerator[bytes, None]:
-        """
-        Generate audio chunks for a subscriber.
-
-        Yields chunks from the buffer until the stream ends, reading from the source
-        as needed. Automatically cleans up the subscriber on exit.
-        """
-        try:
-            # Position already set above atomically with timestamp capture
-            while True:
-                # Try to get chunk from buffer
-                chunk_bytes = await self._get_chunk_from_buffer(subscriber_id)
-
-                # Release lock before yielding to avoid deadlock
-                if chunk_bytes is not None:
-                    if chunk_bytes == b"":
-                        # End of stream marker
-                        break
-                    yield chunk_bytes
-                else:
-                    # No chunk available, need to read from source
-                    # Use source_read_lock to ensure only one subscriber reads at a time
-                    async with self.source_read_lock:
-                        # Check again if buffer has grown or stream ended while waiting
-                        check_result = await self._check_buffer(subscriber_id)
-                        if check_result is True:
-                            # Another subscriber already read the chunk
-                            continue
-                        if check_result is False:
-                            # Stream ended while waiting for source lock
-                            break
-
-                        # Read next chunk from source (check_result is None)
-                        # Note: This may block if the audio_source does synchronous I/O
-                        await self._read_chunk_from_source()
-
-        finally:
-            await self._cleanup_subscriber(subscriber_id)
-
-    async def subscribe_raw(self) -> tuple[AsyncGenerator[bytes, None], float]:
-        """
-        Subscribe to the raw/unaltered audio stream.
-
-        Returns:
-            A tuple of (audio generator, actual position in seconds).
-            The position indicates where in the stream the first chunk will be from.
-
-        Note:
-            Callers must properly consume or cancel the returned generator to prevent
-            resource leaks.
-        """
-        subscriber_id = uuid4()
-
-        # Atomically capture starting position and register subscriber while holding lock
-        async with self.buffer_lock:
-            if self.chunk_buffer:
-                _, starting_position = self.chunk_buffer[0]
-                # Log buffer time range for debugging
-                newest_ts = self.chunk_buffer[-1][1]
-                oldest_relative = starting_position - self.current_position
-                newest_relative = newest_ts - self.current_position
-                LOGGER.debug(
-                    "New subscriber joining: buffer contains %.3fs (from %.3fs to %.3fs, "
-                    "current_position=%.3fs)",
-                    newest_ts - starting_position,
-                    oldest_relative,
-                    newest_relative,
-                    self.current_position,
-                )
-            else:
-                starting_position = self.current_position
-                LOGGER.debug(
-                    "New subscriber joining: buffer is empty, starting at current_position=%.3fs",
-                    self.current_position,
-                )
-            # Register subscriber at position 0 (start of buffer)
-            self.subscriber_positions[subscriber_id] = 0
-
-        # Return generator and starting position in seconds
-        return self._generate(subscriber_id), starting_position
index 429149534b2346edbc79e63c7502b7c94df4cc38..7544fe65f512391d7c8cd4cc212d6f4ee182bee1 100644 (file)
@@ -11,7 +11,7 @@ aiojellyfin==0.14.1
 aiomusiccast==0.15.0
 aiortc>=1.6.0
 aiorun==2025.1.1
-aiosendspin==3.0.0
+aiosendspin==4.0.1
 aioslimproto==3.1.5
 aiosonos==0.1.9
 aiosqlite==0.22.1