|------|-------------|
| `provider.py` | Main provider class, handles WebRTC signaling and server lifecycle |
| `player.py` | Player implementation with playback, grouping, and metadata handling |
-| `timed_client_stream.py` | Multi-client audio stream distribution with timing |
+| `playback.py` | Playback pipeline with DSP channel processing and timed frame commits |
| `__init__.py` | Provider setup and configuration |
| `manifest.json` | Provider metadata |
from typing import TYPE_CHECKING
-from .provider import SendspinProvider
+from music_assistant.providers.sendspin.provider import SendspinProvider
if TYPE_CHECKING:
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
"documentation": "https://music-assistant.io/player-support/sendspin/",
"codeowners": ["@music-assistant"],
"credits": ["[Sendspin](https://sendspin-audio.com)"],
- "requirements": ["aiosendspin==3.0.0", "av==16.1.0"],
+ "requirements": ["aiosendspin==4.0.1", "av==16.1.0"],
"builtin": true,
"allow_disable": false
}
--- /dev/null
+"""Playback session coordinator for Sendspin players."""
+
+from __future__ import annotations
+
+import asyncio
+import time
+from collections import deque
+from collections.abc import Iterator
+from contextlib import suppress
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any
+from uuid import UUID, uuid4
+
+from aiosendspin.server.audio import AudioFormat as SendspinAudioFormat
+from aiosendspin.server.push_stream import MAIN_CHANNEL, PushStream
+from music_assistant_models.enums import ContentType
+from music_assistant_models.media_items.audio_format import AudioFormat
+
+from music_assistant.constants import CONF_OUTPUT_CHANNELS
+from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.helpers.ffmpeg import FFMpeg
+from music_assistant.models.player import PlayerMedia
+
+if TYPE_CHECKING:
+ from .player import SendspinPlayer
+
+
+# Same sample format expressed in both MA and Sendspin type systems.
+_PCM_FORMAT = AudioFormat(
+    content_type=ContentType.PCM_S32LE,
+    sample_rate=48000,
+    bit_depth=32,
+    channels=2,
+)
+# Must describe the same sample layout as _PCM_FORMAT above; keep in sync.
+_SENDSPIN_PCM_FORMAT = SendspinAudioFormat(
+    sample_rate=48000,
+    bit_depth=32,
+    channels=2,
+)
+# Max PCM slice fed to the producer per iteration (100ms of audio).
+_PRODUCER_SLICE_US = 100_000
+# Max pending chunks between producer and committer before the producer blocks.
+_PRODUCER_BACKLOG_SIZE = 64
+# Backpressure threshold: push stream sleeps when buffered audio exceeds this.
+_PRODUCER_BUFFER_LIMIT_US = 30_000_000
+# Start join promotion once catchup processor lag is within this window of the history tail.
+_JOIN_PROMOTE_ARM_WINDOW_US = 2_000_000
+# Accept catchup output within this margin of the promotion target.
+_JOIN_PROMOTE_TOLERANCE_US = 50_000
+# Abort join catchup if promotion hasn't completed within this.
+_JOIN_PROMOTION_TIMEOUT_S = 15.0
+# Retain committed history this far behind real-time for late-join backfill.
+# This pre-history also warms up ffmpeg's internal filter buffers so the DSP
+# output has settled by the time the member's channel goes live.
+_HISTORY_KEEP_PAST_US = 1_000_000
+
+
+class _BufferedFfmpegProcessor:
+    """FFmpeg wrapper with small output carry-over buffer and duration-based reads."""
+
+    def __init__(self, ffmpeg: FFMpeg, audio_format: AudioFormat) -> None:
+        """Wrap `ffmpeg`, precomputing byte/duration conversions for `audio_format`."""
+        self._ffmpeg = ffmpeg
+        self._output_buffer = bytearray()
+        bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
+        self._sample_rate = int(audio_format.sample_rate)
+        # One interleaved frame = one sample for every channel.
+        self._frame_size = bytes_per_sample * int(audio_format.channels)
+        self._bytes_per_second = self._sample_rate * self._frame_size
+        # ~25ms worth of audio per read syscall.
+        self._read_quantum_bytes = max(1, int(self._bytes_per_second * 0.025))
+        self._produced_output_us = 0
+
+    async def start(self) -> None:
+        """Start the wrapped ffmpeg process."""
+        await self._ffmpeg.start()
+
+    async def close(self) -> None:
+        """Close the wrapped ffmpeg process."""
+        await self._ffmpeg.close()
+
+    async def push(self, pcm: bytes) -> None:
+        """Write raw input PCM to ffmpeg stdin."""
+        await self._ffmpeg.write(pcm)
+
+    @property
+    def produced_output_us(self) -> int:
+        """Return cumulative output duration currently drained from ffmpeg.
+
+        Only the drain_* paths advance this counter; read_duration_us does not.
+        """
+        return self._produced_output_us
+
+    async def read_duration_us(self, duration_us: int) -> bytes:
+        """Block-read exactly `duration_us` worth of processed PCM from ffmpeg."""
+        target_bytes = self._target_bytes_for_duration_us(duration_us)
+        if target_bytes == 0:
+            return b""
+
+        while len(self._output_buffer) < target_bytes:
+            missing = target_bytes - len(self._output_buffer)
+            # Read at least one quantum per syscall; excess carries over to the
+            # next call via _output_buffer.
+            read_size = max(self._read_quantum_bytes, missing)
+            chunk = await self._ffmpeg.readexactly(read_size)
+            self._output_buffer.extend(chunk)
+
+        out = bytes(self._output_buffer[:target_bytes])
+        del self._output_buffer[:target_bytes]
+        return out
+
+    async def drain_available(self) -> int:
+        """Non-blocking drain of ffmpeg stdout into internal buffer.
+
+        Returns cumulative produced output duration in microseconds.
+        """
+        while True:
+            try:
+                # 1ms timeout: non-blocking check for available data.
+                # NOTE(review): wait_for cancels the read on timeout; assumes
+                # FFMpeg.read is cancellation-safe (no partial-data loss) -- confirm.
+                chunk = await asyncio.wait_for(
+                    self._ffmpeg.read(self._read_quantum_bytes),
+                    timeout=0.001,
+                )
+            except TimeoutError:
+                break
+            if not chunk:
+                break
+            self._output_buffer.extend(chunk)
+            self._produced_output_us += self._duration_us_for_bytes(len(chunk))
+            # A short read means the pipe is (momentarily) empty; stop draining.
+            if len(chunk) < self._read_quantum_bytes:
+                break
+        return self._produced_output_us
+
+    async def drain_forever(self) -> None:
+        """Continuously drain ffmpeg stdout into internal buffer until EOF."""
+        while True:
+            chunk = await self._ffmpeg.read(self._read_quantum_bytes)
+            if not chunk:
+                break
+            self._output_buffer.extend(chunk)
+            self._produced_output_us += self._duration_us_for_bytes(len(chunk))
+
+    def pop_duration_us(self, duration_us: int) -> bytes | None:
+        """Pop exactly `duration_us` from already buffered output, or None if insufficient."""
+        target_bytes = self._target_bytes_for_duration_us(duration_us)
+        if target_bytes == 0:
+            return b""
+        if len(self._output_buffer) < target_bytes:
+            return None
+        out = bytes(self._output_buffer[:target_bytes])
+        del self._output_buffer[:target_bytes]
+        return out
+
+    def buffered_duration_us(self) -> int:
+        """Return buffered output duration currently available for immediate pop."""
+        return self._duration_us_for_bytes(len(self._output_buffer))
+
+    def pop_duration_us_or_pad(self, duration_us: int, pad_tolerance_us: int) -> bytes | None:
+        """Pop target duration; if short within tolerance, pad tail with silence."""
+        target_bytes = self._target_bytes_for_duration_us(duration_us)
+        if target_bytes == 0:
+            return b""
+        available = len(self._output_buffer)
+        if available >= target_bytes:
+            out = bytes(self._output_buffer[:target_bytes])
+            del self._output_buffer[:target_bytes]
+            return out
+        short_bytes = target_bytes - available
+        short_us = self._duration_us_for_bytes(short_bytes)
+        if short_us > max(0, pad_tolerance_us):
+            return None
+        # Shortfall within tolerance: emit everything buffered plus trailing silence.
+        out = bytes(self._output_buffer)
+        self._output_buffer.clear()
+        return out + (b"\x00" * short_bytes)
+
+    def _duration_us_for_bytes(self, byte_count: int) -> int:
+        """Convert a byte count to whole-frame duration in µs (floors partial frames)."""
+        if byte_count <= 0 or self._sample_rate <= 0 or self._frame_size <= 0:
+            return 0
+        frames = byte_count // self._frame_size
+        if frames <= 0:
+            return 0
+        return int((frames * 1_000_000) / self._sample_rate)
+
+    def _target_bytes_for_duration_us(self, duration_us: int) -> int:
+        """Convert duration to frame-aligned PCM byte count (rounds to nearest sample)."""
+        if duration_us <= 0 or self._sample_rate <= 0 or self._frame_size <= 0:
+            return 0
+        samples = max(0, int((duration_us * self._sample_rate + 500_000) / 1_000_000))
+        return samples * self._frame_size
+
+
+@dataclass(slots=True)
+class _HistoryChunk:
+    """A committed PCM chunk retained for late-join backfill."""
+
+    # Stream-timeline position of the chunk's first sample, in microseconds.
+    start_time_us: int
+    # Chunk length in microseconds.
+    duration_us: int
+    # Raw PCM payload (in _PCM_FORMAT).
+    pcm: bytes
+
+
+@dataclass(slots=True)
+class _PendingChunk:
+    """A produced-but-not-yet-committed PCM slice queued for the committer."""
+
+    # Raw PCM payload (in _PCM_FORMAT).
+    pcm: bytes
+    # Slice length in microseconds.
+    duration_us: int
+
+
+@dataclass(slots=True)
+class _JoinCatchupState:
+    """Per-member state for a join-catchup processor replaying history through DSP.
+
+    The processor is fed historical + live PCM via ``input_queue``. Once its
+    output catches up to the live stream (within tolerance), it is promoted to
+    the member's live pipeline. See ``_inject_ready_join_historical`` for the
+    full promotion lifecycle.
+    """
+
+    processor: _BufferedFfmpegProcessor
+    # PCM feed into the processor; a None item is the writer-shutdown sentinel.
+    input_queue: asyncio.Queue[bytes | None]
+    # Consumes input_queue and pushes PCM into the processor's stdin.
+    writer_task: asyncio.Task[None]
+    # Continuously drains processor stdout (drain_forever).
+    drainer_task: asyncio.Task[None]
+    # Feeds the initial history snapshot (see _feed_join_history).
+    snapshot_task: asyncio.Task[None] | None = None
+    # Timeline position of the first history chunk fed into the processor.
+    first_history_start_us: int | None = None
+    # Timeline position up to which PCM has been enqueued into the processor.
+    fed_until_us: int | None = None
+    # End of the history snapshot taken when catchup started.
+    history_end_us: int | None = None
+    # Locked target: once set, promotion fires when output reaches this point.
+    promotion_target_end_us: int | None = None
+    # Monotonic time when promotion was armed, used for timeout detection.
+    promotion_armed_monotonic_s: float | None = None
+    # Serializes feeds into input_queue (snapshot feed vs. live commit fanout).
+    write_lock: asyncio.Lock = field(default_factory=asyncio.Lock)
+
+
+@dataclass(slots=True)
+class _PipelineConfig:
+    """Resolved per-member DSP pipeline configuration."""
+
+    # True when the member needs its own ffmpeg transform channel.
+    requires_transform: bool
+    output_channels: str
+    filter_params: tuple[str, ...]
+
+    @property
+    def signature(self) -> tuple[bool, str, tuple[str, ...]]:
+        """Return a hashable tuple capturing the full config (comparison key)."""
+        return (self.requires_transform, self.output_channels, self.filter_params)
+
+
+@dataclass(slots=True)
+class _MemberPipeline:
+    """Live playback pipeline state for one group member."""
+
+    player_id: str
+    # Push-stream channel this member receives audio on.
+    channel_id: UUID
+    config: _PipelineConfig
+    # Dedicated DSP processor; None until one is attached.
+    processor: _BufferedFfmpegProcessor | None = None
+    ready: bool = False
+
+
+class SendspinPlaybackSession:
+ """Coordinates playback for a Sendspin player group leader.
+
+ The push stream supports multi-channel audio: members that need per-player
+ DSP (EQ, channel mixing, output routing) each get a dedicated ffmpeg
+ processor and a separate channel. Members without DSP share MAIN_CHANNEL
+ and receive the raw PCM directly.
+
+ Playback runs as two concurrent coroutines inside ``_run_playback``:
+
+ * **Producer** -- reads PCM from the MA stream, slices it into fixed-size
+ chunks, queues them, and writes each slice into per-member ffmpeg
+ processors (transform push) in parallel.
+ * **Consumer** -- dequeues chunks, reads the corresponding transformed
+ output from each processor (transform read), prepares all channels on
+ the push stream, commits audio, and applies backpressure via
+ ``sleep_to_limit_buffer``.
+
+ When a new member joins mid-playback, a *join-catchup* processor replays
+ committed history through the member's DSP chain so it can be promoted
+ to the live pipeline without an audible gap.
+ """
+
+    def __init__(self, player: SendspinPlayer) -> None:
+        """Initialize session coordinator bound to the owning player."""
+        self.player = player
+        # Background task running _run_playback for the active media, if any.
+        self.playback_task: asyncio.Task[None] | None = None
+        # Members whose add_member() is still in flight (join catchup running).
+        self.pending_join_members: set[str] = set()
+        # Guards all mutable session state below.
+        self._state_lock = asyncio.Lock()
+        self._members: set[str] = set()
+        self._member_pipelines: dict[str, _MemberPipeline] = {}
+        self._push_stream: PushStream | None = None
+        self._playback_running = False
+        # Stream-timeline timestamp of the first committed chunk.
+        self._timeline_start_us: int | None = None
+        # Monotonic wall-clock timestamp (µs) of the first commit.
+        self._first_commit_monotonic_us: int | None = None
+        self._produced_audio_us = 0
+        # Committed PCM retained for late-join backfill.
+        self._history: deque[_HistoryChunk] = deque()
+        self._join_catchup: dict[str, _JoinCatchupState] = {}
+        self._pipeline_config_cache: dict[str, _PipelineConfig] = {}
+        # Channel UUIDs reserved per member ahead of pipeline creation.
+        self._preassigned_channels: dict[str, UUID] = {}
+        self._mapping_dirty = True
+
+ # -- Helpers ---------------------------------------------------------------
+
+    def _attach_task_exception_logger(self, task: asyncio.Task[Any], name: str) -> None:
+        """Log unhandled exception from background task when it finishes.
+
+        Cancellation is not logged; only genuine failures are reported under
+        the given `name`.
+        """
+
+        def _done_callback(done_task: asyncio.Task[Any]) -> None:
+            if done_task.cancelled():
+                return
+            # suppress() guards exception() itself (e.g. invalid-state errors);
+            # any retrieved exception is logged below.
+            with suppress(Exception):
+                exc = done_task.exception()
+                if exc is not None:
+                    self.player.logger.exception(
+                        "Background task failed: %s",
+                        name,
+                        exc_info=exc,
+                    )
+
+        task.add_done_callback(_done_callback)
+
+    def _get_join_readiness(self) -> tuple[bool, str | None]:
+        """Check whether live join DSP preparation can be performed right now.
+
+        Returns (ready, reason); `reason` is set only when not ready. The
+        flags are read without the state lock, so the result is advisory.
+        """
+        if self._playback_running and self._push_stream is not None:
+            return (True, None)
+        return (False, "no active stream context")
+
+ # -- Snapshot helper -------------------------------------------------------
+
+    async def _snapshot_active_pipelines(
+        self,
+    ) -> tuple[set[str], tuple[tuple[str, _MemberPipeline], ...]]:
+        """Return (join_pending_ids, active_pipelines) under lock."""
+        async with self._state_lock:
+            # Alias of the live member set; only read inside this lock.
+            members = self._members
+            return set(self._join_catchup), tuple(
+                (mid, p) for mid, p in self._member_pipelines.items() if mid in members
+            )
+
+ # -- Public API ------------------------------------------------------------
+
+    async def cancel(self, reason: str) -> None:
+        """Cancel and await the active playback task, if any.
+
+        `reason` is used for logging only. Errors raised by the cancelled
+        task are intentionally swallowed here.
+        """
+        task = self.playback_task
+        if task is None:
+            return
+        if task.done():
+            # Already finished: just clear the slot if it still points at us.
+            if self.playback_task is task:
+                self.playback_task = None
+            return
+        self.player.logger.debug("Cancelling playback task (%s)", reason)
+        task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await task
+        if self.playback_task is task:
+            self.playback_task = None
+
+    async def start(self, media: PlayerMedia, restart: bool = False) -> None:
+        """Start background playback for `media`.
+
+        Raises RuntimeError if playback is already active and `restart` is
+        False; otherwise the existing task is cancelled first.
+        """
+        active_task = self.playback_task
+        if active_task is not None and not active_task.done():
+            if not restart:
+                raise RuntimeError("playback already active")
+            await self.cancel("restart requested")
+        self.playback_task = asyncio.create_task(self._run_playback(media))
+
+    async def close(self) -> None:
+        """Stop playback and release all managed resources.
+
+        Ordering matters: the playback task is cancelled before member state
+        is torn down so no pipeline is pulled out from under a running commit.
+        """
+        await self.cancel("session close")
+        self.pending_join_members.clear()
+        async with self._state_lock:
+            self._members.clear()
+            self._mapping_dirty = True
+        await self._clear_member_pipelines()
+        await self._clear_join_catchup()
+        async with self._state_lock:
+            self._history.clear()
+            self._produced_audio_us = 0
+            self._timeline_start_us = None
+            self._first_commit_monotonic_us = None
+            self._pipeline_config_cache.clear()
+            self._preassigned_channels.clear()
+
+    async def add_member(self, player_id: str) -> None:
+        """Add a member to the group with DSP-aware lifecycle handling.
+
+        Idempotent: already-joined members are a no-op. Catchup is started
+        before the member becomes visible in `_members` so the live pipeline
+        never sees a half-initialized member.
+        """
+        async with self._state_lock:
+            if player_id in self._members:
+                self.pending_join_members.discard(player_id)
+                return
+            # Force a fresh channel identity for every new join cycle.
+            self._preassigned_channels[player_id] = uuid4()
+        self.pending_join_members.add(player_id)
+        try:
+            await self._start_join_catchup(player_id)
+            async with self._state_lock:
+                self._members.add(player_id)
+                self._mapping_dirty = True
+        except Exception:
+            # Roll back channel/pipeline state on failed join.
+            await self._release_player_channel(player_id)
+            raise
+        finally:
+            self.pending_join_members.discard(player_id)
+
+    async def remove_member(self, player_id: str) -> None:
+        """Remove a member from the group and clean up per-member playback state.
+
+        Safe to call for members that were never (fully) added.
+        """
+        self.pending_join_members.discard(player_id)
+        async with self._state_lock:
+            self._members.discard(player_id)
+            self._mapping_dirty = True
+            self._pipeline_config_cache.pop(player_id, None)
+            self._preassigned_channels.pop(player_id, None)
+        # Teardown runs outside the lock; both helpers re-acquire it as needed.
+        await self._stop_join_catchup(player_id)
+        await self._release_player_channel(player_id)
+
+    async def sync_members(self, member_ids: set[str]) -> None:
+        """Reconcile session members to exactly the provided set.
+
+        NOTE(review): the lock is released between the snapshot and the
+        add/remove calls, so a concurrent add/remove can interleave -- each
+        individual call is idempotent, which bounds the damage; confirm callers
+        serialize sync_members invocations.
+        """
+        async with self._state_lock:
+            current_members = set(self._members)
+        for player_id in current_members - member_ids:
+            await self.remove_member(player_id)
+        for player_id in member_ids - current_members:
+            await self.add_member(player_id)
+
+ # -- Join catchup ----------------------------------------------------------
+
+    async def _start_join_catchup(self, player_id: str) -> None:
+        """Start dedicated join catchup processor fed from committed history.
+
+        No-op when playback is idle or the member needs no DSP transform
+        (such members share MAIN_CHANNEL and need no catchup).
+        """
+        async with self._state_lock:
+            playback_active = self._playback_running and self._push_stream is not None
+        if not playback_active:
+            return
+
+        pipeline = await self._sync_member_pipeline(player_id)
+        if not pipeline.config.requires_transform:
+            return
+
+        # Replace any stale catchup from a previous join attempt.
+        await self._stop_join_catchup(player_id)
+
+        ffmpeg_obj = self._create_member_ffmpeg(pipeline.config.filter_params)
+        processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT)
+        await processor.start()
+        # Bounded queue sized to hold the full buffer duration with some headroom.
+        queue_size = (_PRODUCER_BUFFER_LIMIT_US // _PRODUCER_SLICE_US) + _PRODUCER_BACKLOG_SIZE
+        input_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=queue_size)
+
+        async with self._state_lock:
+            history_snapshot = list(self._history)
+            if not history_snapshot:
+                # Nothing to replay yet; abandon catchup entirely.
+                await processor.close()
+                return
+            history_end_us = history_snapshot[-1].start_time_us + history_snapshot[-1].duration_us
+
+        async def _writer() -> None:
+            # Pump queued PCM into ffmpeg until the None sentinel arrives.
+            while True:
+                chunk = await input_queue.get()
+                if chunk is None:
+                    return
+                await processor.push(chunk)
+
+        async def _drainer() -> None:
+            await processor.drain_forever()
+
+        writer_task = asyncio.create_task(_writer())
+        drainer_task = asyncio.create_task(_drainer())
+        self._attach_task_exception_logger(writer_task, f"join_writer:{player_id}")
+        self._attach_task_exception_logger(drainer_task, f"join_drainer:{player_id}")
+
+        state = _JoinCatchupState(
+            processor=processor,
+            input_queue=input_queue,
+            writer_task=writer_task,
+            drainer_task=drainer_task,
+            history_end_us=history_end_us,
+        )
+        async with self._state_lock:
+            self._join_catchup[player_id] = state
+
+        # Re-validate under lock: a concurrent stop may have replaced/removed
+        # the state between the two lock sections above.
+        async with self._state_lock:
+            current = self._join_catchup.get(player_id)
+            if current is not None and current.processor is processor:
+                current.snapshot_task = asyncio.create_task(
+                    self._feed_join_history(player_id, processor, history_snapshot)
+                )
+                self._attach_task_exception_logger(
+                    current.snapshot_task, f"join_snapshot:{player_id}"
+                )
+
+    async def _feed_join_history(
+        self,
+        player_id: str,
+        processor: _BufferedFfmpegProcessor,
+        history_snapshot: list[_HistoryChunk],
+    ) -> None:
+        """Feed historical PCM into a join-catchup processor.
+
+        Holds the state's write_lock for the entire snapshot feed so that the
+        live commit fanout (which takes the same lock) cannot interleave PCM
+        mid-snapshot.
+        """
+        async with self._state_lock:
+            state = self._join_catchup.get(player_id)
+            if state is None or state.processor is not processor:
+                return
+        async with state.write_lock:
+            first_history_start_us: int | None = None
+            previous_end_us: int | None = None
+            for hist_chunk in history_snapshot:
+                if first_history_start_us is None:
+                    first_history_start_us = hist_chunk.start_time_us
+                # Publish conservative progress each iteration: fed_until_us is
+                # pinned at the snapshot start until the final update below, so
+                # promotion cannot arm against a half-fed snapshot.
+                async with self._state_lock:
+                    current = self._join_catchup.get(player_id)
+                    if current is not None and current.processor is processor:
+                        current.first_history_start_us = first_history_start_us
+                        current.fed_until_us = first_history_start_us
+                # Fill timeline gaps between history chunks with silence.
+                if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us:
+                    gap_us = hist_chunk.start_time_us - previous_end_us
+                    silence = self._silence_for_duration_us(gap_us)
+                    if silence:
+                        await self._enqueue_join_pcm(state, silence)
+                await self._enqueue_join_pcm(state, hist_chunk.pcm)
+                previous_end_us = hist_chunk.start_time_us + hist_chunk.duration_us
+            async with self._state_lock:
+                current = self._join_catchup.get(player_id)
+                if current is not None and current.processor is processor:
+                    current.fed_until_us = previous_end_us
+
+    async def _stop_join_catchup(self, player_id: str) -> None:
+        """Stop and remove dedicated join catchup processor for one player.
+
+        No-op when no catchup is registered. Tasks are cancelled in
+        snapshot -> writer -> drainer order, then the processor is closed.
+        """
+        async with self._state_lock:
+            state = self._join_catchup.pop(player_id, None)
+        if state is None:
+            return
+        if state.snapshot_task is not None:
+            state.snapshot_task.cancel()
+            with suppress(asyncio.CancelledError, Exception):
+                await state.snapshot_task
+        state.writer_task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await state.writer_task
+        state.drainer_task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await state.drainer_task
+        with suppress(Exception):
+            await state.processor.close()
+
+    async def _promote_join_catchup_processor(
+        self,
+        player_id: str,
+        pipeline: _MemberPipeline,
+        target_end_us: int,
+    ) -> None:
+        """Promote join catchup processor to the member's live DSP processor.
+
+        The catchup processor replaces `pipeline.processor`; the previous live
+        processor (if different) is closed after the handoff.
+        NOTE(review): `target_end_us` is not referenced in this body -- confirm
+        whether it is still needed or can be dropped from the signature.
+        """
+        old_processor: _BufferedFfmpegProcessor | None = None
+        async with self._state_lock:
+            state = self._join_catchup.pop(player_id, None)
+            if state is None:
+                return
+            old_processor = pipeline.processor
+            pipeline.processor = state.processor
+        if state.snapshot_task is not None:
+            state.snapshot_task.cancel()
+            with suppress(asyncio.CancelledError, Exception):
+                await state.snapshot_task
+        # Let writer flush queued PCM before handoff; cancel if queue is full.
+        try:
+            state.input_queue.put_nowait(None)
+        except asyncio.QueueFull:
+            state.writer_task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await state.writer_task
+        state.drainer_task.cancel()
+        with suppress(asyncio.CancelledError, Exception):
+            await state.drainer_task
+        if old_processor is not None and old_processor is not state.processor:
+            with suppress(Exception):
+                await old_processor.close()
+
+    async def _clear_join_catchup(self) -> None:
+        """Stop and remove all dedicated join catchup processors."""
+        # Snapshot keys first; _stop_join_catchup mutates the dict under lock.
+        async with self._state_lock:
+            player_ids = list(self._join_catchup.keys())
+        for player_id in player_ids:
+            await self._stop_join_catchup(player_id)
+
+    async def _release_player_channel(self, player_id: str) -> None:
+        """Release per-member channel/DSP state for a removed member.
+
+        Safe to call repeatedly; a missing pipeline or processor is a no-op.
+        """
+        async with self._state_lock:
+            pipeline = self._member_pipelines.pop(player_id, None)
+            self._preassigned_channels.pop(player_id, None)
+        if pipeline is None or pipeline.processor is None:
+            return
+        await self._close_member_ffmpeg(pipeline.processor)
+
+ # -- Playback pipeline -----------------------------------------------------
+
+    async def _run_playback(self, media: PlayerMedia) -> None:  # noqa: PLR0915
+        """Run the playback pipeline for a single media session.
+
+        Pulls PCM from the MA stream, feeds main + per-member DSP channels into the
+        Sendspin push stream, and commits audio continuously. Supports dynamic group
+        membership changes and late-join historical backfill while running.
+        """
+        push_stream = self._create_push_stream()
+        async with self._state_lock:
+            self._push_stream = push_stream
+            self._playback_running = True
+            self._history.clear()
+            self._produced_audio_us = 0
+            self._timeline_start_us = None
+            self._first_commit_monotonic_us = None
+            self._mapping_dirty = True
+        # Bounded queue between producer (stream reader) and consumer (committer).
+        pending_chunks: asyncio.Queue[_PendingChunk | None] = asyncio.Queue(
+            maxsize=_PRODUCER_BACKLOG_SIZE
+        )
+        # Shadow deque mirroring pending_chunks for join-catchup backlog peeking.
+        pending_backlog: deque[_PendingChunk] = deque()
+        pending_duration_us = 0
+
+        async def _produce_pending_chunks() -> None:
+            nonlocal pending_duration_us
+            audio_source = self.player.mass.streams.get_stream(media, _PCM_FORMAT)
+            async for chunk in audio_source:
+                if not chunk:
+                    continue
+                for slice_chunk in self._iter_pcm_slices(chunk, _PCM_FORMAT, _PRODUCER_SLICE_US):
+                    if not slice_chunk:
+                        continue
+                    duration_us = self._duration_us(slice_chunk, _PCM_FORMAT)
+                    if duration_us <= 0:
+                        continue
+                    await self._refresh_member_mappings()
+                    pending = _PendingChunk(pcm=slice_chunk, duration_us=duration_us)
+                    # Keep the shadow append immediately after put(): a getter
+                    # blocked on the queue is woken on a later event-loop turn,
+                    # so the backlog is populated before the consumer can pop.
+                    await pending_chunks.put(pending)
+                    pending_backlog.append(pending)
+                    pending_duration_us += duration_us
+                    # Push the raw slice into every live DSP processor in parallel;
+                    # members still in join catchup are fed via the fanout path.
+                    join_pending_ids, pipelines = await self._snapshot_active_pipelines()
+                    transform_pipelines: list[_MemberPipeline] = []
+                    for member_id, pipeline in pipelines:
+                        if not pipeline.config.requires_transform:
+                            continue
+                        if member_id in join_pending_ids:
+                            continue
+                        transform_pipelines.append(pipeline)
+                    results = await asyncio.gather(
+                        *(
+                            self._transform_member_chunk(pipeline, slice_chunk)
+                            for pipeline in transform_pipelines
+                        ),
+                        return_exceptions=True,
+                    )
+                    for pipeline, result in zip(transform_pipelines, results, strict=True):
+                        if isinstance(result, BaseException):
+                            self.player.logger.warning(
+                                "Transform push failed for channel %s: %s",
+                                pipeline.channel_id,
+                                result,
+                            )
+
+        async def _commit_pending_chunks() -> None:
+            nonlocal pending_duration_us
+            while True:
+                pending = await pending_chunks.get()
+                # None sentinel: producer finished cleanly.
+                if pending is None:
+                    break
+                pending_backlog.popleft()
+                pending_duration_us = max(0, pending_duration_us - pending.duration_us)
+                await self._inject_ready_join_historical(push_stream, pending_backlog, pending.pcm)
+                push_stream.prepare_audio(
+                    pending.pcm, _SENDSPIN_PCM_FORMAT, channel_id=MAIN_CHANNEL
+                )
+                join_pending_ids, pipelines = await self._snapshot_active_pipelines()
+                transform_pipelines: list[_MemberPipeline] = []
+                for member_id, pipeline in pipelines:
+                    if not pipeline.config.requires_transform:
+                        continue
+                    if member_id in join_pending_ids:
+                        continue
+                    transform_pipelines.append(pipeline)
+                # Read back the transformed PCM the producer pushed earlier.
+                transformed_chunks = await asyncio.gather(
+                    *(
+                        self._read_member_chunk(pipeline, pending.duration_us)
+                        for pipeline in transform_pipelines
+                    ),
+                    return_exceptions=True,
+                )
+                for pipeline, transformed_chunk in zip(
+                    transform_pipelines, transformed_chunks, strict=True
+                ):
+                    if isinstance(transformed_chunk, BaseException):
+                        self.player.logger.warning(
+                            "Transform read failed for channel %s: %s",
+                            pipeline.channel_id,
+                            transformed_chunk,
+                        )
+                        continue
+                    if transformed_chunk is None:
+                        continue
+                    push_stream.prepare_audio(
+                        transformed_chunk,
+                        _SENDSPIN_PCM_FORMAT,
+                        channel_id=pipeline.channel_id,
+                    )
+                # commit_audio returns the stream-timeline start of this commit.
+                commit_start_us = await push_stream.commit_audio()
+                await push_stream.sleep_to_limit_buffer(_PRODUCER_BUFFER_LIMIT_US)
+                commit_now_us = int(time.monotonic_ns() / 1000)
+                committed_history_chunk = _HistoryChunk(
+                    start_time_us=int(commit_start_us),
+                    duration_us=pending.duration_us,
+                    pcm=pending.pcm,
+                )
+                async with self._state_lock:
+                    if self._timeline_start_us is None:
+                        self._timeline_start_us = int(commit_start_us)
+                    if self._first_commit_monotonic_us is None:
+                        self._first_commit_monotonic_us = commit_now_us
+                    self._history.append(committed_history_chunk)
+                    self._produced_audio_us += pending.duration_us
+                    self._prune_history_locked(commit_now_us)
+                await self._fanout_history_chunk_to_join_processors(committed_history_chunk)
+
+        commit_task = asyncio.create_task(_commit_pending_chunks())
+        self._attach_task_exception_logger(commit_task, "commit_pending_chunks")
+        producer_stopped_cleanly = False
+        try:
+            await _produce_pending_chunks()
+            producer_stopped_cleanly = True
+        finally:
+            if producer_stopped_cleanly and not commit_task.done():
+                # Producer finished normally; send a None sentinel so the
+                # consumer exits cleanly. The queue may be full, so retry
+                # with a deadline before falling back to cancellation.
+                sentinel_sent = False
+                deadline = time.monotonic() + 1.0
+                while not sentinel_sent and not commit_task.done():
+                    try:
+                        pending_chunks.put_nowait(None)
+                        sentinel_sent = True
+                    except asyncio.QueueFull:
+                        if time.monotonic() >= deadline:
+                            break
+                        await asyncio.sleep(0.01)
+                if sentinel_sent:
+                    with suppress(asyncio.CancelledError, Exception):
+                        await commit_task
+                else:
+                    commit_task.cancel()
+                    with suppress(asyncio.CancelledError, Exception):
+                        await commit_task
+            else:
+                commit_task.cancel()
+                with suppress(asyncio.CancelledError, Exception):
+                    await commit_task
+            with suppress(Exception):
+                self._stop_push_stream()
+            await self._clear_join_catchup()
+            await self._clear_member_pipelines()
+            async with self._state_lock:
+                self._push_stream = None
+                self._playback_running = False
+                self._timeline_start_us = None
+                self._first_commit_monotonic_us = None
+                self._produced_audio_us = 0
+                self._history.clear()
+
+ # -- Join injection --------------------------------------------------------
+
+    async def _inject_ready_join_historical(
+        self,
+        push_stream: PushStream,
+        pending_backlog: deque[_PendingChunk],
+        current_pcm: bytes,
+    ) -> bool:
+        """Inject join-catchup historical audio once processor output reaches history end.
+
+        Join promotion lifecycle:
+        1. A catchup processor is fed historical PCM and new commits in parallel.
+        2. Once the processor's output lag falls within _JOIN_PROMOTE_ARM_WINDOW_US
+           of the history tail, promotion is "armed" and a target end timestamp is locked.
+        3. Once output reaches the target (within _JOIN_PROMOTE_TOLERANCE_US), the
+           catchup processor is promoted to the member's live DSP pipeline.
+        4. If promotion doesn't complete within _JOIN_PROMOTION_TIMEOUT_S, it's aborted.
+
+        Returns True when at least one member was promoted this pass.
+        """
+        injected_any = False
+        async with self._state_lock:
+            items = list(self._join_catchup.items())
+        for player_id, state in items:
+            produced_output_us = state.processor.produced_output_us
+            # Re-validate the state under lock: it may have been stopped or
+            # replaced since the snapshot above.
+            async with self._state_lock:
+                current = self._join_catchup.get(player_id)
+                if current is None or current.processor is not state.processor:
+                    continue
+                first_history_start_us = current.first_history_start_us
+                fed_until_us = current.fed_until_us
+                history_end_us = current.history_end_us
+                promotion_target_end_us = current.promotion_target_end_us
+                promotion_armed_monotonic_s = current.promotion_armed_monotonic_s
+            if first_history_start_us is None or fed_until_us is None or history_end_us is None:
+                continue
+            # Output can never be "ready" beyond what was actually fed.
+            max_ready_end_us = min(
+                fed_until_us,
+                first_history_start_us + max(0, produced_output_us),
+            )
+            if promotion_target_end_us is None:
+                lag_to_tail_us = history_end_us - max_ready_end_us
+                if lag_to_tail_us > _JOIN_PROMOTE_ARM_WINDOW_US:
+                    continue
+                # Arm promotion: lock the target at the current history tail.
+                async with self._state_lock:
+                    current = self._join_catchup.get(player_id)
+                    if current is None or current.processor is not state.processor:
+                        continue
+                    if current.promotion_target_end_us is None:
+                        current.promotion_target_end_us = history_end_us
+                        current.promotion_armed_monotonic_s = time.monotonic()
+                    promotion_target_end_us = current.promotion_target_end_us
+                    promotion_armed_monotonic_s = current.promotion_armed_monotonic_s
+            target_end_us = promotion_target_end_us
+            if (
+                promotion_armed_monotonic_s is not None
+                and time.monotonic() - promotion_armed_monotonic_s > _JOIN_PROMOTION_TIMEOUT_S
+            ):
+                self.player.logger.error(
+                    "Join promotion timed out for %s after %.1fs; dropping join catchup",
+                    player_id,
+                    _JOIN_PROMOTION_TIMEOUT_S,
+                )
+                await self._stop_join_catchup(player_id)
+                continue
+            if max_ready_end_us + _JOIN_PROMOTE_TOLERANCE_US < target_end_us:
+                continue
+            inject_duration_us = target_end_us - first_history_start_us
+            transformed_history = state.processor.pop_duration_us_or_pad(
+                inject_duration_us, _JOIN_PROMOTE_TOLERANCE_US
+            )
+            if transformed_history is None:
+                continue
+            pipeline = await self._sync_member_pipeline(player_id)
+            # Split the blob into slices so push_stream can yield between encodes.
+            frame_stride = (_SENDSPIN_PCM_FORMAT.bit_depth // 8) * _SENDSPIN_PCM_FORMAT.channels
+            slice_bytes = (
+                int(_SENDSPIN_PCM_FORMAT.sample_rate * _PRODUCER_SLICE_US / 1_000_000)
+                * frame_stride
+            )
+            for offset in range(0, len(transformed_history), slice_bytes):
+                # Only the first slice carries an absolute timestamp; the rest
+                # follow contiguously.
+                push_stream.prepare_historical_audio(
+                    transformed_history[offset : offset + slice_bytes],
+                    _SENDSPIN_PCM_FORMAT,
+                    channel_id=pipeline.channel_id,
+                    start_time_us=first_history_start_us if offset == 0 else None,
+                )
+            await self._prefeed_pending_backlog_for_join(state, current_pcm, pending_backlog)
+            await self._promote_join_catchup_processor(player_id, pipeline, target_end_us)
+            injected_any = True
+        return injected_any
+
+    async def _prefeed_pending_backlog_for_join(
+        self,
+        state: _JoinCatchupState,
+        current_pcm: bytes,
+        pending_backlog: deque[_PendingChunk],
+    ) -> None:
+        """Push current chunk + queued pending chunks into join processor before promotion.
+
+        Between the last committed chunk and the next commit, there may be
+        chunks already queued by the producer that the catchup processor hasn't
+        seen yet. Feeding them now avoids a gap in transformed audio after
+        promotion.
+        """
+        await self._enqueue_join_pcm(state, current_pcm)
+        # list() copies the deque so concurrent producer appends don't affect us.
+        for item in list(pending_backlog):
+            await self._enqueue_join_pcm(state, item.pcm)
+
+    async def _fanout_history_chunk_to_join_processors(self, hist_chunk: _HistoryChunk) -> None:
+        """Feed newly committed history chunk into all active join-catchup processors.
+
+        Each state's write_lock serializes this against the snapshot feed in
+        _feed_join_history, so live chunks never interleave mid-snapshot.
+        """
+        async with self._state_lock:
+            items = list(self._join_catchup.items())
+        for player_id, state in items:
+            async with state.write_lock:
+                # Read current state under lock.
+                async with self._state_lock:
+                    current = self._join_catchup.get(player_id)
+                    if current is None or current.processor is not state.processor:
+                        continue
+                    previous_end_us = current.fed_until_us
+                    first_history_start_us = current.first_history_start_us
+                # Initialize first_history_start_us if this is the first chunk.
+                if first_history_start_us is None:
+                    first_history_start_us = hist_chunk.start_time_us
+                    previous_end_us = first_history_start_us
+                # Fill timeline gaps with silence.
+                if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us:
+                    gap_us = hist_chunk.start_time_us - previous_end_us
+                    silence = self._silence_for_duration_us(gap_us)
+                    if silence:
+                        await self._enqueue_join_pcm(state, silence)
+                await self._enqueue_join_pcm(state, hist_chunk.pcm)
+                # Write updated state back under lock.
+                new_end_us = hist_chunk.start_time_us + hist_chunk.duration_us
+                async with self._state_lock:
+                    current = self._join_catchup.get(player_id)
+                    if current is not None and current.processor is state.processor:
+                        if current.first_history_start_us is None:
+                            current.first_history_start_us = first_history_start_us
+                        if current.fed_until_us is None:
+                            current.fed_until_us = first_history_start_us
+                        current.fed_until_us = new_end_us
+                        current.history_end_us = new_end_us
+
+ async def _enqueue_join_pcm(
+ self,
+ state: _JoinCatchupState,
+ pcm: bytes,
+ ) -> None:
+ """Enqueue PCM into a joining member writer queue.
+
+ Bails out immediately if the writer task is dead to avoid blocking
+ the commit loop on a queue with no consumer.
+ """
+ if state.writer_task.done():
+ return
+ try:
+ # Fast path: non-blocking put while the queue has room.
+ state.input_queue.put_nowait(pcm)
+ except asyncio.QueueFull:
+ # Re-check liveness: the writer may have died while the queue was
+ # full, in which case awaiting put() below would block forever.
+ if state.writer_task.done():
+ return
+ await state.input_queue.put(pcm)
+
+ # -- Member pipeline management --------------------------------------------
+
+ async def _refresh_member_mappings(self) -> None:
+ """Re-evaluate per-member channel mapping and DSP requirements."""
+ async with self._state_lock:
+ if not self._mapping_dirty:
+ return
+ member_ids = tuple(self._members)
+ # Clear the dirty flag under the lock; a concurrent change re-sets it.
+ self._mapping_dirty = False
+ # Sync outside the lock: _sync_member_pipeline acquires _state_lock itself.
+ for member_id in member_ids:
+ await self._sync_member_pipeline(member_id)
+
+ async def _sync_member_pipeline(self, player_id: str) -> _MemberPipeline:
+ """Create/update pipeline state for one member from current MA config.
+
+ Returns the existing pipeline unchanged when the config signature
+ matches; otherwise builds a replacement pipeline, starting any new
+ ffmpeg processor outside the state lock and closing the old one last.
+ """
+ config = self._get_pipeline_config_cached(player_id)
+ release_processor: _BufferedFfmpegProcessor | None = None
+ start_processor: _BufferedFfmpegProcessor | None = None
+ async with self._state_lock:
+ current = self._member_pipelines.get(player_id)
+ # Unchanged config signature: keep the current pipeline as-is.
+ if current is not None and current.config.signature == config.signature:
+ return current
+ if current and current.config.requires_transform:
+ # Keep the dedicated channel when still transforming; the old
+ # processor is released after the replacement is started.
+ channel_id = current.channel_id if config.requires_transform else MAIN_CHANNEL
+ release_processor = current.processor
+ elif config.requires_transform:
+ channel_id = self._get_or_create_preassigned_channel(player_id)
+ else:
+ channel_id = MAIN_CHANNEL
+ self._preassigned_channels.pop(player_id, None)
+ processor: _BufferedFfmpegProcessor | None = None
+ if config.requires_transform:
+ ffmpeg_obj = self._create_member_ffmpeg(config.filter_params)
+ processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT)
+ start_processor = processor
+ pipeline = _MemberPipeline(
+ player_id=player_id,
+ channel_id=channel_id,
+ config=config,
+ processor=processor,
+ )
+ self._member_pipelines[player_id] = pipeline
+ # Start the new processor outside the lock (may spawn a subprocess).
+ if start_processor is not None:
+ try:
+ await start_processor.start()
+ except Exception as err:
+ async with self._state_lock:
+ # Only roll back if our pipeline is still the registered one.
+ if (
+ self._member_pipelines.get(player_id) is not None
+ and self._member_pipelines[player_id].processor is start_processor
+ ):
+ self._member_pipelines.pop(player_id, None)
+ with suppress(Exception):
+ await self._close_member_ffmpeg(start_processor)
+ raise RuntimeError(f"Failed to start member DSP ffmpeg for {player_id}") from err
+ if release_processor is not None:
+ await self._close_member_ffmpeg(release_processor)
+ return pipeline
+
+ def _get_pipeline_config_cached(
+ self,
+ player_id: str,
+ *,
+ force_refresh: bool = False,
+ ) -> _PipelineConfig:
+ """Return cached pipeline config for a player, calculating on cache miss.
+
+ :param player_id: Player whose pipeline config to resolve.
+ :param force_refresh: When True, bypass the cache and re-read MA config.
+ """
+ if not force_refresh and (cached := self._pipeline_config_cache.get(player_id)) is not None:
+ return cached
+ config = self._read_pipeline_config(player_id)
+ self._pipeline_config_cache[player_id] = config
+ return config
+
+ def _read_pipeline_config(self, player_id: str) -> _PipelineConfig:
+ """Read MA config and determine if member needs a dedicated DSP channel."""
+ dsp_config = self.player.mass.config.get_player_dsp_config(player_id)
+ dsp_enabled = bool(dsp_config.enabled)
+ raw_output_channels = self.player.mass.config.get_raw_player_config_value(
+ player_id,
+ CONF_OUTPUT_CHANNELS,
+ "stereo",
+ )
+ # Normalize and fall back to stereo for unknown/empty values.
+ output_channels = str(raw_output_channels or "stereo").strip().lower()
+ if output_channels not in {"stereo", "left", "right", "mono"}:
+ output_channels = "stereo"
+ try:
+ filter_params = tuple(
+ get_player_filter_params(
+ self.player.mass,
+ player_id,
+ _PCM_FORMAT,
+ _PCM_FORMAT,
+ )
+ )
+ except Exception:
+ # Best-effort: treat a failed filter-param read as "no filters".
+ filter_params = ()
+ # An alimiter-only graph does not by itself force a dedicated channel;
+ # any other non-empty filter does.
+ custom_filter_graph = any(
+ param.strip() and not param.strip().startswith("alimiter=") for param in filter_params
+ )
+ requires_transform = dsp_enabled or output_channels != "stereo" or custom_filter_graph
+ return _PipelineConfig(
+ requires_transform=requires_transform,
+ output_channels=output_channels,
+ filter_params=filter_params,
+ )
+
+ def _get_or_create_preassigned_channel(self, player_id: str) -> UUID:
+ """Return stable dedicated channel id for transform-required player.
+
+ The id is cached so repeated lookups (and re-syncs) for the same
+ player keep routing to the same channel.
+ """
+ if (channel_id := self._preassigned_channels.get(player_id)) is not None:
+ return channel_id
+ channel_id = uuid4()
+ self._preassigned_channels[player_id] = channel_id
+ return channel_id
+
+ # -- FFmpeg lifecycle ------------------------------------------------------
+
+ def _create_member_ffmpeg(self, filter_params: tuple[str, ...]) -> FFMpeg:
+ """Create per-member FFMpeg for DSP pipeline.
+
+ Input is stdin ("-"); input and output formats are identical, so the
+ only transformation applied is the given filter graph.
+ """
+ return FFMpeg(
+ audio_input="-",
+ input_format=_PCM_FORMAT,
+ output_format=_PCM_FORMAT,
+ filter_params=list(filter_params),
+ )
+
+ async def _transform_member_chunk(self, pipeline: _MemberPipeline, chunk: bytes) -> None:
+ """Push one PCM chunk into a member DSP pipeline.
+
+ No-op for passthrough members (pipeline without a processor).
+ """
+ processor = pipeline.processor
+ if processor is None:
+ return
+ await processor.push(chunk)
+
+ async def _read_member_chunk(
+ self,
+ pipeline: _MemberPipeline,
+ duration_us: int,
+ ) -> bytes | None:
+ """Read one transformed chunk from a member DSP pipeline.
+
+ Returns b"" for passthrough pipelines or non-positive durations,
+ None when the processor has no transformed data available yet, and
+ otherwise the transformed PCM bytes (marking the pipeline ready).
+ """
+ processor = pipeline.processor
+ if processor is None or duration_us <= 0:
+ return b""
+ transformed = await processor.read_duration_us(duration_us)
+ if not transformed:
+ return None
+ pipeline.ready = True
+ return bytes(transformed)
+
+ async def _close_member_ffmpeg(self, processor: _BufferedFfmpegProcessor) -> None:
+ """Close an ffmpeg processor, suppressing errors.
+
+ Shutdown is best-effort: close failures must never break the caller.
+ """
+ with suppress(Exception):
+ await processor.close()
+
+ async def _clear_member_pipelines(self) -> None:
+ """Release all member pipeline resources."""
+ # Detach all pipelines under the lock, then close processors outside it
+ # so slow ffmpeg shutdowns don't hold up other state access.
+ async with self._state_lock:
+ pipelines = list(self._member_pipelines.values())
+ self._member_pipelines.clear()
+ for pipeline in pipelines:
+ if pipeline.processor is not None:
+ await self._close_member_ffmpeg(pipeline.processor)
+
+ # -- Push stream -----------------------------------------------------------
+
+ def _create_push_stream(self) -> PushStream:
+ """Create PushStream with channel resolver for per-member routing."""
+ # The resolver callback decides, per player, which channel to subscribe to.
+ return self.player.api.group.start_stream(channel_resolver=self._resolve_channel_for_player)
+
+ def _stop_push_stream(self) -> None:
+ """Stop the active PushStream on the group."""
+ self.player.api.group.stop_stream()
+
+ def _resolve_channel_for_player(self, player_id: str) -> UUID:
+ """Channel resolver callback for per-player routing.
+
+ Returns the player's dedicated DSP channel when one exists, otherwise
+ MAIN_CHANNEL (or a freshly preassigned channel for joiners that need
+ a transform).
+ """
+ pipeline = self._member_pipelines.get(player_id)
+ if pipeline is not None:
+ return pipeline.channel_id
+ # The leader always receives MAIN_CHANNEL audio directly from the
+ # commit loop; only group members get per-player DSP channels.
+ if player_id == self.player.player_id:
+ return MAIN_CHANNEL
+ # Force a fresh config read for pending/unknown joiners so the very
+ # first resolution (triggered by add_client) uses up-to-date DSP settings.
+ force = player_id not in self._members
+ config = self._get_pipeline_config_cached(player_id, force_refresh=force)
+ if not config.requires_transform:
+ return MAIN_CHANNEL
+ return self._get_or_create_preassigned_channel(player_id)
+
+ # -- History ---------------------------------------------------------------
+
+ def _prune_history_locked(self, now_monotonic_us: int) -> None:
+ """Drop old history chunks that are fully in the past.
+
+ Caller must hold the state lock (hence the _locked suffix).
+ """
+ if self._timeline_start_us is None or self._first_commit_monotonic_us is None:
+ return
+ # Map monotonic wall progress onto the source timeline to find "now".
+ elapsed_real_us = max(0, now_monotonic_us - self._first_commit_monotonic_us)
+ source_now_us = self._timeline_start_us + elapsed_real_us
+ cutoff_us = source_now_us - _HISTORY_KEEP_PAST_US
+ # Pop chunks whose entire span ends at or before the retention cutoff.
+ while self._history and (
+ self._history[0].start_time_us + self._history[0].duration_us <= cutoff_us
+ ):
+ self._history.popleft()
+
+ # -- PCM utilities ---------------------------------------------------------
+
+ @staticmethod
+ def _duration_us(audio: bytes, audio_format: AudioFormat) -> int:
+ """Compute chunk duration (microseconds) from PCM payload size.
+
+ Returns 0 when the format yields a non-positive byte rate; the result
+ is truncated to an integer microsecond count.
+ """
+ bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
+ bytes_per_second = (
+ int(audio_format.sample_rate) * bytes_per_sample * int(audio_format.channels)
+ )
+ if bytes_per_second <= 0:
+ return 0
+ return int((len(audio) / bytes_per_second) * 1_000_000)
+
+ @staticmethod
+ def _iter_pcm_slices(
+ audio: bytes, audio_format: AudioFormat, target_duration_us: int
+ ) -> Iterator[bytes]:
+ """Yield frame-aligned PCM slices up to target duration.
+
+ Each yielded slice ends on a whole-frame boundary so no sample is
+ split across slices; the final slice may be shorter than the target.
+ """
+ if not audio:
+ return
+ bytes_per_sample = max(1, int(audio_format.bit_depth // 8))
+ frame_size = bytes_per_sample * int(audio_format.channels)
+ if frame_size <= 0:
+ # Degenerate format: emit everything as one slice.
+ yield audio
+ return
+ samples_per_slice = max(
+ 1, round((target_duration_us / 1_000_000) * int(audio_format.sample_rate))
+ )
+ slice_size = max(frame_size, samples_per_slice * frame_size)
+ offset = 0
+ audio_len = len(audio)
+ while offset < audio_len:
+ end = min(audio_len, offset + slice_size)
+ if end < audio_len:
+ # Pull the cut point back to a frame boundary; if that would
+ # make the slice empty, advance by at least one frame instead.
+ aligned_end = end - (end % frame_size)
+ if aligned_end <= offset:
+ aligned_end = min(audio_len, offset + frame_size)
+ end = aligned_end
+ yield audio[offset:end]
+ offset = end
+
+ @staticmethod
+ def _silence_for_duration_us(duration_us: int) -> bytes:
+ """Generate silent PCM with frame-aligned duration for the default format.
+
+ Returns b"" for non-positive durations; output length is always a
+ whole number of frames.
+ """
+ if duration_us <= 0:
+ return b""
+ bytes_per_sample = max(1, int(_PCM_FORMAT.bit_depth // 8))
+ frame_size = bytes_per_sample * int(_PCM_FORMAT.channels)
+ samples = max(0, round((duration_us / 1_000_000) * int(_PCM_FORMAT.sample_rate)))
+ return b"\x00" * (samples * frame_size)
import asyncio
import time
-from collections.abc import AsyncGenerator, Callable
+from collections.abc import Callable
from io import BytesIO
from typing import TYPE_CHECKING, cast
-from aiosendspin.models import MediaCommand
-from aiosendspin.models.types import ArtworkSource, PlaybackStateType
+from aiosendspin.models import AudioCodec, MediaCommand
+from aiosendspin.models.types import PlaybackStateType
from aiosendspin.models.types import RepeatMode as SendspinRepeatMode
-from aiosendspin.server import AudioFormat as SendspinAudioFormat
from aiosendspin.server import (
ClientEvent,
- GroupCommandEvent,
GroupEvent,
- GroupStateChangedEvent,
SendspinGroup,
VolumeChangedEvent,
)
+from aiosendspin.server.audio import AudioFormat as SendspinAudioFormat
from aiosendspin.server.client import DisconnectBehaviour
-from aiosendspin.server.events import ClientGroupChangedEvent
-from aiosendspin.server.group import (
+from aiosendspin.server.events import (
+ ClientGroupChangedEvent,
GroupDeletedEvent,
GroupMemberAddedEvent,
GroupMemberRemovedEvent,
+ GroupStateChangedEvent,
+)
+from aiosendspin.server.roles import (
+ ArtworkGroupRole,
+ ControllerEvent,
+ ControllerGroupRole,
+ ControllerNextEvent,
+ ControllerPauseEvent,
+ ControllerPlayEvent,
+ ControllerPreviousEvent,
+ ControllerRepeatEvent,
+ ControllerShuffleEvent,
+ ControllerStopEvent,
+ MetadataGroupRole,
)
-from aiosendspin.server.metadata import Metadata
-from aiosendspin.server.stream import AudioCodec, MediaStream
+from aiosendspin.server.roles.metadata.state import Metadata
+from aiosendspin.server.roles.player.types import PlayerRoleProtocol
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
from music_assistant_models.constants import PLAYER_CONTROL_NONE
from music_assistant_models.enums import (
- ContentType,
+ ConfigEntryType,
ImageType,
PlaybackState,
PlayerFeature,
PlayerType,
RepeatMode,
)
-from music_assistant_models.media_items import AudioFormat
from music_assistant_models.player import DeviceInfo
from PIL import Image
-from music_assistant.constants import CONF_OUTPUT_CHANNELS, CONF_OUTPUT_CODEC, INTERNAL_PCM_FORMAT
-from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.constants import (
+ CONF_ENTRY_HTTP_PROFILE_HIDDEN,
+ CONF_ENTRY_OUTPUT_CODEC_HIDDEN,
+ CONF_ENTRY_SAMPLE_RATES,
+)
from music_assistant.models.player import Player, PlayerMedia
-
-from .timed_client_stream import TimedClientStream
+from music_assistant.providers.sendspin.playback import (
+ SendspinPlaybackSession,
+)
# Supported group commands for Sendspin players
SUPPORTED_GROUP_COMMANDS = [
MediaCommand.UNSHUFFLE,
]
-if TYPE_CHECKING:
- from aiosendspin.server.client import SendspinClient
- from music_assistant_models.player_queue import PlayerQueue
- from music_assistant_models.queue_item import QueueItem
-
- from .provider import SendspinProvider
+# Config constants for Sendspin audio format
+CONF_PREFERRED_SENDSPIN_FORMAT = "preferred_sendspin_format"
+SENDSPIN_FORMAT_AUTOMATIC = "automatic"
-class MusicAssistantMediaStream(MediaStream):
- """MediaStream implementation for Music Assistant with per-player DSP support."""
-
- player_instance: SendspinPlayer
- internal_format: AudioFormat
- output_format: AudioFormat
-
- def __init__(
- self,
- *,
- main_channel_source: AsyncGenerator[bytes, None],
- main_channel_format: SendspinAudioFormat,
- player_instance: SendspinPlayer,
- internal_format: AudioFormat,
- output_format: AudioFormat,
- ) -> None:
- """
- Initialise the media stream with audio source and format for main_channel().
-
- Args:
- main_channel_source: Audio source generator for the main channel.
- main_channel_format: Audio format for the main channel (includes codec).
- player_instance: The SendspinPlayer instance for accessing mass and streams.
- internal_format: Internal processing format (float32 for headroom).
- output_format: Output PCM format (16-bit for player output).
- """
- super().__init__(
- main_channel_source=main_channel_source,
- main_channel_format=main_channel_format,
- )
- self.player_instance = player_instance
- self.internal_format = internal_format
- self.output_format = output_format
-
- async def player_channel(
- self,
- player_id: str,
- preferred_format: SendspinAudioFormat | None = None,
- position_us: int = 0,
- ) -> tuple[AsyncGenerator[bytes, None], SendspinAudioFormat, int] | None:
- """
- Get a player-specific audio stream with per-player DSP.
+def format_to_option_value(fmt: SupportedAudioFormat) -> str:
+ """Convert SupportedAudioFormat to "codec:sample_rate:bit_depth:channels".
+
+ Inverse of option_value_to_format for well-formed values.
+ """
+ return f"{fmt.codec.value}:{fmt.sample_rate}:{fmt.bit_depth}:{fmt.channels}"
- Args:
- player_id: Identifier for the player requesting the stream.
- preferred_format: The player's preferred native format for the stream.
- The implementation may return a different format; the library
- will handle any necessary conversion.
- position_us: Position in microseconds relative to the main_stream start.
- Used for late-joining players to sync with the main stream.
-
- Returns:
- A tuple of (audio generator, audio format, actual position in microseconds)
- or None if unavailable. If None, the main_stream is used as fallback.
- """
- mass = self.player_instance.mass
- multi_client_stream = self.player_instance.timed_client_stream
- assert multi_client_stream is not None
- dsp = mass.config.get_player_dsp_config(player_id)
- output_channels = mass.config.get_raw_player_config_value(
- player_id, CONF_OUTPUT_CHANNELS, "stereo"
- )
- if not dsp.enabled and output_channels == "stereo":
- # DSP is disabled and output is stereo, use main_stream
- return None
+def option_value_to_format(value: str) -> tuple[AudioCodec, SendspinAudioFormat] | None:
+ """Parse option value back to (AudioCodec, SendspinAudioFormat).
+
+ :param value: Option value in format "codec:sample_rate:bit_depth:channels".
+ :return: Tuple of (AudioCodec, SendspinAudioFormat) or None if parsing fails.
+ """
+ try:
+ codec_str, sample_rate_str, bit_depth_str, channels_str = value.split(":")
+ codec = AudioCodec(codec_str)
+ audio_format = SendspinAudioFormat(
+ sample_rate=int(sample_rate_str),
+ bit_depth=int(bit_depth_str),
+ channels=int(channels_str),
+ )
+ # ValueError covers a bad split arity, non-integer fields, and an
+ # unknown codec value; KeyError is caught defensively for enum lookup.
+ return (codec, audio_format)
+ except (ValueError, KeyError):
+ return None
+
+
+def format_to_display_string(fmt: SupportedAudioFormat) -> str:
+ """Convert to display string like "FLAC 48kHz/24bit stereo"."""
+ codec_name = fmt.codec.name
+ sample_rate_khz = fmt.sample_rate / 1000
+ # Format sample rate: show as integer if whole number, otherwise one decimal
+ if sample_rate_khz == int(sample_rate_khz):
+ sample_rate_str = f"{int(sample_rate_khz)}kHz"
+ else:
+ sample_rate_str = f"{sample_rate_khz:.1f}kHz"
+ # Human-friendly channel naming for the common mono/stereo cases.
+ if fmt.channels == 2:
+ channels_str = "stereo"
+ elif fmt.channels == 1:
+ channels_str = "mono"
+ else:
+ channels_str = f"{fmt.channels}ch"
+ return f"{codec_name} {sample_rate_str}/{fmt.bit_depth}bit {channels_str}"
- # Get the stream with position (in seconds)
- stream_gen, actual_position = await multi_client_stream.get_stream(
- output_format=self.output_format,
- filter_params=filter_params,
- )
- # Convert position from seconds to microseconds for aiosendspin API
- actual_position_us = int(actual_position * 1_000_000)
+if TYPE_CHECKING:
+ from aiosendspin.models.player import SupportedAudioFormat
+ from aiosendspin.server.client import SendspinClient
+ from music_assistant_models.config_entries import ConfigValueType
+ from music_assistant_models.player_queue import PlayerQueue
+ from music_assistant_models.queue_item import QueueItem
- # Return actual position in microseconds relative to main_stream start
- self.player_instance.logger.debug(
- "Providing channel stream for player %s at position %d us",
- player_id,
- actual_position_us,
- )
- return (
- stream_gen,
- SendspinAudioFormat(
- sample_rate=self.output_format.sample_rate,
- bit_depth=self.output_format.bit_depth,
- channels=self.output_format.channels,
- codec=self._main_channel_format.codec,
- ),
- actual_position_us,
- )
+ from .provider import SendspinProvider
class SendspinPlayer(Player):
unsub_group_event_cb: Callable[[], None]
last_sent_artwork_url: str | None = None
last_sent_artist_artwork_url: str | None = None
- _playback_task: asyncio.Task[None] | None = None
- timed_client_stream: TimedClientStream | None = None
+ playback_session: SendspinPlaybackSession
is_web_player: bool = False
@property
self.api.disconnect_behaviour = DisconnectBehaviour.STOP
self.unsub_event_cb = sendspin_client.add_event_listener(self.event_cb)
self.unsub_group_event_cb = sendspin_client.group.add_event_listener(self.group_event_cb)
- sendspin_client.group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
+ if controller_role := self._controller_role:
+ controller_role.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
+
+ self.playback_session = SendspinPlaybackSession(self)
self.logger = self.provider.logger.getChild(player_id)
# init some static variables
self._attr_supported_features = {
PlayerFeature.PLAY_MEDIA,
PlayerFeature.SET_MEMBERS,
- PlayerFeature.MULTI_DEVICE_DSP,
PlayerFeature.VOLUME_SET,
PlayerFeature.VOLUME_MUTE,
+ PlayerFeature.MULTI_DEVICE_DSP,
}
self._attr_can_group_with = {provider.instance_id}
self._attr_power_control = PLAYER_CONTROL_NONE
)
else:
self._attr_device_info = DeviceInfo()
- if player_client := sendspin_client.player:
- self._attr_volume_level = player_client.volume
- self._attr_volume_muted = player_client.muted
+ if sendspin_client.info.player_support:
+ for role in sendspin_client.roles_by_family("player"):
+ volume = role.get_player_volume()
+ muted = role.get_player_muted()
+ if volume is not None:
+ self._attr_volume_level = volume
+ if muted is not None:
+ self._attr_volume_muted = muted
+ if volume is not None or muted is not None:
+ break
self._attr_available = True
self.is_web_player = sendspin_client.name.startswith(
"Web (" # The regular Web Interface
self._attr_expose_to_ha_by_default = not self.is_web_player
self._attr_hidden_by_default = self.is_web_player
+ @property
+ def _artwork_role(self) -> ArtworkGroupRole | None:
+ """Get the ArtworkGroupRole for this player's group, or None if absent."""
+ role = self.api.group.group_role("artwork")
+ # isinstance guard: group_role may return a differently-typed role.
+ if isinstance(role, ArtworkGroupRole):
+ return role
+ return None
+
+ @property
+ def _metadata_role(self) -> MetadataGroupRole | None:
+ """Get the MetadataGroupRole for this player's group, or None if absent."""
+ role = self.api.group.group_role("metadata")
+ if isinstance(role, MetadataGroupRole):
+ return role
+ return None
+
+ @property
+ def _controller_role(self) -> ControllerGroupRole | None:
+ """Get the ControllerGroupRole for this player's group, or None if absent."""
+ role = self.api.group.group_role("controller")
+ if isinstance(role, ControllerGroupRole):
+ return role
+ return None
+
+ @property
+ def _player_role(self) -> PlayerRoleProtocol | None:
+ """Get the player role for this client (not group role).
+
+ Returns the first player-family role matching the protocol, or None.
+ """
+ for role in self.api.roles_by_family("player"):
+ if isinstance(role, PlayerRoleProtocol):
+ return role
+ return None
+
+ async def _handle_controller_event(self, event: ControllerEvent) -> None:
+ """Handle a controller event from the ControllerGroupRole.
+
+ Maps transport events to MA player commands; repeat/shuffle events
+ are applied to the active queue and silently ignored without one.
+ """
+ queue = self.mass.player_queues.get_active_queue(self.player_id)
+ match event:
+ case ControllerPlayEvent():
+ await self.mass.players.cmd_play(self.player_id)
+ case ControllerPauseEvent():
+ await self.mass.players.cmd_pause(self.player_id)
+ case ControllerStopEvent():
+ await self.mass.players.cmd_stop(self.player_id)
+ case ControllerNextEvent():
+ await self.mass.players.cmd_next_track(self.player_id)
+ case ControllerPreviousEvent():
+ await self.mass.players.cmd_previous_track(self.player_id)
+ case ControllerRepeatEvent(mode=mode) if queue:
+ # Translate the Sendspin repeat mode to the MA equivalent.
+ match mode:
+ case SendspinRepeatMode.OFF:
+ self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF)
+ case SendspinRepeatMode.ONE:
+ self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE)
+ case SendspinRepeatMode.ALL:
+ self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL)
+ case ControllerShuffleEvent(shuffle=shuffle) if queue:
+ await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=shuffle)
+
+ async def _sync_membership_from_group(self, group: SendspinGroup) -> None:
+ """Sync MA/player + playback session membership from authoritative group state."""
+ # Ignore stale events from a group we no longer belong to.
+ if group is not self.api.group:
+ return
+ group_client_ids = [client.client_id for client in group.clients]
+ # Leadership convention: the first client in the group list leads.
+ is_leader = bool(group_client_ids) and group_client_ids[0] == self.player_id
+ desired_group_members = group_client_ids if is_leader else []
+ # The session tracks followers only (leader excluded from its member set).
+ desired_session_members = group_client_ids[1:] if is_leader else []
+ if self._attr_group_members != desired_group_members:
+ self._attr_group_members = desired_group_members
+ self.update_state()
+ # Only use STOP when we actually lead other members.
+ self.api.disconnect_behaviour = (
+ DisconnectBehaviour.STOP
+ if is_leader and len(desired_session_members) > 0
+ else DisconnectBehaviour.UNGROUP
+ )
+ await self.playback_session.sync_members(set(desired_session_members))
+
def event_cb(self, client: SendspinClient, event: ClientEvent) -> None:
- """Event callback registered to the sendspin server."""
- self.logger.debug("Received PlayerEvent: %s", event)
+ """Event callback registered to the sendspin client."""
match event:
case VolumeChangedEvent(volume=volume, muted=muted):
self._attr_volume_level = volume
case ClientGroupChangedEvent(new_group=new_group):
self.unsub_group_event_cb()
self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb)
+ if controller_role := self._controller_role:
+ controller_role.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
+ # Cancel active playback - push stream belongs to the old group
+ self.mass.create_task(self.playback_session.cancel("group changed"))
# Sync playback state from the new group
match new_group.state:
case PlaybackStateType.PLAYING:
self._attr_playback_state = PlaybackState.PAUSED
case PlaybackStateType.STOPPED:
self._attr_playback_state = PlaybackState.IDLE
+ self._attr_elapsed_time = 0
+ self._attr_elapsed_time_last_updated = time.time()
# Update in case this is a newly created group
- new_group.set_supported_commands(SUPPORTED_GROUP_COMMANDS)
# GroupMemberAddedEvent or GroupMemberRemovedEvent will be fired before this
# so group members are already up to date at this point
- if self.synced_to is None:
- # We are the leader, stop on disconnect
- self.api.disconnect_behaviour = DisconnectBehaviour.STOP
- else:
- self.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP
+ self.mass.create_task(self._sync_membership_from_group(new_group))
self.update_state()
- async def _handle_group_command(self, command: MediaCommand) -> None:
- """Handle a group command from aiosendspin."""
- queue = self.mass.player_queues.get_active_queue(self.player_id)
- match command:
- case MediaCommand.PLAY:
- await self.mass.players.cmd_play(self.player_id)
- case MediaCommand.PAUSE:
- await self.mass.players.cmd_pause(self.player_id)
- case MediaCommand.STOP:
- await self.mass.players.cmd_stop(self.player_id)
- case MediaCommand.NEXT:
- await self.mass.players.cmd_next_track(self.player_id)
- case MediaCommand.PREVIOUS:
- await self.mass.players.cmd_previous_track(self.player_id)
- case MediaCommand.REPEAT_OFF if queue:
- self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF)
- case MediaCommand.REPEAT_ONE if queue:
- self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE)
- case MediaCommand.REPEAT_ALL if queue:
- self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL)
- case MediaCommand.SHUFFLE if queue:
- await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=True)
- case MediaCommand.UNSHUFFLE if queue:
- await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=False)
-
def group_event_cb(self, group: SendspinGroup, event: GroupEvent) -> None:
"""Event callback registered to the sendspin group this player belongs to."""
if self.synced_to is not None:
# - GroupStateChangedEvent: to update playback state when leader stops/disconnects
if not isinstance(event, (GroupMemberRemovedEvent, GroupStateChangedEvent)):
return
- self.logger.debug("Received GroupEvent: %s", event)
-
match event:
- case GroupCommandEvent(command=command):
- self.logger.debug("Group command received: %s", command)
- self.mass.create_task(self._handle_group_command(command))
case GroupStateChangedEvent(state=state):
- self.logger.debug("Group state changed to: %s", state)
match state:
case PlaybackStateType.PLAYING:
self._attr_playback_state = PlaybackState.PLAYING
self._attr_playback_state = PlaybackState.IDLE
self._attr_elapsed_time = 0
self._attr_elapsed_time_last_updated = time.time()
+ if self.synced_to is None:
+ self.mass.create_task(self.playback_session.cancel("group stopped"))
self.update_state()
case GroupMemberAddedEvent(client_id=client_id):
- self.logger.debug("Group member added: %s", client_id)
+ is_group_leader = (
+ bool(group.clients) and group.clients[0].client_id == self.player_id
+ )
+ if is_group_leader and (
+ not self._attr_group_members or self._attr_group_members[0] != self.player_id
+ ):
+ self._attr_group_members = [self.player_id, *self._attr_group_members]
if client_id not in self._attr_group_members:
self._attr_group_members.append(client_id)
self.update_state()
+ self.mass.create_task(self.playback_session.add_member(client_id))
+ self.mass.create_task(self._sync_membership_from_group(group))
case GroupMemberRemovedEvent(client_id=client_id):
- self.logger.debug("Group member removed: %s", client_id)
- self.mass.create_task(self._handle_member_removed(group, client_id))
+ self.mass.create_task(self.playback_session.remove_member(client_id))
+ self.mass.create_task(self._handle_group_member_removed(group, client_id))
+ self.mass.create_task(self._sync_membership_from_group(group))
case GroupDeletedEvent():
pass
+ case ControllerEvent() as controller_event:
+ if self.synced_to is None:
+ self.mass.create_task(self._handle_controller_event(controller_event))
- async def _handle_member_removed(self, group: SendspinGroup, client_id: str) -> None:
- """Handle group member removed event asynchronously."""
+ async def _handle_group_member_removed(self, group: SendspinGroup, client_id: str) -> None:
+ """Handle a group member being removed asynchronously."""
if client_id == self.player_id:
- if len(self._attr_group_members) > 0:
+ if len(group.clients) > 0:
# We were just removed as a leader:
# 1. stop playback on the old group
await group.stop()
- # 2. clear our members (since we are now alone)
- group_members = [
- member for member in self._attr_group_members if member != client_id
- ]
+ # 2. clear our members (since we are now alone in a new group)
self._attr_group_members = []
- # 3. assign new leader if there are members left
- if len(group_members) > 0 and (
- new_leader := self.mass.players.get_player(group_members[0])
- ):
- new_leader = cast("SendspinPlayer", new_leader)
- new_leader._attr_group_members = group_members[1:]
- new_leader.api.disconnect_behaviour = DisconnectBehaviour.STOP
- new_leader.update_state()
self.update_state()
elif client_id in self._attr_group_members:
# Someone else left our group
async def volume_set(self, volume_level: int) -> None:
"""Handle VOLUME_SET command on the player."""
- if player_client := self.api.player:
- player_client.set_volume(volume_level)
+ # Fan the new level out to every player-family role on this client.
+ roles = self.api.roles_by_family("player")
+ for role in roles:
+ role.set_player_volume(volume_level)
async def volume_mute(self, muted: bool) -> None:
"""Handle VOLUME MUTE command on the player."""
- if player_client := self.api.player:
- if muted:
- player_client.mute()
- else:
- player_client.unmute()
+ # Single setter replaces the old mute()/unmute() pair; apply to all
+ # player-family roles on this client.
+ roles = self.api.roles_by_family("player")
+ for role in roles:
+ role.set_player_mute(muted)
async def stop(self) -> None:
"""Stop command."""
self.logger.debug("Received STOP command on player %s", self.display_name)
- # We don't care if we stopped the stream or it was already stopped
- await self.api.group.stop()
- # Clear the playback task reference (group.stop() handles stopping the stream)
- self._playback_task = None
+ self.mark_stop_called()
self._attr_current_media = None
+ # Reset local playback attributes eagerly so UI reflects the stop
+ # before the (awaited) session/group teardown completes.
+ self._attr_playback_state = PlaybackState.IDLE
+ self._attr_elapsed_time = 0
+ self._attr_elapsed_time_last_updated = time.time()
self.update_state()
+ # Cancel the playback session first, then stop the group stream.
+ await self.playback_session.cancel("stop command")
+ await self.api.group.stop()
async def play_media(self, media: PlayerMedia) -> None:
"""Play media command."""
# playback_state will be set by the group state change event
# Stop previous stream in case we were already playing something
+ await self.playback_session.cancel("new media requested")
await self.api.group.stop()
- # Run playback in background task to immediately return
- self._playback_task = asyncio.create_task(self._run_playback(media))
+ await self.playback_session.start(media)
self.update_state()
- async def _run_playback(self, media: PlayerMedia) -> None:
- """Run the actual playback in a background task."""
- try:
- # Use 32-bit for the main channel: aiosendspin converts per player as needed
- pcm_format = AudioFormat(
- content_type=ContentType.PCM_S32LE,
- sample_rate=48000,
- bit_depth=32,
- channels=2,
- )
- flow_pcm_format = AudioFormat(
- content_type=INTERNAL_PCM_FORMAT.content_type,
- sample_rate=pcm_format.sample_rate,
- bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
- channels=pcm_format.channels,
- )
-
- output_codec = cast("str", self.config.get_value(CONF_OUTPUT_CODEC, "pcm"))
+ async def on_config_updated(self) -> None:
+ """Apply preferred format when config changes."""
+ await self._apply_preferred_format()
- # Convert string codec to AudioCodec enum
- audio_codec = AudioCodec(output_codec)
+ async def _apply_preferred_format(self) -> None:
+ """Apply the configured preferred audio format to the player role (no-op when automatic)."""
+ player_role = self._player_role
+ if player_role is None:
+ return
- # Get clean audio source in flow format (high quality internal format)
- # Format conversion and per-player DSP will be applied via player_channel
- audio_source = self.mass.streams.get_stream(media, flow_pcm_format)
+ config_value = cast(
+ "str",
+ self.config.get_value(CONF_PREFERRED_SENDSPIN_FORMAT, SENDSPIN_FORMAT_AUTOMATIC),
+ )
+ if config_value == SENDSPIN_FORMAT_AUTOMATIC:
+ # Automatic mode: don't set a preferred format, let client decide.
+ return
- # Create TimedClientStream to wrap the clean audio source
- # This distributes the audio to multiple subscribers without DSP
- self.timed_client_stream = TimedClientStream(
- audio_source=audio_source,
- audio_format=flow_pcm_format,
+ parsed = option_value_to_format(config_value)
+ if parsed is None:
+ self.logger.warning(
+ "Invalid audio format config value '%s' for player %s",
+ config_value,
+ self.display_name,
)
+ return
- # Setup the main channel subscription
- main_channel_gen, main_position = await self.timed_client_stream.get_stream(
- output_format=pcm_format,
- filter_params=None, # TODO: this should probably still include the safety limiter
- )
- assert main_position == 0.0 # first subscriber, should be zero
- media_stream = MusicAssistantMediaStream(
- main_channel_source=main_channel_gen,
- main_channel_format=SendspinAudioFormat(
- sample_rate=pcm_format.sample_rate,
- bit_depth=pcm_format.bit_depth,
- channels=pcm_format.channels,
- codec=audio_codec,
- ),
- player_instance=self,
- internal_format=flow_pcm_format,
- output_format=pcm_format,
+ codec, audio_format = parsed
+ if not player_role.set_preferred_format(audio_format, codec):
+ self.logger.warning(
+ "Failed to set preferred audio format %s %s for player %s",
+ codec.name,
+ audio_format,
+ self.display_name,
)
- stop_time = await self.api.group.play_media(media_stream)
- await self.api.group.stop(stop_time)
- except asyncio.CancelledError:
- self.logger.debug("Playback cancelled for player %s", self.display_name)
- raise
- except Exception:
- self.logger.exception("Error during playback for player %s", self.display_name)
- raise
- finally:
- self.timed_client_stream = None
-
async def set_members(
self,
player_ids_to_add: list[str] | None = None,
player_ids_to_remove: list[str] | None = None,
) -> None:
"""Handle SET_MEMBERS command on the player."""
- self.logger.debug(
- "set_members called: adding %s, removing %s", player_ids_to_add, player_ids_to_remove
- )
for player_id in player_ids_to_remove or []:
player = self.mass.players.get_player(player_id, True)
player = cast("SendspinPlayer", player) # For type checking
)
if image_data is not None:
image = await asyncio.to_thread(Image.open, BytesIO(image_data))
- await self.api.group.set_media_art(image, source=ArtworkSource.ALBUM)
- else:
- # Clear artwork if none available
- await self.api.group.set_media_art(None, source=ArtworkSource.ALBUM)
+ if (artwork_role := self._artwork_role) is not None:
+ await artwork_role.set_album_artwork(image)
+ # No image data available: clear any previously set album artwork
+ elif (artwork_role := self._artwork_role) is not None:
+ await artwork_role.set_album_artwork(None)
return artwork_url
)
if artist_image_data is not None:
artist_image = await asyncio.to_thread(Image.open, BytesIO(artist_image_data))
- await self.api.group.set_media_art(artist_image, source=ArtworkSource.ARTIST)
- else:
- # Clear artist artwork if none available
- await self.api.group.set_media_art(None, source=ArtworkSource.ARTIST)
+ if (artwork_role := self._artwork_role) is not None:
+ await artwork_role.set_artist_artwork(artist_image)
+ # No artist image available: clear any previously set artist artwork
+ elif (artwork_role := self._artwork_role) is not None:
+ await artwork_role.set_artist_artwork(None)
def _on_player_media_updated(self) -> None:
"""Handle callback when the current media of the player is updated."""
if self.state.current_media is None:
# Clear metadata when no media loaded
- self.api.group.set_metadata(Metadata())
+ if (metadata_role := self._metadata_role) is not None:
+ metadata_role.set_metadata(Metadata())
return
self.mass.create_task(self.send_current_media_metadata())
metadata = Metadata(
title=current_media.title,
artist=current_media.artist,
- album_artist=None, # TODO: extract from optional queue item
+ album_artist=None,
album=current_media.album,
artwork_url=current_media.image_url,
- year=None, # TODO: extract from optional queue item
- track=None, # TODO: extract from optional queue item
+ year=None,
+ track=None,
track_duration=track_duration * 1000 if track_duration is not None else None,
track_progress=int(current_media.corrected_elapsed_time * 1000)
if current_media.corrected_elapsed_time
)
# Send metadata to the group
- self.api.group.set_metadata(metadata)
+ if (metadata_role := self._metadata_role) is not None:
+ metadata_role.set_metadata(metadata)
+
+ async def get_config_entries(
+ self,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+ ) -> list[ConfigEntry]:
+ """Return all (provider/player specific) Config Entries for the player."""
+ default_entries = await super().get_config_entries(action=action, values=values)
+ entries = [
+ *default_entries,
+ CONF_ENTRY_OUTPUT_CODEC_HIDDEN,
+ CONF_ENTRY_HTTP_PROFILE_HIDDEN,
+ ConfigEntry.from_dict({**CONF_ENTRY_SAMPLE_RATES.to_dict(), "hidden": True}),
+ ]
+
+ # Build dynamic format options from player's supported formats
+ player_role = self._player_role
+ if player_role is not None:
+ supported_formats = player_role.get_supported_formats()
+ if supported_formats:
+ format_options = [
+ ConfigValueOption(
+ title="Automatic (let client decide)",
+ value=SENDSPIN_FORMAT_AUTOMATIC,
+ ),
+ ]
+ for fmt in supported_formats:
+ format_options.append(
+ ConfigValueOption(
+ title=format_to_display_string(fmt),
+ value=format_to_option_value(fmt),
+ )
+ )
+ entries.append(
+ ConfigEntry(
+ key=CONF_PREFERRED_SENDSPIN_FORMAT,
+ type=ConfigEntryType.STRING,
+ label="Preferred audio format",
+ description="Select the audio format to use for playback on this player.",
+ category="protocol_generic",
+ default_value=SENDSPIN_FORMAT_AUTOMATIC,
+ options=format_options,
+ advanced=True,
+ )
+ )
+
+ return entries
async def on_unload(self) -> None:
"""Handle logic when the player is unloaded from the Player controller."""
+ await self.playback_session.close()
await super().on_unload()
self.unsub_event_cb()
self.unsub_group_event_cb()
from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer
from music_assistant_models.enums import ProviderFeature
+from music_assistant_models.errors import AlreadyRegisteredError
from music_assistant.mass import MusicAssistant
from music_assistant.models.player_provider import PlayerProvider
def event_cb(self, server: SendspinServer, event: SendspinEvent) -> None:
"""Event callback registered to the sendspin server."""
- self.logger.debug("Received SendspinEvent: %s", event)
match event:
case ClientAddedEvent(client_id):
self.mass.create_task(self._handle_client_added(client_id))
self.logger.error("Unknown sendspin event: %s", event)
async def _handle_client_added(self, client_id: str) -> None:
- """Handle client added event asynchronously."""
+ """Handle a new client connection asynchronously."""
# Wait for any pending unregister to complete before registering
# This prevents a race condition where a slow unregister removes
# a newly registered player after a quick reconnect
if self.server_api.get_client(client_id) is None:
self.logger.debug("Client %s gone after waiting for pending unregister", client_id)
return
+ if self.mass.players.get_player(client_id) is not None:
+ self.logger.debug(
+ "Client %s already registered, skipping duplicate add event", client_id
+ )
+ return
player = SendspinPlayer(self, client_id)
self.logger.debug("Client %s connected", client_id)
if player.device_info.manufacturer == "ESPHome" and (
player._attr_name = (
hass_device["name_by_user"] or hass_device["name"] or player.name
)
- await self.mass.players.register(player)
+ try:
+ await self.mass.players.register(player)
+ except AlreadyRegisteredError:
+ self.logger.debug("Client %s already registered while handling add event", client_id)
+ player.unsub_event_cb()
+ player.unsub_group_event_cb()
async def _handle_client_removed(self, client_id: str) -> None:
- """Handle client removed event asynchronously."""
+ """Handle a client disconnection asynchronously."""
self.logger.debug("Client %s disconnected", client_id)
unregister_event = asyncio.Event()
self._pending_unregisters[client_id] = unregister_event
"""
# Disconnect all clients before stopping the server
clients = list(self.server_api.clients)
+ connected_clients = []
disconnect_tasks = []
for client in clients:
- self.logger.debug("Disconnecting client %s", client.client_id)
- disconnect_tasks.append(client.disconnect(retry_connection=False))
+ if client.connection is None:
+ continue
+ connected_clients.append(client)
+ disconnect_tasks.append(client.connection.disconnect(retry_connection=False))
if disconnect_tasks:
results = await asyncio.gather(*disconnect_tasks, return_exceptions=True)
- for client, result in zip(clients, results, strict=True):
+ for client, result in zip(connected_clients, results, strict=True):
if isinstance(result, Exception):
self.logger.warning(
"Error disconnecting client %s: %s", client.client_id, result
+++ /dev/null
-"""
-Timestamped multi-client audio stream for position-aware playback.
-
-This module provides a multi-client streaming implementation optimized for
-aiosendspin's synchronized multi-room audio playback. Each audio chunk is
-timestamped, allowing late-joining players to start at the correct position
-for synchronized playback across multiple devices.
-"""
-
-import asyncio
-import logging
-from collections import deque
-from collections.abc import AsyncGenerator
-from contextlib import suppress
-from uuid import UUID, uuid4
-
-from music_assistant_models.media_items import AudioFormat
-
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
-
-LOGGER = logging.getLogger(__name__)
-
-# Minimum/target buffer retention time in seconds
-# This 10s buffer is currently required since:
-# - aiosendspin currently uses a fixed 5s buffer to allow up to ~4s of network interruption
-# - ~2s allows for ffmpeg processing time and some margin
-# - ~3s are currently needed internally by aiosendspin for initial buffering
-MIN_BUFFER_DURATION = 10.0
-# Maximum buffer duration before raising an error (safety mechanism)
-MAX_BUFFER_DURATION = MIN_BUFFER_DURATION + 5.0
-
-
-class TimedClientStream:
- """Multi-client audio stream with timestamped chunks for synchronized playback."""
-
- audio_source: AsyncGenerator[bytes, None]
- """The source audio stream to read from."""
- audio_format: AudioFormat
- """The audio format of the source stream."""
- chunk_buffer: deque[tuple[bytes, float]]
- """Buffer storing chunks with their timestamps in seconds (chunk_data, timestamp_seconds)."""
- subscriber_positions: dict[UUID, int]
- """Subscriber positions: maps subscriber_id to position (index into chunk_buffer)."""
- buffer_lock: asyncio.Lock
- """Lock for buffer and shared state access."""
- source_read_lock: asyncio.Lock
- """Lock to serialize audio source reads."""
- stream_ended: bool = False
- """Track if stream has ended."""
- current_position: float = 0.0
- """Current position in seconds (from stream start)."""
-
- def __init__(
- self,
- audio_source: AsyncGenerator[bytes, None],
- audio_format: AudioFormat,
- ) -> None:
- """Initialize TimedClientStream."""
- self.audio_source = audio_source
- self.audio_format = audio_format
- self.chunk_buffer = deque()
- self.subscriber_positions = {}
- self.buffer_lock = asyncio.Lock()
- self.source_read_lock = asyncio.Lock()
-
- def _get_bytes_per_second(self) -> int:
- """Get bytes per second for the audio format."""
- return (
- self.audio_format.sample_rate
- * self.audio_format.channels
- * (self.audio_format.bit_depth // 8)
- )
-
- def _bytes_to_seconds(self, num_bytes: int) -> float:
- """Convert bytes to seconds based on audio format."""
- bytes_per_second = self._get_bytes_per_second()
- if bytes_per_second == 0:
- return 0.0
- return num_bytes / bytes_per_second
-
- def _get_buffer_duration(self) -> float:
- """Calculate total duration of buffered chunks in seconds."""
- if not self.chunk_buffer:
- return 0.0
- # Duration is from first chunk timestamp to current position
- first_chunk_timestamp = self.chunk_buffer[0][1]
- return self.current_position - first_chunk_timestamp
-
- def _cleanup_old_chunks(self) -> None:
- """Remove old chunks when all subscribers read them and min duration exceeded."""
- # Find the oldest position still needed by any subscriber
- if self.subscriber_positions:
- min_position = min(self.subscriber_positions.values())
- else:
- min_position = len(self.chunk_buffer)
-
- # Calculate target oldest timestamp
- # This ensures buffer contains at least MIN_BUFFER_DURATION seconds of recent data
- target_oldest = self.current_position - MIN_BUFFER_DURATION
-
- # Remove old chunks that meet both conditions:
- # 1. Before min_position (no subscriber needs them)
- # 2. Older than target_oldest (outside minimum retention window)
- chunks_removed = 0
- while chunks_removed < min_position and self.chunk_buffer:
- _chunk_bytes, chunk_timestamp = self.chunk_buffer[0]
- if chunk_timestamp < target_oldest:
- self.chunk_buffer.popleft()
- chunks_removed += 1
- else:
- # Stop when we reach chunks we want to keep
- break
-
- # Adjust all subscriber positions to account for removed chunks
- for sub_id in self.subscriber_positions:
- self.subscriber_positions[sub_id] -= chunks_removed
-
- async def _read_chunk_from_source(self) -> None:
- """Read next chunk from audio source and add to buffer."""
- try:
- chunk = await anext(self.audio_source)
- async with self.buffer_lock:
- # Calculate timestamp for this chunk
- chunk_timestamp = self.current_position
- chunk_duration = self._bytes_to_seconds(len(chunk))
-
- # Append chunk with its timestamp
- self.chunk_buffer.append((chunk, chunk_timestamp))
-
- # Update current position
- self.current_position += chunk_duration
-
- # Safety check: ensure buffer doesn't grow unbounded
- if self._get_buffer_duration() > MAX_BUFFER_DURATION:
- msg = f"Buffer exceeded maximum duration ({MAX_BUFFER_DURATION}s)"
- raise RuntimeError(msg)
- except StopAsyncIteration:
- # Source exhausted, add EOF marker
- async with self.buffer_lock:
- self.chunk_buffer.append((b"", self.current_position))
- self.stream_ended = True
- except Exception:
- # Source errored or was canceled, mark stream as ended
- async with self.buffer_lock:
- self.stream_ended = True
- raise
-
- async def _check_buffer(self, subscriber_id: UUID) -> bool | None:
- """
- Check if buffer has grown or stream ended.
-
- REQUIRES: Caller must hold self.source_read_lock before calling.
-
- Returns:
- True if should continue reading loop (chunk found in buffer),
- False if should break (stream ended),
- None if should proceed to read from source.
- """
- async with self.buffer_lock:
- position = self.subscriber_positions[subscriber_id]
- if position < len(self.chunk_buffer):
- # Another subscriber already read the chunk
- return True
- if self.stream_ended:
- # Stream ended while waiting for source lock
- return False
- return None # Continue to read from source
-
- async def _get_chunk_from_buffer(self, subscriber_id: UUID) -> bytes | None:
- """
- Get next chunk from buffer for subscriber.
-
- Returns:
- Chunk bytes if available, None if no chunk available, or empty bytes for EOF.
- """
- async with self.buffer_lock:
- position = self.subscriber_positions[subscriber_id]
-
- # Check if we have a chunk at this position
- if position < len(self.chunk_buffer):
- # Chunk available in buffer
- chunk_data, _ = self.chunk_buffer[position]
-
- # Move to next position
- self.subscriber_positions[subscriber_id] = position + 1
-
- # Cleanup old chunks that no one needs
- self._cleanup_old_chunks()
- return chunk_data
- if self.stream_ended:
- # Stream ended and we've read all buffered chunks
- return b""
- return None
-
- async def _cleanup_subscriber(self, subscriber_id: UUID) -> None:
- """Clean up subscriber and close stream if no subscribers left."""
- async with self.buffer_lock:
- if subscriber_id in self.subscriber_positions:
- del self.subscriber_positions[subscriber_id]
-
- # If no subscribers left, close the stream
- if not self.subscriber_positions and not self.stream_ended:
- self.stream_ended = True
- # Close the audio source generator to prevent resource leak
- with suppress(Exception):
- await self.audio_source.aclose()
-
- async def get_stream(
- self,
- output_format: AudioFormat,
- filter_params: list[str] | None = None,
- ) -> tuple[AsyncGenerator[bytes, None], float]:
- """
- Get (client specific encoded) ffmpeg stream.
-
- Returns:
- A tuple of (audio generator, actual position in seconds)
- """
- audio_gen, position = await self.subscribe_raw()
-
- # Calculate frame size for alignment
- # Frame size = channels * bytes_per_sample
- bytes_per_frame = output_format.channels * (output_format.bit_depth // 8)
-
- async def _stream_with_ffmpeg() -> AsyncGenerator[bytes, None]:
- buffer = b""
- try:
- async for chunk in get_ffmpeg_stream(
- audio_input=audio_gen,
- input_format=self.audio_format,
- output_format=output_format,
- filter_params=filter_params,
- ):
- buffer += chunk
- # Yield only complete frames
- aligned_size = (len(buffer) // bytes_per_frame) * bytes_per_frame
- if aligned_size > 0:
- yield buffer[:aligned_size]
- buffer = buffer[aligned_size:]
- # Yield any remaining complete frames at end of stream
- if buffer:
- aligned_size = (len(buffer) // bytes_per_frame) * bytes_per_frame
- if aligned_size > 0:
- yield buffer[:aligned_size]
- finally:
- # Ensure audio_gen cleanup runs immediately
- with suppress(Exception):
- await audio_gen.aclose()
-
- return _stream_with_ffmpeg(), position
-
- async def _generate(self, subscriber_id: UUID) -> AsyncGenerator[bytes, None]:
- """
- Generate audio chunks for a subscriber.
-
- Yields chunks from the buffer until the stream ends, reading from the source
- as needed. Automatically cleans up the subscriber on exit.
- """
- try:
- # Position already set above atomically with timestamp capture
- while True:
- # Try to get chunk from buffer
- chunk_bytes = await self._get_chunk_from_buffer(subscriber_id)
-
- # Release lock before yielding to avoid deadlock
- if chunk_bytes is not None:
- if chunk_bytes == b"":
- # End of stream marker
- break
- yield chunk_bytes
- else:
- # No chunk available, need to read from source
- # Use source_read_lock to ensure only one subscriber reads at a time
- async with self.source_read_lock:
- # Check again if buffer has grown or stream ended while waiting
- check_result = await self._check_buffer(subscriber_id)
- if check_result is True:
- # Another subscriber already read the chunk
- continue
- if check_result is False:
- # Stream ended while waiting for source lock
- break
-
- # Read next chunk from source (check_result is None)
- # Note: This may block if the audio_source does synchronous I/O
- await self._read_chunk_from_source()
-
- finally:
- await self._cleanup_subscriber(subscriber_id)
-
- async def subscribe_raw(self) -> tuple[AsyncGenerator[bytes, None], float]:
- """
- Subscribe to the raw/unaltered audio stream.
-
- Returns:
- A tuple of (audio generator, actual position in seconds).
- The position indicates where in the stream the first chunk will be from.
-
- Note:
- Callers must properly consume or cancel the returned generator to prevent
- resource leaks.
- """
- subscriber_id = uuid4()
-
- # Atomically capture starting position and register subscriber while holding lock
- async with self.buffer_lock:
- if self.chunk_buffer:
- _, starting_position = self.chunk_buffer[0]
- # Log buffer time range for debugging
- newest_ts = self.chunk_buffer[-1][1]
- oldest_relative = starting_position - self.current_position
- newest_relative = newest_ts - self.current_position
- LOGGER.debug(
- "New subscriber joining: buffer contains %.3fs (from %.3fs to %.3fs, "
- "current_position=%.3fs)",
- newest_ts - starting_position,
- oldest_relative,
- newest_relative,
- self.current_position,
- )
- else:
- starting_position = self.current_position
- LOGGER.debug(
- "New subscriber joining: buffer is empty, starting at current_position=%.3fs",
- self.current_position,
- )
- # Register subscriber at position 0 (start of buffer)
- self.subscriber_positions[subscriber_id] = 0
-
- # Return generator and starting position in seconds
- return self._generate(subscriber_id), starting_position
aiomusiccast==0.15.0
aiortc>=1.6.0
aiorun==2025.1.1
-aiosendspin==3.0.0
+aiosendspin==4.0.1
aioslimproto==3.1.5
aiosonos==0.1.9
aiosqlite==0.22.1