From 7b82a3a009a9b06492bf05f145a7fa44f356b45d Mon Sep 17 00:00:00 2001 From: Maxim Raznatovski Date: Tue, 17 Feb 2026 22:46:52 +0100 Subject: [PATCH] Update Sendspin provider to version 4.0 with many improvements (#3158) --- music_assistant/providers/sendspin/README.md | 2 +- .../providers/sendspin/__init__.py | 2 +- .../providers/sendspin/manifest.json | 2 +- .../providers/sendspin/playback.py | 1154 +++++++++++++++++ music_assistant/providers/sendspin/player.py | 569 ++++---- .../providers/sendspin/provider.py | 27 +- .../providers/sendspin/timed_client_stream.py | 331 ----- requirements_all.txt | 2 +- 8 files changed, 1486 insertions(+), 603 deletions(-) create mode 100644 music_assistant/providers/sendspin/playback.py delete mode 100644 music_assistant/providers/sendspin/timed_client_stream.py diff --git a/music_assistant/providers/sendspin/README.md b/music_assistant/providers/sendspin/README.md index 24bd1584..7dd50b92 100644 --- a/music_assistant/providers/sendspin/README.md +++ b/music_assistant/providers/sendspin/README.md @@ -198,7 +198,7 @@ Sendspin players support: |------|-------------| | `provider.py` | Main provider class, handles WebRTC signaling and server lifecycle | | `player.py` | Player implementation with playback, grouping, and metadata handling | -| `timed_client_stream.py` | Multi-client audio stream distribution with timing | +| `playback.py` | Playback pipeline with DSP channel processing and timed frame commits | | `__init__.py` | Provider setup and configuration | | `manifest.json` | Provider metadata | diff --git a/music_assistant/providers/sendspin/__init__.py b/music_assistant/providers/sendspin/__init__.py index 31686d48..2108f1b5 100644 --- a/music_assistant/providers/sendspin/__init__.py +++ b/music_assistant/providers/sendspin/__init__.py @@ -8,7 +8,7 @@ from __future__ import annotations from typing import TYPE_CHECKING -from .provider import SendspinProvider +from music_assistant.providers.sendspin.provider import SendspinProvider if TYPE_CHECKING: from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig diff --git a/music_assistant/providers/sendspin/manifest.json b/music_assistant/providers/sendspin/manifest.json index c25c8350..1e945ecb 100644 --- a/music_assistant/providers/sendspin/manifest.json +++ b/music_assistant/providers/sendspin/manifest.json @@ -7,7 +7,7 @@ "documentation": "https://music-assistant.io/player-support/sendspin/", "codeowners": ["@music-assistant"], "credits": ["[Sendspin](https://sendspin-audio.com)"], - "requirements": ["aiosendspin==3.0.0", "av==16.1.0"], + "requirements": ["aiosendspin==4.0.1", "av==16.1.0"], "builtin": true, "allow_disable": false } diff --git a/music_assistant/providers/sendspin/playback.py b/music_assistant/providers/sendspin/playback.py new file mode 100644 index 00000000..a8ff2e89 --- /dev/null +++ b/music_assistant/providers/sendspin/playback.py @@ -0,0 +1,1154 @@ +"""Playback session coordinator for Sendspin players.""" + +from __future__ import annotations + +import asyncio +import time +from collections import deque +from collections.abc import Iterator +from contextlib import suppress +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any +from uuid import UUID, uuid4 + +from aiosendspin.server.audio import AudioFormat as SendspinAudioFormat +from aiosendspin.server.push_stream import MAIN_CHANNEL, PushStream +from music_assistant_models.enums import ContentType +from music_assistant_models.media_items.audio_format import AudioFormat + +from music_assistant.constants import CONF_OUTPUT_CHANNELS +from music_assistant.helpers.audio import get_player_filter_params +from music_assistant.helpers.ffmpeg import FFMpeg +from music_assistant.models.player import PlayerMedia + +if TYPE_CHECKING: + from .player import SendspinPlayer + + +# Same sample format expressed in both MA and Sendspin type systems. +_PCM_FORMAT = AudioFormat( + content_type=ContentType.PCM_S32LE, + sample_rate=48000, + bit_depth=32, + channels=2, +) +_SENDSPIN_PCM_FORMAT = SendspinAudioFormat( + sample_rate=48000, + bit_depth=32, + channels=2, +) +# Max PCM slice fed to the producer per iteration. +_PRODUCER_SLICE_US = 100_000 +# Max pending chunks between producer and committer before the producer blocks. +_PRODUCER_BACKLOG_SIZE = 64 +# Backpressure threshold: push stream sleeps when buffered audio exceeds this. +_PRODUCER_BUFFER_LIMIT_US = 30_000_000 +# Start join promotion once catchup processor lag is within this window of the history tail. +_JOIN_PROMOTE_ARM_WINDOW_US = 2_000_000 +# Accept catchup output within this margin of the promotion target. +_JOIN_PROMOTE_TOLERANCE_US = 50_000 +# Abort join catchup if promotion hasn't completed within this. +_JOIN_PROMOTION_TIMEOUT_S = 15.0 +# Retain committed history this far behind real-time for late-join backfill. +# This pre-history also warms up ffmpeg's internal filter buffers so the DSP +# output has settled by the time the member's channel goes live. +_HISTORY_KEEP_PAST_US = 1_000_000 + + +class _BufferedFfmpegProcessor: + """FFmpeg wrapper with small output carry-over buffer and duration-based reads.""" + + def __init__(self, ffmpeg: FFMpeg, audio_format: AudioFormat) -> None: + self._ffmpeg = ffmpeg + self._output_buffer = bytearray() + bytes_per_sample = max(1, int(audio_format.bit_depth // 8)) + self._sample_rate = int(audio_format.sample_rate) + self._frame_size = bytes_per_sample * int(audio_format.channels) + self._bytes_per_second = self._sample_rate * self._frame_size + # ~25ms worth of audio per read syscall. + self._read_quantum_bytes = max(1, int(self._bytes_per_second * 0.025)) + self._produced_output_us = 0 + + async def start(self) -> None: + await self._ffmpeg.start() + + async def close(self) -> None: + await self._ffmpeg.close() + + async def push(self, pcm: bytes) -> None: + await self._ffmpeg.write(pcm) + + @property + def produced_output_us(self) -> int: + """Return cumulative output duration currently drained from ffmpeg.""" + return self._produced_output_us + + async def read_duration_us(self, duration_us: int) -> bytes: + """Block-read exactly `duration_us` worth of processed PCM from ffmpeg.""" + target_bytes = self._target_bytes_for_duration_us(duration_us) + if target_bytes == 0: + return b"" + + while len(self._output_buffer) < target_bytes: + missing = target_bytes - len(self._output_buffer) + read_size = max(self._read_quantum_bytes, missing) + chunk = await self._ffmpeg.readexactly(read_size) + self._output_buffer.extend(chunk) + + out = bytes(self._output_buffer[:target_bytes]) + del self._output_buffer[:target_bytes] + return out + + async def drain_available(self) -> int: + """Non-blocking drain of ffmpeg stdout into internal buffer. + + Returns cumulative produced output duration in microseconds. + """ + while True: + try: + # 1ms timeout: non-blocking check for available data. + chunk = await asyncio.wait_for( + self._ffmpeg.read(self._read_quantum_bytes), + timeout=0.001, + ) + except TimeoutError: + break + if not chunk: + break + self._output_buffer.extend(chunk) + self._produced_output_us += self._duration_us_for_bytes(len(chunk)) + if len(chunk) < self._read_quantum_bytes: + break + return self._produced_output_us + + async def drain_forever(self) -> None: + """Continuously drain ffmpeg stdout into internal buffer until EOF.""" + while True: + chunk = await self._ffmpeg.read(self._read_quantum_bytes) + if not chunk: + break + self._output_buffer.extend(chunk) + self._produced_output_us += self._duration_us_for_bytes(len(chunk)) + + def pop_duration_us(self, duration_us: int) -> bytes | None: + """Pop exactly `duration_us` from already buffered output, or None if insufficient.""" + target_bytes = self._target_bytes_for_duration_us(duration_us) + if target_bytes == 0: + return b"" + if len(self._output_buffer) < target_bytes: + return None + out = bytes(self._output_buffer[:target_bytes]) + del self._output_buffer[:target_bytes] + return out + + def buffered_duration_us(self) -> int: + """Return buffered output duration currently available for immediate pop.""" + return self._duration_us_for_bytes(len(self._output_buffer)) + + def pop_duration_us_or_pad(self, duration_us: int, pad_tolerance_us: int) -> bytes | None: + """Pop target duration; if short within tolerance, pad tail with silence.""" + target_bytes = self._target_bytes_for_duration_us(duration_us) + if target_bytes == 0: + return b"" + available = len(self._output_buffer) + if available >= target_bytes: + out = bytes(self._output_buffer[:target_bytes]) + del self._output_buffer[:target_bytes] + return out + short_bytes = target_bytes - available + short_us = self._duration_us_for_bytes(short_bytes) + if short_us > max(0, pad_tolerance_us): + return None + out = bytes(self._output_buffer) + self._output_buffer.clear() + return out + (b"\x00" * short_bytes) + + def _duration_us_for_bytes(self, byte_count: int) -> int: + if byte_count <= 0 or self._sample_rate <= 0 or self._frame_size <= 0: + return 0 + frames = byte_count // self._frame_size + if frames <= 0: + return 0 + return int((frames * 1_000_000) / self._sample_rate) + + def _target_bytes_for_duration_us(self, duration_us: int) -> int: + """Convert duration to frame-aligned PCM byte count.""" + if duration_us <= 0 or self._sample_rate <= 0 or self._frame_size <= 0: + return 0 + samples = max(0, int((duration_us * self._sample_rate + 500_000) / 1_000_000)) + return samples * self._frame_size + + +@dataclass(slots=True) +class _HistoryChunk: + start_time_us: int + duration_us: int + pcm: bytes + + +@dataclass(slots=True) +class _PendingChunk: + pcm: bytes + duration_us: int + + +@dataclass(slots=True) +class _JoinCatchupState: + """Per-member state for a join-catchup processor replaying history through DSP. + + The processor is fed historical + live PCM via ``input_queue``. Once its + output catches up to the live stream (within tolerance), it is promoted to + the member's live pipeline. See ``_inject_ready_join_historical`` for the + full promotion lifecycle. + """ + + processor: _BufferedFfmpegProcessor + input_queue: asyncio.Queue[bytes | None] + writer_task: asyncio.Task[None] + drainer_task: asyncio.Task[None] + snapshot_task: asyncio.Task[None] | None = None + # Timeline position of the first history chunk fed into the processor. + first_history_start_us: int | None = None + # Timeline position up to which PCM has been enqueued into the processor. + fed_until_us: int | None = None + # End of the history snapshot taken when catchup started. + history_end_us: int | None = None + # Locked target: once set, promotion fires when output reaches this point. + promotion_target_end_us: int | None = None + # Monotonic time when promotion was armed, used for timeout detection. + promotion_armed_monotonic_s: float | None = None + write_lock: asyncio.Lock = field(default_factory=asyncio.Lock) + + +@dataclass(slots=True) +class _PipelineConfig: + requires_transform: bool + output_channels: str + filter_params: tuple[str, ...] + + @property + def signature(self) -> tuple[bool, str, tuple[str, ...]]: + return (self.requires_transform, self.output_channels, self.filter_params) + + +@dataclass(slots=True) +class _MemberPipeline: + player_id: str + channel_id: UUID + config: _PipelineConfig + processor: _BufferedFfmpegProcessor | None = None + ready: bool = False + + +class SendspinPlaybackSession: + """Coordinates playback for a Sendspin player group leader. + + The push stream supports multi-channel audio: members that need per-player + DSP (EQ, channel mixing, output routing) each get a dedicated ffmpeg + processor and a separate channel. Members without DSP share MAIN_CHANNEL + and receive the raw PCM directly. + + Playback runs as two concurrent coroutines inside ``_run_playback``: + + * **Producer** -- reads PCM from the MA stream, slices it into fixed-size + chunks, queues them, and writes each slice into per-member ffmpeg + processors (transform push) in parallel. + * **Consumer** -- dequeues chunks, reads the corresponding transformed + output from each processor (transform read), prepares all channels on + the push stream, commits audio, and applies backpressure via + ``sleep_to_limit_buffer``. + + When a new member joins mid-playback, a *join-catchup* processor replays + committed history through the member's DSP chain so it can be promoted + to the live pipeline without an audible gap. + """ + + def __init__(self, player: SendspinPlayer) -> None: + """Initialize session coordinator bound to the owning player.""" + self.player = player + self.playback_task: asyncio.Task[None] | None = None + self.pending_join_members: set[str] = set() + self._state_lock = asyncio.Lock() + self._members: set[str] = set() + self._member_pipelines: dict[str, _MemberPipeline] = {} + self._push_stream: PushStream | None = None + self._playback_running = False + self._timeline_start_us: int | None = None + self._first_commit_monotonic_us: int | None = None + self._produced_audio_us = 0 + self._history: deque[_HistoryChunk] = deque() + self._join_catchup: dict[str, _JoinCatchupState] = {} + self._pipeline_config_cache: dict[str, _PipelineConfig] = {} + self._preassigned_channels: dict[str, UUID] = {} + self._mapping_dirty = True + + # -- Helpers --------------------------------------------------------------- + + def _attach_task_exception_logger(self, task: asyncio.Task[Any], name: str) -> None: + """Log unhandled exception from background task when it finishes.""" + + def _done_callback(done_task: asyncio.Task[Any]) -> None: + if done_task.cancelled(): + return + with suppress(Exception): + exc = done_task.exception() + if exc is not None: + self.player.logger.exception( + "Background task failed: %s", + name, + exc_info=exc, + ) + + task.add_done_callback(_done_callback) + + def _get_join_readiness(self) -> tuple[bool, str | None]: + """Check whether live join DSP preparation can be performed right now.""" + if self._playback_running and self._push_stream is not None: + return (True, None) + return (False, "no active stream context") + + # -- Snapshot helper ------------------------------------------------------- + + async def _snapshot_active_pipelines( + self, + ) -> tuple[set[str], tuple[tuple[str, _MemberPipeline], ...]]: + """Return (join_pending_ids, active_pipelines) under lock.""" + async with self._state_lock: + members = self._members + return set(self._join_catchup), tuple( + (mid, p) for mid, p in self._member_pipelines.items() if mid in members + ) + + # -- Public API ------------------------------------------------------------ + + async def cancel(self, reason: str) -> None: + """Cancel and await the active playback task, if any.""" + task = self.playback_task + if task is None: + return + if task.done(): + if self.playback_task is task: + self.playback_task = None + return + self.player.logger.debug("Cancelling playback task (%s)", reason) + task.cancel() + with suppress(asyncio.CancelledError, Exception): + await task + if self.playback_task is task: + self.playback_task = None + + async def start(self, media: PlayerMedia, restart: bool = False) -> None: + """Start background playback for `media`.""" + active_task = self.playback_task + if active_task is not None and not active_task.done(): + if not restart: + raise RuntimeError("playback already active") + await self.cancel("restart requested") + self.playback_task = asyncio.create_task(self._run_playback(media)) + + async def close(self) -> None: + """Stop playback and release all managed resources.""" + await self.cancel("session close") + self.pending_join_members.clear() + async with self._state_lock: + self._members.clear() + self._mapping_dirty = True + await self._clear_member_pipelines() + await self._clear_join_catchup() + async with self._state_lock: + self._history.clear() + self._produced_audio_us = 0 + self._timeline_start_us = None + self._first_commit_monotonic_us = None + self._pipeline_config_cache.clear() + self._preassigned_channels.clear() + + async def add_member(self, player_id: str) -> None: + """Add a member to the group with DSP-aware lifecycle handling.""" + async with self._state_lock: + if player_id in self._members: + self.pending_join_members.discard(player_id) + return + # Force a fresh channel identity for every new join cycle. + self._preassigned_channels[player_id] = uuid4() + self.pending_join_members.add(player_id) + try: + await self._start_join_catchup(player_id) + async with self._state_lock: + self._members.add(player_id) + self._mapping_dirty = True + except Exception: + await self._release_player_channel(player_id) + raise + finally: + self.pending_join_members.discard(player_id) + + async def remove_member(self, player_id: str) -> None: + """Remove a member from the group and clean up per-member playback state.""" + self.pending_join_members.discard(player_id) + async with self._state_lock: + self._members.discard(player_id) + self._mapping_dirty = True + self._pipeline_config_cache.pop(player_id, None) + self._preassigned_channels.pop(player_id, None) + await self._stop_join_catchup(player_id) + await self._release_player_channel(player_id) + + async def sync_members(self, member_ids: set[str]) -> None: + """Reconcile session members to exactly the provided set.""" + async with self._state_lock: + current_members = set(self._members) + for player_id in current_members - member_ids: + await self.remove_member(player_id) + for player_id in member_ids - current_members: + await self.add_member(player_id) + + # -- Join catchup ---------------------------------------------------------- + + async def _start_join_catchup(self, player_id: str) -> None: + """Start dedicated join catchup processor fed from committed history.""" + async with self._state_lock: + playback_active = self._playback_running and self._push_stream is not None + if not playback_active: + return + + pipeline = await self._sync_member_pipeline(player_id) + if not pipeline.config.requires_transform: + return + + await self._stop_join_catchup(player_id) + + ffmpeg_obj = self._create_member_ffmpeg(pipeline.config.filter_params) + processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT) + await processor.start() + # Bounded queue sized to hold the full buffer duration with some headroom. + queue_size = (_PRODUCER_BUFFER_LIMIT_US // _PRODUCER_SLICE_US) + _PRODUCER_BACKLOG_SIZE + input_queue: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=queue_size) + + async with self._state_lock: + history_snapshot = list(self._history) + if not history_snapshot: + await processor.close() + return + history_end_us = history_snapshot[-1].start_time_us + history_snapshot[-1].duration_us + + async def _writer() -> None: + while True: + chunk = await input_queue.get() + if chunk is None: + return + await processor.push(chunk) + + async def _drainer() -> None: + await processor.drain_forever() + + writer_task = asyncio.create_task(_writer()) + drainer_task = asyncio.create_task(_drainer()) + self._attach_task_exception_logger(writer_task, f"join_writer:{player_id}") + self._attach_task_exception_logger(drainer_task, f"join_drainer:{player_id}") + + state = _JoinCatchupState( + processor=processor, + input_queue=input_queue, + writer_task=writer_task, + drainer_task=drainer_task, + history_end_us=history_end_us, + ) + async with self._state_lock: + self._join_catchup[player_id] = state + + async with self._state_lock: + current = self._join_catchup.get(player_id) + if current is not None and current.processor is processor: + current.snapshot_task = asyncio.create_task( + self._feed_join_history(player_id, processor, history_snapshot) + ) + self._attach_task_exception_logger( + current.snapshot_task, f"join_snapshot:{player_id}" + ) + + async def _feed_join_history( + self, + player_id: str, + processor: _BufferedFfmpegProcessor, + history_snapshot: list[_HistoryChunk], + ) -> None: + """Feed historical PCM into a join-catchup processor.""" + async with self._state_lock: + state = self._join_catchup.get(player_id) + if state is None or state.processor is not processor: + return + async with state.write_lock: + first_history_start_us: int | None = None + previous_end_us: int | None = None + for hist_chunk in history_snapshot: + if first_history_start_us is None: + first_history_start_us = hist_chunk.start_time_us + async with self._state_lock: + current = self._join_catchup.get(player_id) + if current is not None and current.processor is processor: + current.first_history_start_us = first_history_start_us + current.fed_until_us = first_history_start_us + if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us: + gap_us = hist_chunk.start_time_us - previous_end_us + silence = self._silence_for_duration_us(gap_us) + if silence: + await self._enqueue_join_pcm(state, silence) + await self._enqueue_join_pcm(state, hist_chunk.pcm) + previous_end_us = hist_chunk.start_time_us + hist_chunk.duration_us + async with self._state_lock: + current = self._join_catchup.get(player_id) + if current is not None and current.processor is processor: + current.fed_until_us = previous_end_us + + async def _stop_join_catchup(self, player_id: str) -> None: + """Stop and remove dedicated join catchup processor for one player.""" + async with self._state_lock: + state = self._join_catchup.pop(player_id, None) + if state is None: + return + if state.snapshot_task is not None: + state.snapshot_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await state.snapshot_task + state.writer_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await state.writer_task + state.drainer_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await state.drainer_task + with suppress(Exception): + await state.processor.close() + + async def _promote_join_catchup_processor( + self, + player_id: str, + pipeline: _MemberPipeline, + target_end_us: int, + ) -> None: + """Promote join catchup processor to the member's live DSP processor.""" + old_processor: _BufferedFfmpegProcessor | None = None + async with self._state_lock: + state = self._join_catchup.pop(player_id, None) + if state is None: + return + old_processor = pipeline.processor + pipeline.processor = state.processor + if state.snapshot_task is not None: + state.snapshot_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await state.snapshot_task + # Let writer flush queued PCM before handoff; cancel if queue is full. + try: + state.input_queue.put_nowait(None) + except asyncio.QueueFull: + state.writer_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await state.writer_task + state.drainer_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await state.drainer_task + if old_processor is not None and old_processor is not state.processor: + with suppress(Exception): + await old_processor.close() + + async def _clear_join_catchup(self) -> None: + """Stop and remove all dedicated join catchup processors.""" + async with self._state_lock: + player_ids = list(self._join_catchup.keys()) + for player_id in player_ids: + await self._stop_join_catchup(player_id) + + async def _release_player_channel(self, player_id: str) -> None: + """Release per-member channel/DSP state for a removed member.""" + async with self._state_lock: + pipeline = self._member_pipelines.pop(player_id, None) + self._preassigned_channels.pop(player_id, None) + if pipeline is None or pipeline.processor is None: + return + await self._close_member_ffmpeg(pipeline.processor) + + # -- Playback pipeline ----------------------------------------------------- + + async def _run_playback(self, media: PlayerMedia) -> None: # noqa: PLR0915 + """Run the playback pipeline for a single media session. + + Pulls PCM from the MA stream, feeds main + per-member DSP channels into the + Sendspin push stream, and commits audio continuously. Supports dynamic group + membership changes and late-join historical backfill while running. + """ + push_stream = self._create_push_stream() + async with self._state_lock: + self._push_stream = push_stream + self._playback_running = True + self._history.clear() + self._produced_audio_us = 0 + self._timeline_start_us = None + self._first_commit_monotonic_us = None + self._mapping_dirty = True + # Bounded queue between producer (stream reader) and consumer (committer). + pending_chunks: asyncio.Queue[_PendingChunk | None] = asyncio.Queue( + maxsize=_PRODUCER_BACKLOG_SIZE + ) + # Shadow deque mirroring pending_chunks for join-catchup backlog peeking. + pending_backlog: deque[_PendingChunk] = deque() + pending_duration_us = 0 + + async def _produce_pending_chunks() -> None: + nonlocal pending_duration_us + audio_source = self.player.mass.streams.get_stream(media, _PCM_FORMAT) + async for chunk in audio_source: + if not chunk: + continue + for slice_chunk in self._iter_pcm_slices(chunk, _PCM_FORMAT, _PRODUCER_SLICE_US): + if not slice_chunk: + continue + duration_us = self._duration_us(slice_chunk, _PCM_FORMAT) + if duration_us <= 0: + continue + await self._refresh_member_mappings() + pending = _PendingChunk(pcm=slice_chunk, duration_us=duration_us) + await pending_chunks.put(pending) + pending_backlog.append(pending) + pending_duration_us += duration_us + join_pending_ids, pipelines = await self._snapshot_active_pipelines() + transform_pipelines: list[_MemberPipeline] = [] + for member_id, pipeline in pipelines: + if not pipeline.config.requires_transform: + continue + if member_id in join_pending_ids: + continue + transform_pipelines.append(pipeline) + results = await asyncio.gather( + *( + self._transform_member_chunk(pipeline, slice_chunk) + for pipeline in transform_pipelines + ), + return_exceptions=True, + ) + for pipeline, result in zip(transform_pipelines, results, strict=True): + if isinstance(result, BaseException): + self.player.logger.warning( + "Transform push failed for channel %s: %s", + pipeline.channel_id, + result, + ) + + async def _commit_pending_chunks() -> None: + nonlocal pending_duration_us + while True: + pending = await pending_chunks.get() + if pending is None: + break + pending_backlog.popleft() + pending_duration_us = max(0, pending_duration_us - pending.duration_us) + await self._inject_ready_join_historical(push_stream, pending_backlog, pending.pcm) + push_stream.prepare_audio( + pending.pcm, _SENDSPIN_PCM_FORMAT, channel_id=MAIN_CHANNEL + ) + join_pending_ids, pipelines = await self._snapshot_active_pipelines() + transform_pipelines: list[_MemberPipeline] = [] + for member_id, pipeline in pipelines: + if not pipeline.config.requires_transform: + continue + if member_id in join_pending_ids: + continue + transform_pipelines.append(pipeline) + transformed_chunks = await asyncio.gather( + *( + self._read_member_chunk(pipeline, pending.duration_us) + for pipeline in transform_pipelines + ), + return_exceptions=True, + ) + for pipeline, transformed_chunk in zip( + transform_pipelines, transformed_chunks, strict=True + ): + if isinstance(transformed_chunk, BaseException): + self.player.logger.warning( + "Transform read failed for channel %s: %s", + pipeline.channel_id, + transformed_chunk, + ) + continue + if transformed_chunk is None: + continue + push_stream.prepare_audio( + transformed_chunk, + _SENDSPIN_PCM_FORMAT, + channel_id=pipeline.channel_id, + ) + commit_start_us = await push_stream.commit_audio() + await push_stream.sleep_to_limit_buffer(_PRODUCER_BUFFER_LIMIT_US) + commit_now_us = int(time.monotonic_ns() / 1000) + committed_history_chunk = _HistoryChunk( + start_time_us=int(commit_start_us), + duration_us=pending.duration_us, + pcm=pending.pcm, + ) + async with self._state_lock: + if self._timeline_start_us is None: + self._timeline_start_us = int(commit_start_us) + if self._first_commit_monotonic_us is None: + self._first_commit_monotonic_us = commit_now_us + self._history.append(committed_history_chunk) + self._produced_audio_us += pending.duration_us + self._prune_history_locked(commit_now_us) + await self._fanout_history_chunk_to_join_processors(committed_history_chunk) + + commit_task = asyncio.create_task(_commit_pending_chunks()) + self._attach_task_exception_logger(commit_task, "commit_pending_chunks") + producer_stopped_cleanly = False + try: + await _produce_pending_chunks() + producer_stopped_cleanly = True + finally: + if producer_stopped_cleanly and not commit_task.done(): + # Producer finished normally; send a None sentinel so the + # consumer exits cleanly. The queue may be full, so retry + # with a deadline before falling back to cancellation. + sentinel_sent = False + deadline = time.monotonic() + 1.0 + while not sentinel_sent and not commit_task.done(): + try: + pending_chunks.put_nowait(None) + sentinel_sent = True + except asyncio.QueueFull: + if time.monotonic() >= deadline: + break + await asyncio.sleep(0.01) + if sentinel_sent: + with suppress(asyncio.CancelledError, Exception): + await commit_task + else: + commit_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await commit_task + else: + commit_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await commit_task + with suppress(Exception): + self._stop_push_stream() + await self._clear_join_catchup() + await self._clear_member_pipelines() + async with self._state_lock: + self._push_stream = None + self._playback_running = False + self._timeline_start_us = None + self._first_commit_monotonic_us = None + self._produced_audio_us = 0 + self._history.clear() + + # -- Join injection -------------------------------------------------------- + + async def _inject_ready_join_historical( + self, + push_stream: PushStream, + pending_backlog: deque[_PendingChunk], + current_pcm: bytes, + ) -> bool: + """Inject join-catchup historical audio once processor output reaches history end. + + Join promotion lifecycle: + 1. A catchup processor is fed historical PCM and new commits in parallel. + 2. Once the processor's output lag falls within _JOIN_PROMOTE_ARM_WINDOW_US + of the history tail, promotion is "armed" and a target end timestamp is locked. + 3. Once output reaches the target (within _JOIN_PROMOTE_TOLERANCE_US), the + catchup processor is promoted to the member's live DSP pipeline. + 4. If promotion doesn't complete within _JOIN_PROMOTION_TIMEOUT_S, it's aborted. + """ + injected_any = False + async with self._state_lock: + items = list(self._join_catchup.items()) + for player_id, state in items: + produced_output_us = state.processor.produced_output_us + async with self._state_lock: + current = self._join_catchup.get(player_id) + if current is None or current.processor is not state.processor: + continue + first_history_start_us = current.first_history_start_us + fed_until_us = current.fed_until_us + history_end_us = current.history_end_us + promotion_target_end_us = current.promotion_target_end_us + promotion_armed_monotonic_s = current.promotion_armed_monotonic_s + if first_history_start_us is None or fed_until_us is None or history_end_us is None: + continue + max_ready_end_us = min( + fed_until_us, + first_history_start_us + max(0, produced_output_us), + ) + if promotion_target_end_us is None: + lag_to_tail_us = history_end_us - max_ready_end_us + if lag_to_tail_us > _JOIN_PROMOTE_ARM_WINDOW_US: + continue + async with self._state_lock: + current = self._join_catchup.get(player_id) + if current is None or current.processor is not state.processor: + continue + if current.promotion_target_end_us is None: + current.promotion_target_end_us = history_end_us + current.promotion_armed_monotonic_s = time.monotonic() + promotion_target_end_us = current.promotion_target_end_us + promotion_armed_monotonic_s = current.promotion_armed_monotonic_s + target_end_us = promotion_target_end_us + if ( + promotion_armed_monotonic_s is not None + and time.monotonic() - promotion_armed_monotonic_s > _JOIN_PROMOTION_TIMEOUT_S + ): + self.player.logger.error( + "Join promotion timed out for %s after %.1fs; dropping join catchup", + player_id, + _JOIN_PROMOTION_TIMEOUT_S, + ) + await self._stop_join_catchup(player_id) + continue + if max_ready_end_us + _JOIN_PROMOTE_TOLERANCE_US < target_end_us: + continue + inject_duration_us = target_end_us - first_history_start_us + transformed_history = state.processor.pop_duration_us_or_pad( + inject_duration_us, _JOIN_PROMOTE_TOLERANCE_US + ) + if transformed_history is None: + continue + pipeline = await self._sync_member_pipeline(player_id) + # Split the blob into slices so push_stream can yield between encodes. + frame_stride = (_SENDSPIN_PCM_FORMAT.bit_depth // 8) * _SENDSPIN_PCM_FORMAT.channels + slice_bytes = ( + int(_SENDSPIN_PCM_FORMAT.sample_rate * _PRODUCER_SLICE_US / 1_000_000) + * frame_stride + ) + for offset in range(0, len(transformed_history), slice_bytes): + push_stream.prepare_historical_audio( + transformed_history[offset : offset + slice_bytes], + _SENDSPIN_PCM_FORMAT, + channel_id=pipeline.channel_id, + start_time_us=first_history_start_us if offset == 0 else None, + ) + await self._prefeed_pending_backlog_for_join(state, current_pcm, pending_backlog) + await self._promote_join_catchup_processor(player_id, pipeline, target_end_us) + injected_any = True + return injected_any + + async def _prefeed_pending_backlog_for_join( + self, + state: _JoinCatchupState, + current_pcm: bytes, + pending_backlog: deque[_PendingChunk], + ) -> None: + """Push current chunk + queued pending chunks into join processor before promotion. + + Between the last committed chunk and the next commit, there may be + chunks already queued by the producer that the catchup processor hasn't + seen yet. Feeding them now avoids a gap in transformed audio after + promotion. + """ + await self._enqueue_join_pcm(state, current_pcm) + for item in list(pending_backlog): + await self._enqueue_join_pcm(state, item.pcm) + + async def _fanout_history_chunk_to_join_processors(self, hist_chunk: _HistoryChunk) -> None: + """Feed newly committed history chunk into all active join-catchup processors.""" + async with self._state_lock: + items = list(self._join_catchup.items()) + for player_id, state in items: + async with state.write_lock: + # Read current state under lock. + async with self._state_lock: + current = self._join_catchup.get(player_id) + if current is None or current.processor is not state.processor: + continue + previous_end_us = current.fed_until_us + first_history_start_us = current.first_history_start_us + # Initialize first_history_start_us if this is the first chunk. + if first_history_start_us is None: + first_history_start_us = hist_chunk.start_time_us + previous_end_us = first_history_start_us + # Fill timeline gaps with silence. + if previous_end_us is not None and hist_chunk.start_time_us > previous_end_us: + gap_us = hist_chunk.start_time_us - previous_end_us + silence = self._silence_for_duration_us(gap_us) + if silence: + await self._enqueue_join_pcm(state, silence) + await self._enqueue_join_pcm(state, hist_chunk.pcm) + # Write updated state back under lock. + new_end_us = hist_chunk.start_time_us + hist_chunk.duration_us + async with self._state_lock: + current = self._join_catchup.get(player_id) + if current is not None and current.processor is state.processor: + if current.first_history_start_us is None: + current.first_history_start_us = first_history_start_us + if current.fed_until_us is None: + current.fed_until_us = first_history_start_us + current.fed_until_us = new_end_us + current.history_end_us = new_end_us + + async def _enqueue_join_pcm( + self, + state: _JoinCatchupState, + pcm: bytes, + ) -> None: + """Enqueue PCM into a joining member writer queue. + + Bails out immediately if the writer task is dead to avoid blocking + the commit loop on a queue with no consumer. + """ + if state.writer_task.done(): + return + try: + state.input_queue.put_nowait(pcm) + except asyncio.QueueFull: + if state.writer_task.done(): + return + await state.input_queue.put(pcm) + + # -- Member pipeline management -------------------------------------------- + + async def _refresh_member_mappings(self) -> None: + """Re-evaluate per-member channel mapping and DSP requirements.""" + async with self._state_lock: + if not self._mapping_dirty: + return + member_ids = tuple(self._members) + self._mapping_dirty = False + for member_id in member_ids: + await self._sync_member_pipeline(member_id) + + async def _sync_member_pipeline(self, player_id: str) -> _MemberPipeline: + """Create/update pipeline state for one member from current MA config.""" + config = self._get_pipeline_config_cached(player_id) + release_processor: _BufferedFfmpegProcessor | None = None + start_processor: _BufferedFfmpegProcessor | None = None + async with self._state_lock: + current = self._member_pipelines.get(player_id) + if current is not None and current.config.signature == config.signature: + return current + if current and current.config.requires_transform: + channel_id = current.channel_id if config.requires_transform else MAIN_CHANNEL + release_processor = current.processor + elif config.requires_transform: + channel_id = self._get_or_create_preassigned_channel(player_id) + else: + channel_id = MAIN_CHANNEL + self._preassigned_channels.pop(player_id, None) + processor: _BufferedFfmpegProcessor | None = None + if config.requires_transform: + ffmpeg_obj = self._create_member_ffmpeg(config.filter_params) + processor = _BufferedFfmpegProcessor(ffmpeg_obj, _PCM_FORMAT) + start_processor = processor + pipeline = _MemberPipeline( + player_id=player_id, + channel_id=channel_id, + config=config, + processor=processor, + ) + self._member_pipelines[player_id] = pipeline + if start_processor is not None: + try: + await start_processor.start() + except Exception as err: + async with self._state_lock: + if ( + self._member_pipelines.get(player_id) is not None + and self._member_pipelines[player_id].processor is start_processor + ): + self._member_pipelines.pop(player_id, None) + with suppress(Exception): + await self._close_member_ffmpeg(start_processor) + raise RuntimeError(f"Failed to start member DSP ffmpeg for {player_id}") from err + if release_processor is not None: + await self._close_member_ffmpeg(release_processor) + return pipeline + + def _get_pipeline_config_cached( + self, + player_id: str, + *, + force_refresh: bool = False, + ) -> _PipelineConfig: + """Return cached pipeline config for a player, calculating on cache miss.""" + if not force_refresh and (cached := self._pipeline_config_cache.get(player_id)) is not None: + return cached + config = self._read_pipeline_config(player_id) + self._pipeline_config_cache[player_id] = config + return config + + def _read_pipeline_config(self, player_id: str) -> _PipelineConfig: + """Read MA config and determine if member needs a dedicated DSP channel.""" + dsp_config = self.player.mass.config.get_player_dsp_config(player_id) + dsp_enabled = bool(dsp_config.enabled) + raw_output_channels = self.player.mass.config.get_raw_player_config_value( + player_id, + CONF_OUTPUT_CHANNELS, + "stereo", + ) + output_channels = str(raw_output_channels or "stereo").strip().lower() + if output_channels not in {"stereo", "left", "right", "mono"}: + output_channels = "stereo" + try: + filter_params = tuple( + get_player_filter_params( + self.player.mass, + player_id, + _PCM_FORMAT, + _PCM_FORMAT, + ) + ) + except Exception: + filter_params = () + custom_filter_graph = any( + param.strip() and not param.strip().startswith("alimiter=") for param in filter_params + ) + requires_transform = dsp_enabled or output_channels != "stereo" or custom_filter_graph + return _PipelineConfig( + requires_transform=requires_transform, + output_channels=output_channels, + filter_params=filter_params, + ) + + def _get_or_create_preassigned_channel(self, player_id: str) -> UUID: + """Return stable dedicated channel id for transform-required player.""" + if (channel_id := self._preassigned_channels.get(player_id)) is not None: + return channel_id + channel_id = uuid4() + self._preassigned_channels[player_id] = channel_id + return channel_id + + # -- FFmpeg lifecycle ------------------------------------------------------ + + def _create_member_ffmpeg(self, filter_params: tuple[str, ...]) -> FFMpeg: + """Create per-member FFMpeg for DSP pipeline.""" + return FFMpeg( + audio_input="-", + input_format=_PCM_FORMAT, + output_format=_PCM_FORMAT, + filter_params=list(filter_params), + ) + + async def _transform_member_chunk(self, pipeline: _MemberPipeline, chunk: bytes) -> None: + """Push one PCM chunk into a member DSP pipeline.""" + processor = pipeline.processor + if processor is None: + return + await processor.push(chunk) + + async def _read_member_chunk( + self, + pipeline: _MemberPipeline, + duration_us: int, + ) -> bytes | None: + """Read one transformed chunk from a member DSP pipeline.""" + processor = pipeline.processor + if processor is None or duration_us <= 0: + return b"" + transformed = await processor.read_duration_us(duration_us) + if not transformed: + return None + pipeline.ready = True + return bytes(transformed) + + async def _close_member_ffmpeg(self, processor: _BufferedFfmpegProcessor) -> None: + """Close an ffmpeg processor, suppressing errors.""" + with suppress(Exception): + await processor.close() + + async def _clear_member_pipelines(self) -> None: + """Release all member pipeline resources.""" + async with self._state_lock: + pipelines = list(self._member_pipelines.values()) + self._member_pipelines.clear() + for pipeline in pipelines: + if pipeline.processor is not None: + await self._close_member_ffmpeg(pipeline.processor) + + # -- Push stream ----------------------------------------------------------- + + def _create_push_stream(self) -> PushStream: + """Create PushStream with channel resolver for per-member routing.""" + return self.player.api.group.start_stream(channel_resolver=self._resolve_channel_for_player) + + def _stop_push_stream(self) -> None: + """Stop the active PushStream.""" + self.player.api.group.stop_stream() + + def _resolve_channel_for_player(self, player_id: str) -> UUID: + """Channel resolver callback for per-player routing.""" + pipeline = self._member_pipelines.get(player_id) + if pipeline is not None: + return pipeline.channel_id + # The leader always receives MAIN_CHANNEL audio directly from the + # commit loop; only group members get per-player DSP channels. + if player_id == self.player.player_id: + return MAIN_CHANNEL + # Force a fresh config read for pending/unknown joiners so the very + # first resolution (triggered by add_client) uses up-to-date DSP settings. + force = player_id not in self._members + config = self._get_pipeline_config_cached(player_id, force_refresh=force) + if not config.requires_transform: + return MAIN_CHANNEL + return self._get_or_create_preassigned_channel(player_id) + + # -- History --------------------------------------------------------------- + + def _prune_history_locked(self, now_monotonic_us: int) -> None: + """Drop old history chunks that are fully in the past.""" + if self._timeline_start_us is None or self._first_commit_monotonic_us is None: + return + elapsed_real_us = max(0, now_monotonic_us - self._first_commit_monotonic_us) + source_now_us = self._timeline_start_us + elapsed_real_us + cutoff_us = source_now_us - _HISTORY_KEEP_PAST_US + while self._history and ( + self._history[0].start_time_us + self._history[0].duration_us <= cutoff_us + ): + self._history.popleft() + + # -- PCM utilities --------------------------------------------------------- + + @staticmethod + def _duration_us(audio: bytes, audio_format: AudioFormat) -> int: + """Compute chunk duration from PCM payload size.""" + bytes_per_sample = max(1, int(audio_format.bit_depth // 8)) + bytes_per_second = ( + int(audio_format.sample_rate) * bytes_per_sample * int(audio_format.channels) + ) + if bytes_per_second <= 0: + return 0 + return int((len(audio) / bytes_per_second) * 1_000_000) + + @staticmethod + def _iter_pcm_slices( + audio: bytes, audio_format: AudioFormat, target_duration_us: int + ) -> Iterator[bytes]: + """Yield frame-aligned PCM slices up to target duration.""" + if not audio: + return + bytes_per_sample = max(1, int(audio_format.bit_depth // 8)) + frame_size = bytes_per_sample * int(audio_format.channels) + if frame_size <= 0: + yield audio + return + samples_per_slice = max( + 1, round((target_duration_us / 1_000_000) * int(audio_format.sample_rate)) + ) + slice_size = max(frame_size, samples_per_slice * frame_size) + offset = 0 + audio_len = len(audio) + while offset < audio_len: + end = min(audio_len, offset + slice_size) + if end < audio_len: + aligned_end = end - (end % frame_size) + if aligned_end <= offset: + aligned_end = min(audio_len, offset + frame_size) + end = aligned_end + yield audio[offset:end] + offset = end + + @staticmethod + def _silence_for_duration_us(duration_us: int) -> bytes: + """Generate silent PCM with frame-aligned duration for the default format.""" + if duration_us <= 0: + return b"" + bytes_per_sample = max(1, int(_PCM_FORMAT.bit_depth // 8)) + frame_size = bytes_per_sample * int(_PCM_FORMAT.channels) + samples = max(0, round((duration_us / 1_000_000) * int(_PCM_FORMAT.sample_rate))) + return b"\x00" * (samples * frame_size) diff --git a/music_assistant/providers/sendspin/player.py b/music_assistant/providers/sendspin/player.py index 91987a35..4fdaffd5 100644 --- a/music_assistant/providers/sendspin/player.py +++ b/music_assistant/providers/sendspin/player.py @@ -4,49 +4,65 @@ from __future__ import annotations import asyncio import time -from collections.abc import AsyncGenerator, Callable +from collections.abc import Callable from io import BytesIO from typing import TYPE_CHECKING, cast -from aiosendspin.models import MediaCommand -from aiosendspin.models.types import ArtworkSource, PlaybackStateType +from aiosendspin.models import AudioCodec, MediaCommand +from aiosendspin.models.types import PlaybackStateType from aiosendspin.models.types import RepeatMode as SendspinRepeatMode -from aiosendspin.server import AudioFormat as SendspinAudioFormat from aiosendspin.server import ( ClientEvent, - GroupCommandEvent, GroupEvent, - GroupStateChangedEvent, SendspinGroup, VolumeChangedEvent, ) +from aiosendspin.server.audio import AudioFormat as SendspinAudioFormat from aiosendspin.server.client import DisconnectBehaviour -from aiosendspin.server.events import ClientGroupChangedEvent -from aiosendspin.server.group import ( +from aiosendspin.server.events import ( + ClientGroupChangedEvent, GroupDeletedEvent, GroupMemberAddedEvent, GroupMemberRemovedEvent, + GroupStateChangedEvent, +) +from aiosendspin.server.roles import ( + ArtworkGroupRole, + ControllerEvent, + ControllerGroupRole, + ControllerNextEvent, + ControllerPauseEvent, + ControllerPlayEvent, + ControllerPreviousEvent, + ControllerRepeatEvent, + ControllerShuffleEvent, + ControllerStopEvent, + MetadataGroupRole, ) -from aiosendspin.server.metadata import Metadata -from aiosendspin.server.stream import AudioCodec, MediaStream +from aiosendspin.server.roles.metadata.state import Metadata +from aiosendspin.server.roles.player.types import PlayerRoleProtocol +from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption from music_assistant_models.constants import PLAYER_CONTROL_NONE from music_assistant_models.enums import ( - ContentType, + ConfigEntryType, ImageType, PlaybackState, PlayerFeature, PlayerType, RepeatMode, ) -from music_assistant_models.media_items import AudioFormat from music_assistant_models.player import DeviceInfo from PIL import Image -from music_assistant.constants import CONF_OUTPUT_CHANNELS, CONF_OUTPUT_CODEC, INTERNAL_PCM_FORMAT -from music_assistant.helpers.audio import get_player_filter_params +from music_assistant.constants import ( + CONF_ENTRY_HTTP_PROFILE_HIDDEN, + CONF_ENTRY_OUTPUT_CODEC_HIDDEN, + CONF_ENTRY_SAMPLE_RATES, +) from music_assistant.models.player import Player, PlayerMedia - -from .timed_client_stream import TimedClientStream +from music_assistant.providers.sendspin.playback import ( + SendspinPlaybackSession, +) # Supported group commands for Sendspin players SUPPORTED_GROUP_COMMANDS = [ @@ -62,111 +78,61 @@ SUPPORTED_GROUP_COMMANDS = [ MediaCommand.UNSHUFFLE, ] -if TYPE_CHECKING: - from aiosendspin.server.client import SendspinClient - from music_assistant_models.player_queue import PlayerQueue - from music_assistant_models.queue_item import QueueItem - - from .provider import SendspinProvider +# Config constants for Sendspin audio format +CONF_PREFERRED_SENDSPIN_FORMAT = "preferred_sendspin_format" +SENDSPIN_FORMAT_AUTOMATIC = "automatic" -class MusicAssistantMediaStream(MediaStream): - """MediaStream implementation for Music Assistant with per-player DSP support.""" - - player_instance: SendspinPlayer - internal_format: AudioFormat - output_format: AudioFormat - - def __init__( - self, - *, - main_channel_source: AsyncGenerator[bytes, None], - main_channel_format: SendspinAudioFormat, - player_instance: SendspinPlayer, - internal_format: AudioFormat, - output_format: AudioFormat, - ) -> None: - """ - Initialise the media stream with audio source and format for main_channel(). - - Args: - main_channel_source: Audio source generator for the main channel. - main_channel_format: Audio format for the main channel (includes codec). - player_instance: The SendspinPlayer instance for accessing mass and streams. - internal_format: Internal processing format (float32 for headroom). - output_format: Output PCM format (16-bit for player output). - """ - super().__init__( - main_channel_source=main_channel_source, - main_channel_format=main_channel_format, - ) - self.player_instance = player_instance - self.internal_format = internal_format - self.output_format = output_format - - async def player_channel( - self, - player_id: str, - preferred_format: SendspinAudioFormat | None = None, - position_us: int = 0, - ) -> tuple[AsyncGenerator[bytes, None], SendspinAudioFormat, int] | None: - """ - Get a player-specific audio stream with per-player DSP. +def format_to_option_value(fmt: SupportedAudioFormat) -> str: + """Convert SupportedAudioFormat to "codec:sample_rate:bit_depth:channels".""" + return f"{fmt.codec.value}:{fmt.sample_rate}:{fmt.bit_depth}:{fmt.channels}" - Args: - player_id: Identifier for the player requesting the stream. - preferred_format: The player's preferred native format for the stream. - The implementation may return a different format; the library - will handle any necessary conversion. - position_us: Position in microseconds relative to the main_stream start. - Used for late-joining players to sync with the main stream. - - Returns: - A tuple of (audio generator, audio format, actual position in microseconds) - or None if unavailable. If None, the main_stream is used as fallback. - """ - mass = self.player_instance.mass - multi_client_stream = self.player_instance.timed_client_stream - assert multi_client_stream is not None - dsp = mass.config.get_player_dsp_config(player_id) - output_channels = mass.config.get_raw_player_config_value( - player_id, CONF_OUTPUT_CHANNELS, "stereo" - ) - if not dsp.enabled and output_channels == "stereo": - # DSP is disabled and output is stereo, use main_stream - return None +def option_value_to_format(value: str) -> tuple[AudioCodec, SendspinAudioFormat] | None: + """Parse option value back to (AudioCodec, SendspinAudioFormat). - # Get per-player DSP filter parameters - filter_params = get_player_filter_params( - mass, player_id, self.internal_format, self.output_format + :param value: Option value in format "codec:sample_rate:bit_depth:channels". + :return: Tuple of (AudioCodec, SendspinAudioFormat) or None if parsing fails. + """ + try: + codec_str, sample_rate_str, bit_depth_str, channels_str = value.split(":") + codec = AudioCodec(codec_str) + audio_format = SendspinAudioFormat( + sample_rate=int(sample_rate_str), + bit_depth=int(bit_depth_str), + channels=int(channels_str), ) + return (codec, audio_format) + except (ValueError, KeyError): + return None + + +def format_to_display_string(fmt: SupportedAudioFormat) -> str: + """Convert to display string like "FLAC 48kHz/24bit stereo".""" + codec_name = fmt.codec.name + sample_rate_khz = fmt.sample_rate / 1000 + # Format sample rate: show as integer if whole number, otherwise one decimal + if sample_rate_khz == int(sample_rate_khz): + sample_rate_str = f"{int(sample_rate_khz)}kHz" + else: + sample_rate_str = f"{sample_rate_khz:.1f}kHz" + if fmt.channels == 2: + channels_str = "stereo" + elif fmt.channels == 1: + channels_str = "mono" + else: + channels_str = f"{fmt.channels}ch" + return f"{codec_name} {sample_rate_str}/{fmt.bit_depth}bit {channels_str}" - # Get the stream with position (in seconds) - stream_gen, actual_position = await multi_client_stream.get_stream( - output_format=self.output_format, - filter_params=filter_params, - ) - # Convert position from seconds to microseconds for aiosendspin API - actual_position_us = int(actual_position * 1_000_000) +if TYPE_CHECKING: + from aiosendspin.models.player import SupportedAudioFormat + from aiosendspin.server.client import SendspinClient + from music_assistant_models.config_entries import ConfigValueType + from music_assistant_models.player_queue import PlayerQueue + from music_assistant_models.queue_item import QueueItem - # Return actual position in microseconds relative to main_stream start - self.player_instance.logger.debug( - "Providing channel stream for player %s at position %d us", - player_id, - actual_position_us, - ) - return ( - stream_gen, - SendspinAudioFormat( - sample_rate=self.output_format.sample_rate, - bit_depth=self.output_format.bit_depth, - channels=self.output_format.channels, - codec=self._main_channel_format.codec, - ), - actual_position_us, - ) + from .provider import SendspinProvider class SendspinPlayer(Player): @@ -179,8 +145,7 @@ class SendspinPlayer(Player): unsub_group_event_cb: Callable[[], None] last_sent_artwork_url: str | None = None last_sent_artist_artwork_url: str | None = None - _playback_task: asyncio.Task[None] | None = None - timed_client_stream: TimedClientStream | None = None + playback_session: SendspinPlaybackSession is_web_player: bool = False @property @@ -197,7 +162,10 @@ class SendspinPlayer(Player): self.api.disconnect_behaviour = DisconnectBehaviour.STOP self.unsub_event_cb = sendspin_client.add_event_listener(self.event_cb) self.unsub_group_event_cb = sendspin_client.group.add_event_listener(self.group_event_cb) - sendspin_client.group.set_supported_commands(SUPPORTED_GROUP_COMMANDS) + if controller_role := self._controller_role: + controller_role.set_supported_commands(SUPPORTED_GROUP_COMMANDS) + + self.playback_session = SendspinPlaybackSession(self) self.logger = self.provider.logger.getChild(player_id) # init some static variables @@ -205,9 +173,9 @@ class SendspinPlayer(Player): self._attr_supported_features = { PlayerFeature.PLAY_MEDIA, PlayerFeature.SET_MEMBERS, - PlayerFeature.MULTI_DEVICE_DSP, PlayerFeature.VOLUME_SET, PlayerFeature.VOLUME_MUTE, + PlayerFeature.MULTI_DEVICE_DSP, } self._attr_can_group_with = {provider.instance_id} self._attr_power_control = PLAYER_CONTROL_NONE @@ -219,9 +187,16 @@ class SendspinPlayer(Player): ) else: self._attr_device_info = DeviceInfo() - if player_client := sendspin_client.player: - self._attr_volume_level = player_client.volume - self._attr_volume_muted = player_client.muted + if sendspin_client.info.player_support: + for role in sendspin_client.roles_by_family("player"): + volume = role.get_player_volume() + muted = role.get_player_muted() + if volume is not None: + self._attr_volume_level = volume + if muted is not None: + self._attr_volume_muted = muted + if volume is not None or muted is not None: + break self._attr_available = True self.is_web_player = sendspin_client.name.startswith( "Web (" # The regular Web Interface @@ -231,9 +206,85 @@ class SendspinPlayer(Player): self._attr_expose_to_ha_by_default = not self.is_web_player self._attr_hidden_by_default = self.is_web_player + @property + def _artwork_role(self) -> ArtworkGroupRole | None: + """Get the ArtworkGroupRole for this player's group.""" + role = self.api.group.group_role("artwork") + if isinstance(role, ArtworkGroupRole): + return role + return None + + @property + def _metadata_role(self) -> MetadataGroupRole | None: + """Get the MetadataGroupRole for this player's group.""" + role = self.api.group.group_role("metadata") + if isinstance(role, MetadataGroupRole): + return role + return None + + @property + def _controller_role(self) -> ControllerGroupRole | None: + """Get the ControllerGroupRole for this player's group.""" + role = self.api.group.group_role("controller") + if isinstance(role, ControllerGroupRole): + return role + return None + + @property + def _player_role(self) -> PlayerRoleProtocol | None: + """Get the player role for this client (not group role).""" + for role in self.api.roles_by_family("player"): + if isinstance(role, PlayerRoleProtocol): + return role + return None + + async def _handle_controller_event(self, event: ControllerEvent) -> None: + """Handle a controller event from the ControllerGroupRole.""" + queue = self.mass.player_queues.get_active_queue(self.player_id) + match event: + case ControllerPlayEvent(): + await self.mass.players.cmd_play(self.player_id) + case ControllerPauseEvent(): + await self.mass.players.cmd_pause(self.player_id) + case ControllerStopEvent(): + await self.mass.players.cmd_stop(self.player_id) + case ControllerNextEvent(): + await self.mass.players.cmd_next_track(self.player_id) + case ControllerPreviousEvent(): + await self.mass.players.cmd_previous_track(self.player_id) + case ControllerRepeatEvent(mode=mode) if queue: + match mode: + case SendspinRepeatMode.OFF: + self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF) + case SendspinRepeatMode.ONE: + self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE) + case SendspinRepeatMode.ALL: + self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL) + case ControllerShuffleEvent(shuffle=shuffle) if queue: + await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=shuffle) + + async def _sync_membership_from_group(self, group: SendspinGroup) -> None: + """Sync MA/player + playback session membership from authoritative group state.""" + # Ignore stale events from a group we no longer belong to. + if group is not self.api.group: + return + group_client_ids = [client.client_id for client in group.clients] + is_leader = bool(group_client_ids) and group_client_ids[0] == self.player_id + desired_group_members = group_client_ids if is_leader else [] + desired_session_members = group_client_ids[1:] if is_leader else [] + if self._attr_group_members != desired_group_members: + self._attr_group_members = desired_group_members + self.update_state() + # Only use STOP when we actually lead other members. + self.api.disconnect_behaviour = ( + DisconnectBehaviour.STOP + if is_leader and len(desired_session_members) > 0 + else DisconnectBehaviour.UNGROUP + ) + await self.playback_session.sync_members(set(desired_session_members)) + def event_cb(self, client: SendspinClient, event: ClientEvent) -> None: - """Event callback registered to the sendspin server.""" - self.logger.debug("Received PlayerEvent: %s", event) + """Event callback registered to the sendspin client.""" match event: case VolumeChangedEvent(volume=volume, muted=muted): self._attr_volume_level = volume @@ -242,6 +293,10 @@ class SendspinPlayer(Player): case ClientGroupChangedEvent(new_group=new_group): self.unsub_group_event_cb() self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb) + if controller_role := self._controller_role: + controller_role.set_supported_commands(SUPPORTED_GROUP_COMMANDS) + # Cancel active playback - push stream belongs to the old group + self.mass.create_task(self.playback_session.cancel("group changed")) # Sync playback state from the new group match new_group.state: case PlaybackStateType.PLAYING: @@ -250,42 +305,14 @@ class SendspinPlayer(Player): self._attr_playback_state = PlaybackState.PAUSED case PlaybackStateType.STOPPED: self._attr_playback_state = PlaybackState.IDLE + self._attr_elapsed_time = 0 + self._attr_elapsed_time_last_updated = time.time() # Update in case this is a newly created group - new_group.set_supported_commands(SUPPORTED_GROUP_COMMANDS) # GroupMemberAddedEvent or GroupMemberRemovedEvent will be fired before this # so group members are already up to date at this point - if self.synced_to is None: - # We are the leader, stop on disconnect - self.api.disconnect_behaviour = DisconnectBehaviour.STOP - else: - self.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP + self.mass.create_task(self._sync_membership_from_group(new_group)) self.update_state() - async def _handle_group_command(self, command: MediaCommand) -> None: - """Handle a group command from aiosendspin.""" - queue = self.mass.player_queues.get_active_queue(self.player_id) - match command: - case MediaCommand.PLAY: - await self.mass.players.cmd_play(self.player_id) - case MediaCommand.PAUSE: - await self.mass.players.cmd_pause(self.player_id) - case MediaCommand.STOP: - await self.mass.players.cmd_stop(self.player_id) - case MediaCommand.NEXT: - await self.mass.players.cmd_next_track(self.player_id) - case MediaCommand.PREVIOUS: - await self.mass.players.cmd_previous_track(self.player_id) - case MediaCommand.REPEAT_OFF if queue: - self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.OFF) - case MediaCommand.REPEAT_ONE if queue: - self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ONE) - case MediaCommand.REPEAT_ALL if queue: - self.mass.player_queues.set_repeat(queue.queue_id, RepeatMode.ALL) - case MediaCommand.SHUFFLE if queue: - await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=True) - case MediaCommand.UNSHUFFLE if queue: - await self.mass.player_queues.set_shuffle(queue.queue_id, shuffle_enabled=False) - def group_event_cb(self, group: SendspinGroup, event: GroupEvent) -> None: """Event callback registered to the sendspin group this player belongs to.""" if self.synced_to is not None: @@ -294,14 +321,8 @@ class SendspinPlayer(Player): # - GroupStateChangedEvent: to update playback state when leader stops/disconnects if not isinstance(event, (GroupMemberRemovedEvent, GroupStateChangedEvent)): return - self.logger.debug("Received GroupEvent: %s", event) - match event: - case GroupCommandEvent(command=command): - self.logger.debug("Group command received: %s", command) - self.mass.create_task(self._handle_group_command(command)) case GroupStateChangedEvent(state=state): - self.logger.debug("Group state changed to: %s", state) match state: case PlaybackStateType.PLAYING: self._attr_playback_state = PlaybackState.PLAYING @@ -311,38 +332,41 @@ class SendspinPlayer(Player): self._attr_playback_state = PlaybackState.IDLE self._attr_elapsed_time = 0 self._attr_elapsed_time_last_updated = time.time() + if self.synced_to is None: + self.mass.create_task(self.playback_session.cancel("group stopped")) self.update_state() case GroupMemberAddedEvent(client_id=client_id): - self.logger.debug("Group member added: %s", client_id) + is_group_leader = ( + bool(group.clients) and group.clients[0].client_id == self.player_id + ) + if is_group_leader and ( + not self._attr_group_members or self._attr_group_members[0] != self.player_id + ): + self._attr_group_members = [self.player_id, *self._attr_group_members] if client_id not in self._attr_group_members: self._attr_group_members.append(client_id) self.update_state() + self.mass.create_task(self.playback_session.add_member(client_id)) + self.mass.create_task(self._sync_membership_from_group(group)) case GroupMemberRemovedEvent(client_id=client_id): - self.logger.debug("Group member removed: %s", client_id) - self.mass.create_task(self._handle_member_removed(group, client_id)) + self.mass.create_task(self.playback_session.remove_member(client_id)) + self.mass.create_task(self._handle_group_member_removed(group, client_id)) + self.mass.create_task(self._sync_membership_from_group(group)) case GroupDeletedEvent(): pass + case ControllerEvent() as controller_event: + if self.synced_to is None: + self.mass.create_task(self._handle_controller_event(controller_event)) - async def _handle_member_removed(self, group: SendspinGroup, client_id: str) -> None: - """Handle group member removed event asynchronously.""" + async def _handle_group_member_removed(self, group: SendspinGroup, client_id: str) -> None: + """Handle a group member being removed asynchronously.""" if client_id == self.player_id: - if len(self._attr_group_members) > 0: + if len(group.clients) > 0: # We were just removed as a leader: # 1. stop playback on the old group await group.stop() - # 2. clear our members (since we are now alone) - group_members = [ - member for member in self._attr_group_members if member != client_id - ] + # 2. clear our members (since we are now alone in a new group) self._attr_group_members = [] - # 3. assign new leader if there are members left - if len(group_members) > 0 and ( - new_leader := self.mass.players.get_player(group_members[0]) - ): - new_leader = cast("SendspinPlayer", new_leader) - new_leader._attr_group_members = group_members[1:] - new_leader.api.disconnect_behaviour = DisconnectBehaviour.STOP - new_leader.update_state() self.update_state() elif client_id in self._attr_group_members: # Someone else left our group @@ -351,26 +375,27 @@ class SendspinPlayer(Player): async def volume_set(self, volume_level: int) -> None: """Handle VOLUME_SET command on the player.""" - if player_client := self.api.player: - player_client.set_volume(volume_level) + roles = self.api.roles_by_family("player") + for role in roles: + role.set_player_volume(volume_level) async def volume_mute(self, muted: bool) -> None: """Handle VOLUME MUTE command on the player.""" - if player_client := self.api.player: - if muted: - player_client.mute() - else: - player_client.unmute() + roles = self.api.roles_by_family("player") + for role in roles: + role.set_player_mute(muted) async def stop(self) -> None: """Stop command.""" self.logger.debug("Received STOP command on player %s", self.display_name) - # We don't care if we stopped the stream or it was already stopped - await self.api.group.stop() - # Clear the playback task reference (group.stop() handles stopping the stream) - self._playback_task = None + self.mark_stop_called() self._attr_current_media = None + self._attr_playback_state = PlaybackState.IDLE + self._attr_elapsed_time = 0 + self._attr_elapsed_time_last_updated = time.time() self.update_state() + await self.playback_session.cancel("stop command") + await self.api.group.stop() async def play_media(self, media: PlayerMedia) -> None: """Play media command.""" @@ -385,83 +410,53 @@ class SendspinPlayer(Player): # playback_state will be set by the group state change event # Stop previous stream in case we were already playing something + await self.playback_session.cancel("new media requested") await self.api.group.stop() - # Run playback in background task to immediately return - self._playback_task = asyncio.create_task(self._run_playback(media)) + await self.playback_session.start(media) self.update_state() - async def _run_playback(self, media: PlayerMedia) -> None: - """Run the actual playback in a background task.""" - try: - # Use 32-bit for the main channel: aiosendspin converts per player as needed - pcm_format = AudioFormat( - content_type=ContentType.PCM_S32LE, - sample_rate=48000, - bit_depth=32, - channels=2, - ) - flow_pcm_format = AudioFormat( - content_type=INTERNAL_PCM_FORMAT.content_type, - sample_rate=pcm_format.sample_rate, - bit_depth=INTERNAL_PCM_FORMAT.bit_depth, - channels=pcm_format.channels, - ) - - output_codec = cast("str", self.config.get_value(CONF_OUTPUT_CODEC, "pcm")) + async def on_config_updated(self) -> None: + """Apply preferred format when config changes.""" + await self._apply_preferred_format() - # Convert string codec to AudioCodec enum - audio_codec = AudioCodec(output_codec) + async def _apply_preferred_format(self) -> None: + """Read config and call set_preferred_format() if not automatic.""" + player_role = self._player_role + if player_role is None: + return - # Get clean audio source in flow format (high quality internal format) - # Format conversion and per-player DSP will be applied via player_channel - audio_source = self.mass.streams.get_stream(media, flow_pcm_format) + config_value = cast( + "str", + self.config.get_value(CONF_PREFERRED_SENDSPIN_FORMAT, SENDSPIN_FORMAT_AUTOMATIC), + ) + if config_value == SENDSPIN_FORMAT_AUTOMATIC: + # Automatic mode: don't set a preferred format, let client decide. + return - # Create TimedClientStream to wrap the clean audio source - # This distributes the audio to multiple subscribers without DSP - self.timed_client_stream = TimedClientStream( - audio_source=audio_source, - audio_format=flow_pcm_format, + parsed = option_value_to_format(config_value) + if parsed is None: + self.logger.warning( + "Invalid audio format config value '%s' for player %s", + config_value, + self.display_name, ) + return - # Setup the main channel subscription - main_channel_gen, main_position = await self.timed_client_stream.get_stream( - output_format=pcm_format, - filter_params=None, # TODO: this should probably still include the safety limiter - ) - assert main_position == 0.0 # first subscriber, should be zero - media_stream = MusicAssistantMediaStream( - main_channel_source=main_channel_gen, - main_channel_format=SendspinAudioFormat( - sample_rate=pcm_format.sample_rate, - bit_depth=pcm_format.bit_depth, - channels=pcm_format.channels, - codec=audio_codec, - ), - player_instance=self, - internal_format=flow_pcm_format, - output_format=pcm_format, + codec, audio_format = parsed + if not player_role.set_preferred_format(audio_format, codec): + self.logger.warning( + "Failed to set preferred audio format %s %s for player %s", + codec.name, + audio_format, + self.display_name, ) - stop_time = await self.api.group.play_media(media_stream) - await self.api.group.stop(stop_time) - except asyncio.CancelledError: - self.logger.debug("Playback cancelled for player %s", self.display_name) - raise - except Exception: - self.logger.exception("Error during playback for player %s", self.display_name) - raise - finally: - self.timed_client_stream = None - async def set_members( self, player_ids_to_add: list[str] | None = None, player_ids_to_remove: list[str] | None = None, ) -> None: """Handle SET_MEMBERS command on the player.""" - self.logger.debug( - "set_members called: adding %s, removing %s", player_ids_to_add, player_ids_to_remove - ) for player_id in player_ids_to_remove or []: player = self.mass.players.get_player(player_id, True) player = cast("SendspinPlayer", player) # For type checking @@ -492,10 +487,11 @@ class SendspinPlayer(Player): ) if image_data is not None: image = await asyncio.to_thread(Image.open, BytesIO(image_data)) - await self.api.group.set_media_art(image, source=ArtworkSource.ALBUM) - else: - # Clear artwork if none available - await self.api.group.set_media_art(None, source=ArtworkSource.ALBUM) + if (artwork_role := self._artwork_role) is not None: + await artwork_role.set_album_artwork(image) + # Clear artwork if none available + elif (artwork_role := self._artwork_role) is not None: + await artwork_role.set_album_artwork(None) return artwork_url @@ -526,10 +522,11 @@ class SendspinPlayer(Player): ) if artist_image_data is not None: artist_image = await asyncio.to_thread(Image.open, BytesIO(artist_image_data)) - await self.api.group.set_media_art(artist_image, source=ArtworkSource.ARTIST) - else: - # Clear artist artwork if none available - await self.api.group.set_media_art(None, source=ArtworkSource.ARTIST) + if (artwork_role := self._artwork_role) is not None: + await artwork_role.set_artist_artwork(artist_image) + # Clear artist artwork if none available + elif (artwork_role := self._artwork_role) is not None: + await artwork_role.set_artist_artwork(None) def _on_player_media_updated(self) -> None: """Handle callback when the current media of the player is updated.""" @@ -539,7 +536,8 @@ class SendspinPlayer(Player): if self.state.current_media is None: # Clear metadata when no media loaded - self.api.group.set_metadata(Metadata()) + if (metadata_role := self._metadata_role) is not None: + metadata_role.set_metadata(Metadata()) return self.mass.create_task(self.send_current_media_metadata()) @@ -576,11 +574,11 @@ class SendspinPlayer(Player): metadata = Metadata( title=current_media.title, artist=current_media.artist, - album_artist=None, # TODO: extract from optional queue item + album_artist=None, album=current_media.album, artwork_url=current_media.image_url, - year=None, # TODO: extract from optional queue item - track=None, # TODO: extract from optional queue item + year=None, + track=None, track_duration=track_duration * 1000 if track_duration is not None else None, track_progress=int(current_media.corrected_elapsed_time * 1000) if current_media.corrected_elapsed_time @@ -591,10 +589,59 @@ class SendspinPlayer(Player): ) # Send metadata to the group - self.api.group.set_metadata(metadata) + if (metadata_role := self._metadata_role) is not None: + metadata_role.set_metadata(metadata) + + async def get_config_entries( + self, + action: str | None = None, + values: dict[str, ConfigValueType] | None = None, + ) -> list[ConfigEntry]: + """Return all (provider/player specific) Config Entries for the player.""" + default_entries = await super().get_config_entries(action=action, values=values) + entries = [ + *default_entries, + CONF_ENTRY_OUTPUT_CODEC_HIDDEN, + CONF_ENTRY_HTTP_PROFILE_HIDDEN, + ConfigEntry.from_dict({**CONF_ENTRY_SAMPLE_RATES.to_dict(), "hidden": True}), + ] + + # Build dynamic format options from player's supported formats + player_role = self._player_role + if player_role is not None: + supported_formats = player_role.get_supported_formats() + if supported_formats: + format_options = [ + ConfigValueOption( + title="Automatic (let client decide)", + value=SENDSPIN_FORMAT_AUTOMATIC, + ), + ] + for fmt in supported_formats: + format_options.append( + ConfigValueOption( + title=format_to_display_string(fmt), + value=format_to_option_value(fmt), + ) + ) + entries.append( + ConfigEntry( + key=CONF_PREFERRED_SENDSPIN_FORMAT, + type=ConfigEntryType.STRING, + label="Preferred audio format", + description="Select the audio format to use for playback on this player.", + category="protocol_generic", + default_value=SENDSPIN_FORMAT_AUTOMATIC, + options=format_options, + advanced=True, + ) + ) + + return entries async def on_unload(self) -> None: """Handle logic when the player is unloaded from the Player controller.""" + await self.playback_session.close() await super().on_unload() self.unsub_event_cb() self.unsub_group_event_cb() diff --git a/music_assistant/providers/sendspin/provider.py b/music_assistant/providers/sendspin/provider.py index 1866a53c..392cc46f 100644 --- a/music_assistant/providers/sendspin/provider.py +++ b/music_assistant/providers/sendspin/provider.py @@ -8,6 +8,7 @@ from typing import TYPE_CHECKING, cast from aiosendspin.server import ClientAddedEvent, ClientRemovedEvent, SendspinEvent, SendspinServer from music_assistant_models.enums import ProviderFeature +from music_assistant_models.errors import AlreadyRegisteredError from music_assistant.mass import MusicAssistant from music_assistant.models.player_provider import PlayerProvider @@ -42,7 +43,6 @@ class SendspinProvider(PlayerProvider): def event_cb(self, server: SendspinServer, event: SendspinEvent) -> None: """Event callback registered to the sendspin server.""" - self.logger.debug("Received SendspinEvent: %s", event) match event: case ClientAddedEvent(client_id): self.mass.create_task(self._handle_client_added(client_id)) @@ -52,7 +52,7 @@ class SendspinProvider(PlayerProvider): self.logger.error("Unknown sendspin event: %s", event) async def _handle_client_added(self, client_id: str) -> None: - """Handle client added event asynchronously.""" + """Handle a new client connection asynchronously.""" # Wait for any pending unregister to complete before registering # This prevents a race condition where a slow unregister removes # a newly registered player after a quick reconnect @@ -63,6 +63,11 @@ class SendspinProvider(PlayerProvider): if self.server_api.get_client(client_id) is None: self.logger.debug("Client %s gone after waiting for pending unregister", client_id) return + if self.mass.players.get_player(client_id) is not None: + self.logger.debug( + "Client %s already registered, skipping duplicate add event", client_id + ) + return player = SendspinPlayer(self, client_id) self.logger.debug("Client %s connected", client_id) if player.device_info.manufacturer == "ESPHome" and ( @@ -74,10 +79,15 @@ class SendspinProvider(PlayerProvider): player._attr_name = ( hass_device["name_by_user"] or hass_device["name"] or player.name ) - await self.mass.players.register(player) + try: + await self.mass.players.register(player) + except AlreadyRegisteredError: + self.logger.debug("Client %s already registered while handling add event", client_id) + player.unsub_event_cb() + player.unsub_group_event_cb() async def _handle_client_removed(self, client_id: str) -> None: - """Handle client removed event asynchronously.""" + """Handle a client disconnection asynchronously.""" self.logger.debug("Client %s disconnected", client_id) unregister_event = asyncio.Event() self._pending_unregisters[client_id] = unregister_event @@ -115,13 +125,16 @@ class SendspinProvider(PlayerProvider): """ # Disconnect all clients before stopping the server clients = list(self.server_api.clients) + connected_clients = [] disconnect_tasks = [] for client in clients: - self.logger.debug("Disconnecting client %s", client.client_id) - disconnect_tasks.append(client.disconnect(retry_connection=False)) + if client.connection is None: + continue + connected_clients.append(client) + disconnect_tasks.append(client.connection.disconnect(retry_connection=False)) if disconnect_tasks: results = await asyncio.gather(*disconnect_tasks, return_exceptions=True) - for client, result in zip(clients, results, strict=True): + for client, result in zip(connected_clients, results, strict=True): if isinstance(result, Exception): self.logger.warning( "Error disconnecting client %s: %s", client.client_id, result diff --git a/music_assistant/providers/sendspin/timed_client_stream.py b/music_assistant/providers/sendspin/timed_client_stream.py deleted file mode 100644 index bb0b9620..00000000 --- a/music_assistant/providers/sendspin/timed_client_stream.py +++ /dev/null @@ -1,331 +0,0 @@ -""" -Timestamped multi-client audio stream for position-aware playback. - -This module provides a multi-client streaming implementation optimized for -aiosendspin's synchronized multi-room audio playback. Each audio chunk is -timestamped, allowing late-joining players to start at the correct position -for synchronized playback across multiple devices. -""" - -import asyncio -import logging -from collections import deque -from collections.abc import AsyncGenerator -from contextlib import suppress -from uuid import UUID, uuid4 - -from music_assistant_models.media_items import AudioFormat - -from music_assistant.helpers.ffmpeg import get_ffmpeg_stream - -LOGGER = logging.getLogger(__name__) - -# Minimum/target buffer retention time in seconds -# This 10s buffer is currently required since: -# - aiosendspin currently uses a fixed 5s buffer to allow up to ~4s of network interruption -# - ~2s allows for ffmpeg processing time and some margin -# - ~3s are currently needed internally by aiosendspin for initial buffering -MIN_BUFFER_DURATION = 10.0 -# Maximum buffer duration before raising an error (safety mechanism) -MAX_BUFFER_DURATION = MIN_BUFFER_DURATION + 5.0 - - -class TimedClientStream: - """Multi-client audio stream with timestamped chunks for synchronized playback.""" - - audio_source: AsyncGenerator[bytes, None] - """The source audio stream to read from.""" - audio_format: AudioFormat - """The audio format of the source stream.""" - chunk_buffer: deque[tuple[bytes, float]] - """Buffer storing chunks with their timestamps in seconds (chunk_data, timestamp_seconds).""" - subscriber_positions: dict[UUID, int] - """Subscriber positions: maps subscriber_id to position (index into chunk_buffer).""" - buffer_lock: asyncio.Lock - """Lock for buffer and shared state access.""" - source_read_lock: asyncio.Lock - """Lock to serialize audio source reads.""" - stream_ended: bool = False - """Track if stream has ended.""" - current_position: float = 0.0 - """Current position in seconds (from stream start).""" - - def __init__( - self, - audio_source: AsyncGenerator[bytes, None], - audio_format: AudioFormat, - ) -> None: - """Initialize TimedClientStream.""" - self.audio_source = audio_source - self.audio_format = audio_format - self.chunk_buffer = deque() - self.subscriber_positions = {} - self.buffer_lock = asyncio.Lock() - self.source_read_lock = asyncio.Lock() - - def _get_bytes_per_second(self) -> int: - """Get bytes per second for the audio format.""" - return ( - self.audio_format.sample_rate - * self.audio_format.channels - * (self.audio_format.bit_depth // 8) - ) - - def _bytes_to_seconds(self, num_bytes: int) -> float: - """Convert bytes to seconds based on audio format.""" - bytes_per_second = self._get_bytes_per_second() - if bytes_per_second == 0: - return 0.0 - return num_bytes / bytes_per_second - - def _get_buffer_duration(self) -> float: - """Calculate total duration of buffered chunks in seconds.""" - if not self.chunk_buffer: - return 0.0 - # Duration is from first chunk timestamp to current position - first_chunk_timestamp = self.chunk_buffer[0][1] - return self.current_position - first_chunk_timestamp - - def _cleanup_old_chunks(self) -> None: - """Remove old chunks when all subscribers read them and min duration exceeded.""" - # Find the oldest position still needed by any subscriber - if self.subscriber_positions: - min_position = min(self.subscriber_positions.values()) - else: - min_position = len(self.chunk_buffer) - - # Calculate target oldest timestamp - # This ensures buffer contains at least MIN_BUFFER_DURATION seconds of recent data - target_oldest = self.current_position - MIN_BUFFER_DURATION - - # Remove old chunks that meet both conditions: - # 1. Before min_position (no subscriber needs them) - # 2. Older than target_oldest (outside minimum retention window) - chunks_removed = 0 - while chunks_removed < min_position and self.chunk_buffer: - _chunk_bytes, chunk_timestamp = self.chunk_buffer[0] - if chunk_timestamp < target_oldest: - self.chunk_buffer.popleft() - chunks_removed += 1 - else: - # Stop when we reach chunks we want to keep - break - - # Adjust all subscriber positions to account for removed chunks - for sub_id in self.subscriber_positions: - self.subscriber_positions[sub_id] -= chunks_removed - - async def _read_chunk_from_source(self) -> None: - """Read next chunk from audio source and add to buffer.""" - try: - chunk = await anext(self.audio_source) - async with self.buffer_lock: - # Calculate timestamp for this chunk - chunk_timestamp = self.current_position - chunk_duration = self._bytes_to_seconds(len(chunk)) - - # Append chunk with its timestamp - self.chunk_buffer.append((chunk, chunk_timestamp)) - - # Update current position - self.current_position += chunk_duration - - # Safety check: ensure buffer doesn't grow unbounded - if self._get_buffer_duration() > MAX_BUFFER_DURATION: - msg = f"Buffer exceeded maximum duration ({MAX_BUFFER_DURATION}s)" - raise RuntimeError(msg) - except StopAsyncIteration: - # Source exhausted, add EOF marker - async with self.buffer_lock: - self.chunk_buffer.append((b"", self.current_position)) - self.stream_ended = True - except Exception: - # Source errored or was canceled, mark stream as ended - async with self.buffer_lock: - self.stream_ended = True - raise - - async def _check_buffer(self, subscriber_id: UUID) -> bool | None: - """ - Check if buffer has grown or stream ended. - - REQUIRES: Caller must hold self.source_read_lock before calling. - - Returns: - True if should continue reading loop (chunk found in buffer), - False if should break (stream ended), - None if should proceed to read from source. - """ - async with self.buffer_lock: - position = self.subscriber_positions[subscriber_id] - if position < len(self.chunk_buffer): - # Another subscriber already read the chunk - return True - if self.stream_ended: - # Stream ended while waiting for source lock - return False - return None # Continue to read from source - - async def _get_chunk_from_buffer(self, subscriber_id: UUID) -> bytes | None: - """ - Get next chunk from buffer for subscriber. - - Returns: - Chunk bytes if available, None if no chunk available, or empty bytes for EOF. - """ - async with self.buffer_lock: - position = self.subscriber_positions[subscriber_id] - - # Check if we have a chunk at this position - if position < len(self.chunk_buffer): - # Chunk available in buffer - chunk_data, _ = self.chunk_buffer[position] - - # Move to next position - self.subscriber_positions[subscriber_id] = position + 1 - - # Cleanup old chunks that no one needs - self._cleanup_old_chunks() - return chunk_data - if self.stream_ended: - # Stream ended and we've read all buffered chunks - return b"" - return None - - async def _cleanup_subscriber(self, subscriber_id: UUID) -> None: - """Clean up subscriber and close stream if no subscribers left.""" - async with self.buffer_lock: - if subscriber_id in self.subscriber_positions: - del self.subscriber_positions[subscriber_id] - - # If no subscribers left, close the stream - if not self.subscriber_positions and not self.stream_ended: - self.stream_ended = True - # Close the audio source generator to prevent resource leak - with suppress(Exception): - await self.audio_source.aclose() - - async def get_stream( - self, - output_format: AudioFormat, - filter_params: list[str] | None = None, - ) -> tuple[AsyncGenerator[bytes, None], float]: - """ - Get (client specific encoded) ffmpeg stream. - - Returns: - A tuple of (audio generator, actual position in seconds) - """ - audio_gen, position = await self.subscribe_raw() - - # Calculate frame size for alignment - # Frame size = channels * bytes_per_sample - bytes_per_frame = output_format.channels * (output_format.bit_depth // 8) - - async def _stream_with_ffmpeg() -> AsyncGenerator[bytes, None]: - buffer = b"" - try: - async for chunk in get_ffmpeg_stream( - audio_input=audio_gen, - input_format=self.audio_format, - output_format=output_format, - filter_params=filter_params, - ): - buffer += chunk - # Yield only complete frames - aligned_size = (len(buffer) // bytes_per_frame) * bytes_per_frame - if aligned_size > 0: - yield buffer[:aligned_size] - buffer = buffer[aligned_size:] - # Yield any remaining complete frames at end of stream - if buffer: - aligned_size = (len(buffer) // bytes_per_frame) * bytes_per_frame - if aligned_size > 0: - yield buffer[:aligned_size] - finally: - # Ensure audio_gen cleanup runs immediately - with suppress(Exception): - await audio_gen.aclose() - - return _stream_with_ffmpeg(), position - - async def _generate(self, subscriber_id: UUID) -> AsyncGenerator[bytes, None]: - """ - Generate audio chunks for a subscriber. - - Yields chunks from the buffer until the stream ends, reading from the source - as needed. Automatically cleans up the subscriber on exit. - """ - try: - # Position already set above atomically with timestamp capture - while True: - # Try to get chunk from buffer - chunk_bytes = await self._get_chunk_from_buffer(subscriber_id) - - # Release lock before yielding to avoid deadlock - if chunk_bytes is not None: - if chunk_bytes == b"": - # End of stream marker - break - yield chunk_bytes - else: - # No chunk available, need to read from source - # Use source_read_lock to ensure only one subscriber reads at a time - async with self.source_read_lock: - # Check again if buffer has grown or stream ended while waiting - check_result = await self._check_buffer(subscriber_id) - if check_result is True: - # Another subscriber already read the chunk - continue - if check_result is False: - # Stream ended while waiting for source lock - break - - # Read next chunk from source (check_result is None) - # Note: This may block if the audio_source does synchronous I/O - await self._read_chunk_from_source() - - finally: - await self._cleanup_subscriber(subscriber_id) - - async def subscribe_raw(self) -> tuple[AsyncGenerator[bytes, None], float]: - """ - Subscribe to the raw/unaltered audio stream. - - Returns: - A tuple of (audio generator, actual position in seconds). - The position indicates where in the stream the first chunk will be from. - - Note: - Callers must properly consume or cancel the returned generator to prevent - resource leaks. - """ - subscriber_id = uuid4() - - # Atomically capture starting position and register subscriber while holding lock - async with self.buffer_lock: - if self.chunk_buffer: - _, starting_position = self.chunk_buffer[0] - # Log buffer time range for debugging - newest_ts = self.chunk_buffer[-1][1] - oldest_relative = starting_position - self.current_position - newest_relative = newest_ts - self.current_position - LOGGER.debug( - "New subscriber joining: buffer contains %.3fs (from %.3fs to %.3fs, " - "current_position=%.3fs)", - newest_ts - starting_position, - oldest_relative, - newest_relative, - self.current_position, - ) - else: - starting_position = self.current_position - LOGGER.debug( - "New subscriber joining: buffer is empty, starting at current_position=%.3fs", - self.current_position, - ) - # Register subscriber at position 0 (start of buffer) - self.subscriber_positions[subscriber_id] = 0 - - # Return generator and starting position in seconds - return self._generate(subscriber_id), starting_position diff --git a/requirements_all.txt b/requirements_all.txt index 42914953..7544fe65 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -11,7 +11,7 @@ aiojellyfin==0.14.1 aiomusiccast==0.15.0 aiortc>=1.6.0 aiorun==2025.1.1 -aiosendspin==3.0.0 +aiosendspin==4.0.1 aioslimproto==3.1.5 aiosonos==0.1.9 aiosqlite==0.22.1 -- 2.34.1