import asyncio
import time
-from collections.abc import Callable
+from collections.abc import AsyncGenerator, Callable
from io import BytesIO
from typing import TYPE_CHECKING, cast
INTERNAL_PCM_FORMAT,
)
from music_assistant.helpers.audio import get_player_filter_params
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
from music_assistant.models.player import Player, PlayerMedia
+from .timed_client_stream import TimedClientStream
+
if TYPE_CHECKING:
from aioresonate.server.client import ResonateClient
from music_assistant_models.event import MassEvent
from .provider import ResonateProvider
+class MusicAssistantMediaStream(MediaStream):
+ """MediaStream implementation for Music Assistant with per-player DSP support."""
+
+ player_instance: ResonatePlayer
+ internal_format: AudioFormat
+ output_format: AudioFormat
+
+ def __init__(
+ self,
+ *,
+ main_channel_source: AsyncGenerator[bytes, None],
+ main_channel_format: ResonateAudioFormat,
+ player_instance: ResonatePlayer,
+ internal_format: AudioFormat,
+ output_format: AudioFormat,
+ ) -> None:
+ """
+ Initialise the media stream with audio source and format for main_channel().
+
+ Args:
+ main_channel_source: Audio source generator for the main channel.
+ main_channel_format: Audio format for the main channel (includes codec).
+ player_instance: The ResonatePlayer instance for accessing mass and streams.
+ internal_format: Internal processing format (float32 for headroom).
+ output_format: Output PCM format (16-bit for player output).
+ """
+ super().__init__(
+ main_channel_source=main_channel_source,
+ main_channel_format=main_channel_format,
+ )
+ self.player_instance = player_instance
+ self.internal_format = internal_format
+ self.output_format = output_format
+
+ async def player_channel(
+ self,
+ player_id: str,
+ preferred_format: ResonateAudioFormat | None = None,
+ position_us: int = 0,
+ ) -> tuple[AsyncGenerator[bytes, None], ResonateAudioFormat, int] | None:
+ """
+ Get a player-specific audio stream with per-player DSP.
+
+ Args:
+ player_id: Identifier for the player requesting the stream.
+ preferred_format: The player's preferred native format for the stream.
+ The implementation may return a different format; the library
+ will handle any necessary conversion.
+ position_us: Position in microseconds relative to the main_stream start.
+ Used for late-joining players to sync with the main stream.
+
+ Returns:
+ A tuple of (audio generator, audio format, actual position in microseconds)
+ or None if unavailable. If None, the main_stream is used as fallback.
+ """
+ mass = self.player_instance.mass
+ multi_client_stream = self.player_instance.timed_client_stream
+ assert multi_client_stream is not None
+
+ dsp = mass.config.get_player_dsp_config(player_id)
+ if not dsp.enabled:
+ # DSP is disabled for this player, use main_stream
+ return None
+
+ # Get per-player DSP filter parameters
+ # Convert from internal format to output format
+ filter_params = get_player_filter_params(
+ mass, player_id, self.internal_format, self.output_format
+ )
+
+ # Get the stream with position (in seconds)
+ stream_gen, actual_position = await multi_client_stream.get_stream(
+ output_format=self.output_format,
+ filter_params=filter_params,
+ )
+
+ # Convert position from seconds to microseconds for aioresonate API
+ actual_position_us = int(actual_position * 1_000_000)
+
+ # Return actual position in microseconds relative to main_stream start
+ self.player_instance.logger.debug(
+ "Providing channel stream for player %s at position %d us",
+ player_id,
+ actual_position_us,
+ )
+ return (
+ stream_gen,
+ ResonateAudioFormat(
+ sample_rate=self.output_format.sample_rate,
+ bit_depth=self.output_format.bit_depth,
+ channels=self.output_format.channels,
+ codec=self._main_channel_format.codec,
+ ),
+ actual_position_us,
+ )
+
+
class ResonatePlayer(Player):
"""A resonate audio player in Music Assistant."""
unsub_group_event_cb: Callable[[], None]
last_sent_artwork_url: str | None = None
_playback_task: asyncio.Task[None] | None = None
+ timed_client_stream: TimedClientStream | None = None
def __init__(self, provider: ResonateProvider, player_id: str) -> None:
"""Initialize the Player."""
self._attr_type = PlayerType.PLAYER
self._attr_supported_features = {
PlayerFeature.SET_MEMBERS,
+ PlayerFeature.MULTI_DEVICE_DSP,
}
self._attr_can_group_with = {provider.lookup_key}
self._attr_power_control = PLAYER_CONTROL_NONE
case ClientGroupChangedEvent(new_group=new_group):
self.unsub_group_event_cb()
self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb)
+ # Sync playback state from the new group
+ match new_group.state:
+ case PlaybackStateType.PLAYING:
+ self._attr_playback_state = PlaybackState.PLAYING
+ case PlaybackStateType.PAUSED:
+ self._attr_playback_state = PlaybackState.PAUSED
+ case PlaybackStateType.STOPPED:
+ self._attr_playback_state = PlaybackState.IDLE
+ self.update_state()
async def group_event_cb(self, event: GroupEvent) -> None:
"""Event callback registered to the resonate group this player belongs to."""
self._attr_elapsed_time = 0
self._attr_elapsed_time_last_updated = time.time()
self.update_state()
- case GroupMemberAddedEvent(client_id=_):
- pass
- case GroupMemberRemovedEvent(client_id=_):
- pass
+ case GroupMemberAddedEvent(client_id=client_id):
+ self.logger.debug("Group member added: %s", client_id)
+ if client_id not in self._attr_group_members:
+ self._attr_group_members.append(client_id)
+ self.update_state()
+ case GroupMemberRemovedEvent(client_id=client_id):
+ self.logger.debug("Group member removed: %s", client_id)
+ if client_id in self._attr_group_members:
+ self._attr_group_members.remove(client_id)
+ self.update_state()
case GroupDeletedEvent():
pass
# Convert string codec to AudioCodec enum
audio_codec = AudioCodec(output_codec)
- # Apply DSP and other audio filters
- audio_source = get_ffmpeg_stream(
- audio_input=self.mass.streams.get_stream(media, flow_pcm_format),
- input_format=flow_pcm_format,
- output_format=pcm_format,
- filter_params=get_player_filter_params(
- self.mass, self.player_id, flow_pcm_format, pcm_format
- ),
+ # Get clean audio source in flow format (high quality internal format)
+ # Format conversion and per-player DSP will be applied via player_channel
+ audio_source = self.mass.streams.get_stream(media, flow_pcm_format)
+
+ # Create TimedClientStream to wrap the clean audio source
+ # This distributes the audio to multiple subscribers without DSP
+ self.timed_client_stream = TimedClientStream(
+ audio_source=audio_source,
+ audio_format=flow_pcm_format,
)
- # Create MediaStream wrapping the audio source generator
- media_stream = MediaStream(
- source=audio_source,
- audio_format=ResonateAudioFormat(
+ # Setup the main channel subscription
+ # aioresonate only really supports 16-bit for now TODO: upgrade later to 32-bit
+ main_channel_gen, main_position = await self.timed_client_stream.get_stream(
+ output_format=pcm_format,
+ filter_params=None, # TODO: this should probably still include the safety limiter
+ )
+ assert main_position == 0.0 # first subscriber, should be zero
+ media_stream = MusicAssistantMediaStream(
+ main_channel_source=main_channel_gen,
+ main_channel_format=ResonateAudioFormat(
sample_rate=pcm_format.sample_rate,
bit_depth=pcm_format.bit_depth,
channels=pcm_format.channels,
codec=audio_codec,
),
+ player_instance=self,
+ internal_format=flow_pcm_format,
+ output_format=pcm_format,
)
stop_time = await self.api.group.play_media(media_stream)
except Exception:
self.logger.exception("Error during playback for player %s", self.display_name)
raise
+ finally:
+ self.timed_client_stream = None
async def set_members(
self,
player = cast("ResonatePlayer", player) # For type checking
await self.api.group.remove_client(player.api)
player.api.disconnect_behaviour = DisconnectBehaviour.STOP
- self._attr_group_members.remove(player_id)
for player_id in player_ids_to_add or []:
player = self.mass.players.get(player_id, True)
player = cast("ResonatePlayer", player) # For type checking
player.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP
await self.api.group.add_client(player.api)
- self._attr_group_members.append(player_id)
- self.update_state()
-
- def _update_media_art(self, image_data: bytes) -> None:
- image = Image.open(BytesIO(image_data))
- self.api.group.set_media_art(image)
+ # self.group_members will be updated by the group event callback
async def _on_queue_update(self, event: MassEvent) -> None:
"""Extract and send current media metadata to resonate players on queue updates."""
current_item.media_item
)
if image_data is not None:
- await asyncio.to_thread(self._update_media_art, image_data)
+ image = await asyncio.to_thread(Image.open, BytesIO(image_data))
+ await self.api.group.set_media_art(image)
# TODO: null media art if not set?
track_duration = current_item.duration
--- /dev/null
+"""
+Timestamped multi-client audio stream for position-aware playback.
+
+This module provides a multi-client streaming implementation optimized for
+aioresonate's synchronized multi-room audio playback. Each audio chunk is
+timestamped, allowing late-joining players to start at the correct position
+for synchronized playback across multiple devices.
+"""
+
+import asyncio
+import logging
+from collections import deque
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+from uuid import UUID, uuid4
+
+from music_assistant_models.media_items import AudioFormat
+
+from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
+
+LOGGER = logging.getLogger(__name__)
+
+# Minimum/target buffer retention time in seconds
+# This 10s buffer is currently required since:
+# - aioresonate currently uses a fixed 5s buffer to allow up to ~4s of network interruption
+# - ~2s allows for ffmpeg processing time and some margin
+# - ~3s are currently needed internally by aioresonate for initial buffering
+MIN_BUFFER_DURATION = 10.0
+# Maximum buffer duration before raising an error (safety mechanism)
+MAX_BUFFER_DURATION = MIN_BUFFER_DURATION + 5.0
+
+
+class TimedClientStream:
+ """Multi-client audio stream with timestamped chunks for synchronized playback."""
+
+ audio_source: AsyncGenerator[bytes, None]
+ """The source audio stream to read from."""
+ audio_format: AudioFormat
+ """The audio format of the source stream."""
+ chunk_buffer: deque[tuple[bytes, float]]
+ """Buffer storing chunks with their timestamps in seconds (chunk_data, timestamp_seconds)."""
+ subscriber_positions: dict[UUID, int]
+ """Subscriber positions: maps subscriber_id to position (index into chunk_buffer)."""
+ buffer_lock: asyncio.Lock
+ """Lock for buffer and shared state access."""
+ source_read_lock: asyncio.Lock
+ """Lock to serialize audio source reads."""
+ stream_ended: bool = False
+ """Track if stream has ended."""
+ current_position: float = 0.0
+ """Current position in seconds (from stream start)."""
+
+ def __init__(
+ self,
+ audio_source: AsyncGenerator[bytes, None],
+ audio_format: AudioFormat,
+ ) -> None:
+ """Initialize TimedClientStream."""
+ self.audio_source = audio_source
+ self.audio_format = audio_format
+ self.chunk_buffer = deque()
+ self.subscriber_positions = {}
+ self.buffer_lock = asyncio.Lock()
+ self.source_read_lock = asyncio.Lock()
+
+ def _get_bytes_per_second(self) -> int:
+ """Get bytes per second for the audio format."""
+ return (
+ self.audio_format.sample_rate
+ * self.audio_format.channels
+ * (self.audio_format.bit_depth // 8)
+ )
+
+ def _bytes_to_seconds(self, num_bytes: int) -> float:
+ """Convert bytes to seconds based on audio format."""
+ bytes_per_second = self._get_bytes_per_second()
+ if bytes_per_second == 0:
+ return 0.0
+ return num_bytes / bytes_per_second
+
+ def _get_buffer_duration(self) -> float:
+ """Calculate total duration of buffered chunks in seconds."""
+ if not self.chunk_buffer:
+ return 0.0
+ # Duration is from first chunk timestamp to current position
+ first_chunk_timestamp = self.chunk_buffer[0][1]
+ return self.current_position - first_chunk_timestamp
+
+ def _cleanup_old_chunks(self) -> None:
+ """Remove old chunks when all subscribers read them and min duration exceeded."""
+ # Find the oldest position still needed by any subscriber
+ if self.subscriber_positions:
+ min_position = min(self.subscriber_positions.values())
+ else:
+ min_position = len(self.chunk_buffer)
+
+ # Calculate target oldest timestamp
+ # This ensures buffer contains at least MIN_BUFFER_DURATION seconds of recent data
+ target_oldest = self.current_position - MIN_BUFFER_DURATION
+
+ # Remove old chunks that meet both conditions:
+ # 1. Before min_position (no subscriber needs them)
+ # 2. Older than target_oldest (outside minimum retention window)
+ chunks_removed = 0
+ while chunks_removed < min_position and self.chunk_buffer:
+ _chunk_bytes, chunk_timestamp = self.chunk_buffer[0]
+ if chunk_timestamp < target_oldest:
+ self.chunk_buffer.popleft()
+ chunks_removed += 1
+ else:
+ # Stop when we reach chunks we want to keep
+ break
+
+ # Adjust all subscriber positions to account for removed chunks
+ for sub_id in self.subscriber_positions:
+ self.subscriber_positions[sub_id] -= chunks_removed
+
+ async def _read_chunk_from_source(self) -> None:
+ """Read next chunk from audio source and add to buffer."""
+ try:
+ chunk = await anext(self.audio_source)
+ async with self.buffer_lock:
+ # Calculate timestamp for this chunk
+ chunk_timestamp = self.current_position
+ chunk_duration = self._bytes_to_seconds(len(chunk))
+
+ # Append chunk with its timestamp
+ self.chunk_buffer.append((chunk, chunk_timestamp))
+
+ # Update current position
+ self.current_position += chunk_duration
+
+ # Safety check: ensure buffer doesn't grow unbounded
+ if self._get_buffer_duration() > MAX_BUFFER_DURATION:
+ msg = f"Buffer exceeded maximum duration ({MAX_BUFFER_DURATION}s)"
+ raise RuntimeError(msg)
+ except StopAsyncIteration:
+ # Source exhausted, add EOF marker
+ async with self.buffer_lock:
+ self.chunk_buffer.append((b"", self.current_position))
+ self.stream_ended = True
+ except Exception:
+ # Source errored or was canceled, mark stream as ended
+ async with self.buffer_lock:
+ self.stream_ended = True
+ raise
+
+ async def _check_buffer(self, subscriber_id: UUID) -> bool | None:
+ """
+ Check if buffer has grown or stream ended.
+
+ REQUIRES: Caller must hold self.source_read_lock before calling.
+
+ Returns:
+ True if should continue reading loop (chunk found in buffer),
+ False if should break (stream ended),
+ None if should proceed to read from source.
+ """
+ async with self.buffer_lock:
+ position = self.subscriber_positions[subscriber_id]
+ if position < len(self.chunk_buffer):
+ # Another subscriber already read the chunk
+ return True
+ if self.stream_ended:
+ # Stream ended while waiting for source lock
+ return False
+ return None # Continue to read from source
+
+ async def _get_chunk_from_buffer(self, subscriber_id: UUID) -> bytes | None:
+ """
+ Get next chunk from buffer for subscriber.
+
+ Returns:
+ Chunk bytes if available, None if no chunk available, or empty bytes for EOF.
+ """
+ async with self.buffer_lock:
+ position = self.subscriber_positions[subscriber_id]
+
+ # Check if we have a chunk at this position
+ if position < len(self.chunk_buffer):
+ # Chunk available in buffer
+ chunk_data, _ = self.chunk_buffer[position]
+
+ # Move to next position
+ self.subscriber_positions[subscriber_id] = position + 1
+
+ # Cleanup old chunks that no one needs
+ self._cleanup_old_chunks()
+ return chunk_data
+ if self.stream_ended:
+ # Stream ended and we've read all buffered chunks
+ return b""
+ return None
+
+ async def _cleanup_subscriber(self, subscriber_id: UUID) -> None:
+ """Clean up subscriber and close stream if no subscribers left."""
+ async with self.buffer_lock:
+ if subscriber_id in self.subscriber_positions:
+ del self.subscriber_positions[subscriber_id]
+
+ # If no subscribers left, close the stream
+ if not self.subscriber_positions and not self.stream_ended:
+ self.stream_ended = True
+ # Close the audio source generator to prevent resource leak
+ with suppress(Exception):
+ await self.audio_source.aclose()
+
+ async def get_stream(
+ self,
+ output_format: AudioFormat,
+ filter_params: list[str] | None = None,
+ ) -> tuple[AsyncGenerator[bytes, None], float]:
+ """
+ Get (client specific encoded) ffmpeg stream.
+
+ Returns:
+ A tuple of (audio generator, actual position in seconds)
+ """
+ audio_gen, position = await self.subscribe_raw()
+
+ async def _stream_with_ffmpeg() -> AsyncGenerator[bytes, None]:
+ try:
+ async for chunk in get_ffmpeg_stream(
+ audio_input=audio_gen,
+ input_format=self.audio_format,
+ output_format=output_format,
+ filter_params=filter_params,
+ ):
+ yield chunk
+ finally:
+ # Ensure audio_gen cleanup runs immediately
+ with suppress(Exception):
+ await audio_gen.aclose()
+
+ return _stream_with_ffmpeg(), position
+
+ async def _generate(self, subscriber_id: UUID) -> AsyncGenerator[bytes, None]:
+ """
+ Generate audio chunks for a subscriber.
+
+ Yields chunks from the buffer until the stream ends, reading from the source
+ as needed. Automatically cleans up the subscriber on exit.
+ """
+ try:
+ # Position already set above atomically with timestamp capture
+ while True:
+ # Try to get chunk from buffer
+ chunk_bytes = await self._get_chunk_from_buffer(subscriber_id)
+
+ # Release lock before yielding to avoid deadlock
+ if chunk_bytes is not None:
+ if chunk_bytes == b"":
+ # End of stream marker
+ break
+ yield chunk_bytes
+ else:
+ # No chunk available, need to read from source
+ # Use source_read_lock to ensure only one subscriber reads at a time
+ async with self.source_read_lock:
+ # Check again if buffer has grown or stream ended while waiting
+ check_result = await self._check_buffer(subscriber_id)
+ if check_result is True:
+ # Another subscriber already read the chunk
+ continue
+ if check_result is False:
+ # Stream ended while waiting for source lock
+ break
+
+ # Read next chunk from source (check_result is None)
+ # Note: This may block if the audio_source does synchronous I/O
+ await self._read_chunk_from_source()
+
+ finally:
+ await self._cleanup_subscriber(subscriber_id)
+
+ async def subscribe_raw(self) -> tuple[AsyncGenerator[bytes, None], float]:
+ """
+ Subscribe to the raw/unaltered audio stream.
+
+ Returns:
+ A tuple of (audio generator, actual position in seconds).
+ The position indicates where in the stream the first chunk will be from.
+
+ Note:
+ Callers must properly consume or cancel the returned generator to prevent
+ resource leaks.
+ """
+ subscriber_id = uuid4()
+
+ # Atomically capture starting position and register subscriber while holding lock
+ async with self.buffer_lock:
+ if self.chunk_buffer:
+ _, starting_position = self.chunk_buffer[0]
+ # Log buffer time range for debugging
+ newest_ts = self.chunk_buffer[-1][1]
+ oldest_relative = starting_position - self.current_position
+ newest_relative = newest_ts - self.current_position
+ LOGGER.debug(
+ "New subscriber joining: buffer contains %.3fs (from %.3fs to %.3fs, "
+ "current_position=%.3fs)",
+ newest_ts - starting_position,
+ oldest_relative,
+ newest_relative,
+ self.current_position,
+ )
+ else:
+ starting_position = self.current_position
+ LOGGER.debug(
+ "New subscriber joining: buffer is empty, starting at current_position=%.3fs",
+ self.current_position,
+ )
+ # Register subscriber at position 0 (start of buffer)
+ self.subscriber_positions[subscriber_id] = 0
+
+ # Return generator and starting position in seconds
+ return self._generate(subscriber_id), starting_position