From: Maxim Raznatovski Date: Fri, 14 Nov 2025 15:02:16 +0000 (+0100) Subject: Update Resonate provider with `MULTI_DEVICE_DSP` support and other improvements ... X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=9ec2a9f5ff198bbbc2b94db02982d7dcab30b1c2;p=music-assistant-server.git Update Resonate provider with `MULTI_DEVICE_DSP` support and other improvements (#2616) * Update `MediaStream` usage * Support `MULTI_DEVICE_DSP` for resonate * Rename `stream` to `channel` in `MediaStream` * Initial working implementation for per player DSP support * Refactors, fixes, and comments * Rename `shared_buffer` to `chunk_buffer` * Use `UUID` instead of `str` * Use seconds in `multi_client_stream` * Move attribute definitions to class level * Remove dead code * Refactor `_cleanup_old_chunks` * Rename `MultiClientStream` to `TimedClientStream` * Refactor `_read_chunk_from` * Convert main channel to 16-bit PCM for aioresonate compatibility * Use separate host and advertise_host Pass bind_ip for socket binding and publish_ip for mDNS advertising to correctly handle scenarios where the server binds to all interfaces (0.0.0.0) but advertises a specific IP address to clients. * Bump `aioresonate` * Add safety check to `TimedClientStream` to avoid memory leaks * Update `MIN_BUFFER_DURATION` comment with findings * Update music_assistant/providers/resonate/timed_client_stream.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Clean up `timed_client_stream` reference after playback * Bump `aioresonate` * Update call to the now async friendly `set_media_art` * Update `playback_state` when joining a new group * Update `group_members` based on `aioresonate` events Instead of optimistically setting them. * Move `MusicAssistantMediaStream` outside a method * Run audio cleanup immediately * Remove redundant type annotation * Use `anext()` * Remove unused async from `_cleanup_old_chunks` * Simplify method name and clarify lock requirement * Move `_generate` to class-level method * Refactor `MusicAssistantMediaStream` parameters --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Paulus Schoutsen --- diff --git a/music_assistant/providers/resonate/manifest.json b/music_assistant/providers/resonate/manifest.json index 57552aa2..ca71e276 100644 --- a/music_assistant/providers/resonate/manifest.json +++ b/music_assistant/providers/resonate/manifest.json @@ -5,5 +5,5 @@ "name": "Resonate (WIP)", "description": "Resonate (working title) is the next generation streaming protocol built by the Open Home Foundation. Follow the development on Discord to see how you can get involved.", "codeowners": ["@music-assistant"], - "requirements": ["aioresonate==0.11.0"] + "requirements": ["aioresonate==0.13.1"] } diff --git a/music_assistant/providers/resonate/player.py b/music_assistant/providers/resonate/player.py index 39becc4d..b6e082b5 100644 --- a/music_assistant/providers/resonate/player.py +++ b/music_assistant/providers/resonate/player.py @@ -4,7 +4,7 @@ from __future__ import annotations import asyncio import time -from collections.abc import Callable +from collections.abc import AsyncGenerator, Callable from io import BytesIO from typing import TYPE_CHECKING, cast @@ -49,9 +49,10 @@ from music_assistant.constants import ( INTERNAL_PCM_FORMAT, ) from music_assistant.helpers.audio import get_player_filter_params -from music_assistant.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.models.player import Player, PlayerMedia +from .timed_client_stream import TimedClientStream + if TYPE_CHECKING: from aioresonate.server.client import ResonateClient from music_assistant_models.event import MassEvent @@ -59,6 +60,103 @@ if TYPE_CHECKING: from .provider import ResonateProvider +class MusicAssistantMediaStream(MediaStream): + """MediaStream implementation for Music Assistant with per-player DSP support.""" + + player_instance: ResonatePlayer + internal_format: AudioFormat + output_format: AudioFormat + + def __init__( + self, + *, + main_channel_source: AsyncGenerator[bytes, None], + main_channel_format: ResonateAudioFormat, + player_instance: ResonatePlayer, + internal_format: AudioFormat, + output_format: AudioFormat, + ) -> None: + """ + Initialise the media stream with audio source and format for main_channel(). + + Args: + main_channel_source: Audio source generator for the main channel. + main_channel_format: Audio format for the main channel (includes codec). + player_instance: The ResonatePlayer instance for accessing mass and streams. + internal_format: Internal processing format (float32 for headroom). + output_format: Output PCM format (16-bit for player output). + """ + super().__init__( + main_channel_source=main_channel_source, + main_channel_format=main_channel_format, + ) + self.player_instance = player_instance + self.internal_format = internal_format + self.output_format = output_format + + async def player_channel( + self, + player_id: str, + preferred_format: ResonateAudioFormat | None = None, + position_us: int = 0, + ) -> tuple[AsyncGenerator[bytes, None], ResonateAudioFormat, int] | None: + """ + Get a player-specific audio stream with per-player DSP. + + Args: + player_id: Identifier for the player requesting the stream. + preferred_format: The player's preferred native format for the stream. + The implementation may return a different format; the library + will handle any necessary conversion. + position_us: Position in microseconds relative to the main_stream start. + Used for late-joining players to sync with the main stream. + + Returns: + A tuple of (audio generator, audio format, actual position in microseconds) + or None if unavailable. If None, the main_stream is used as fallback. + """ + mass = self.player_instance.mass + multi_client_stream = self.player_instance.timed_client_stream + assert multi_client_stream is not None + + dsp = mass.config.get_player_dsp_config(player_id) + if not dsp.enabled: + # DSP is disabled for this player, use main_stream + return None + + # Get per-player DSP filter parameters + # Convert from internal format to output format + filter_params = get_player_filter_params( + mass, player_id, self.internal_format, self.output_format + ) + + # Get the stream with position (in seconds) + stream_gen, actual_position = await multi_client_stream.get_stream( + output_format=self.output_format, + filter_params=filter_params, + ) + + # Convert position from seconds to microseconds for aioresonate API + actual_position_us = int(actual_position * 1_000_000) + + # Return actual position in microseconds relative to main_stream start + self.player_instance.logger.debug( + "Providing channel stream for player %s at position %d us", + player_id, + actual_position_us, + ) + return ( + stream_gen, + ResonateAudioFormat( + sample_rate=self.output_format.sample_rate, + bit_depth=self.output_format.bit_depth, + channels=self.output_format.channels, + codec=self._main_channel_format.codec, + ), + actual_position_us, + ) + + class ResonatePlayer(Player): """A resonate audio player in Music Assistant.""" @@ -67,6 +165,7 @@ class ResonatePlayer(Player): unsub_group_event_cb: Callable[[], None] last_sent_artwork_url: str | None = None _playback_task: asyncio.Task[None] | None = None + timed_client_stream: TimedClientStream | None = None def __init__(self, provider: ResonateProvider, player_id: str) -> None: """Initialize the Player.""" @@ -84,6 +183,7 @@ class ResonatePlayer(Player): self._attr_type = PlayerType.PLAYER self._attr_supported_features = { PlayerFeature.SET_MEMBERS, + PlayerFeature.MULTI_DEVICE_DSP, } self._attr_can_group_with = {provider.lookup_key} self._attr_power_control = PLAYER_CONTROL_NONE @@ -110,6 +210,15 @@ class ResonatePlayer(Player): case ClientGroupChangedEvent(new_group=new_group): self.unsub_group_event_cb() self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb) + # Sync playback state from the new group + match new_group.state: + case PlaybackStateType.PLAYING: + self._attr_playback_state = PlaybackState.PLAYING + case PlaybackStateType.PAUSED: + self._attr_playback_state = PlaybackState.PAUSED + case PlaybackStateType.STOPPED: + self._attr_playback_state = PlaybackState.IDLE + self.update_state() async def group_event_cb(self, event: GroupEvent) -> None: """Event callback registered to the resonate group this player belongs to.""" @@ -155,10 +264,16 @@ class ResonatePlayer(Player): self._attr_elapsed_time = 0 self._attr_elapsed_time_last_updated = time.time() self.update_state() - case GroupMemberAddedEvent(client_id=_): - pass - case GroupMemberRemovedEvent(client_id=_): - pass + case GroupMemberAddedEvent(client_id=client_id): + self.logger.debug("Group member added: %s", client_id) + if client_id not in self._attr_group_members: + self._attr_group_members.append(client_id) + self.update_state() + case GroupMemberRemovedEvent(client_id=client_id): + self.logger.debug("Group member removed: %s", client_id) + if client_id in self._attr_group_members: + self._attr_group_members.remove(client_id) + self.update_state() case GroupDeletedEvent(): pass @@ -224,25 +339,35 @@ class ResonatePlayer(Player): # Convert string codec to AudioCodec enum audio_codec = AudioCodec(output_codec) - # Apply DSP and other audio filters - audio_source = get_ffmpeg_stream( - audio_input=self.mass.streams.get_stream(media, flow_pcm_format), - input_format=flow_pcm_format, - output_format=pcm_format, - filter_params=get_player_filter_params( - self.mass, self.player_id, flow_pcm_format, pcm_format - ), + # Get clean audio source in flow format (high quality internal format) + # Format conversion and per-player DSP will be applied via player_channel + audio_source = self.mass.streams.get_stream(media, flow_pcm_format) + + # Create TimedClientStream to wrap the clean audio source + # This distributes the audio to multiple subscribers without DSP + self.timed_client_stream = TimedClientStream( + audio_source=audio_source, + audio_format=flow_pcm_format, ) - # Create MediaStream wrapping the audio source generator - media_stream = MediaStream( - source=audio_source, - audio_format=ResonateAudioFormat( + # Setup the main channel subscription + # aioresonate only really supports 16-bit for now TODO: upgrade later to 32-bit + main_channel_gen, main_position = await self.timed_client_stream.get_stream( + output_format=pcm_format, + filter_params=None, # TODO: this should probably still include the safety limiter + ) + assert main_position == 0.0 # first subscriber, should be zero + media_stream = MusicAssistantMediaStream( + main_channel_source=main_channel_gen, + main_channel_format=ResonateAudioFormat( sample_rate=pcm_format.sample_rate, bit_depth=pcm_format.bit_depth, channels=pcm_format.channels, codec=audio_codec, ), + player_instance=self, + internal_format=flow_pcm_format, + output_format=pcm_format, ) stop_time = await self.api.group.play_media(media_stream) @@ -253,6 +378,8 @@ class ResonatePlayer(Player): except Exception: self.logger.exception("Error during playback for player %s", self.display_name) raise + finally: + self.timed_client_stream = None async def set_members( self, @@ -268,18 +395,12 @@ class ResonatePlayer(Player): player = cast("ResonatePlayer", player) # For type checking await self.api.group.remove_client(player.api) player.api.disconnect_behaviour = DisconnectBehaviour.STOP - self._attr_group_members.remove(player_id) for player_id in player_ids_to_add or []: player = self.mass.players.get(player_id, True) player = cast("ResonatePlayer", player) # For type checking player.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP await self.api.group.add_client(player.api) - self._attr_group_members.append(player_id) - self.update_state() - - def _update_media_art(self, image_data: bytes) -> None: - image = Image.open(BytesIO(image_data)) - self.api.group.set_media_art(image) + # self.group_members will be updated by the group event callback async def _on_queue_update(self, event: MassEvent) -> None: """Extract and send current media metadata to resonate players on queue updates.""" @@ -328,7 +449,8 @@ class ResonatePlayer(Player): current_item.media_item ) if image_data is not None: - await asyncio.to_thread(self._update_media_art, image_data) + image = await asyncio.to_thread(Image.open, BytesIO(image_data)) + await self.api.group.set_media_art(image) # TODO: null media art if not set? track_duration = current_item.duration diff --git a/music_assistant/providers/resonate/provider.py b/music_assistant/providers/resonate/provider.py index d1367acb..7e934fa4 100644 --- a/music_assistant/providers/resonate/provider.py +++ b/music_assistant/providers/resonate/provider.py @@ -66,7 +66,9 @@ class ResonateProvider(PlayerProvider): # Start server for handling incoming Resonate connections from clients # and mDNS discovery of new clients await self.server_api.start_server( - port=8927, host=cast("str", self.mass.streams.publish_ip) + port=8927, + host=self.mass.streams.bind_ip, + advertise_host=cast("str", self.mass.streams.publish_ip), ) async def unload(self, is_removed: bool = False) -> None: diff --git a/music_assistant/providers/resonate/timed_client_stream.py b/music_assistant/providers/resonate/timed_client_stream.py new file mode 100644 index 00000000..c09b9ca8 --- /dev/null +++ b/music_assistant/providers/resonate/timed_client_stream.py @@ -0,0 +1,316 @@ +""" +Timestamped multi-client audio stream for position-aware playback. + +This module provides a multi-client streaming implementation optimized for +aioresonate's synchronized multi-room audio playback. Each audio chunk is +timestamped, allowing late-joining players to start at the correct position +for synchronized playback across multiple devices. +""" + +import asyncio +import logging +from collections import deque +from collections.abc import AsyncGenerator +from contextlib import suppress +from uuid import UUID, uuid4 + +from music_assistant_models.media_items import AudioFormat + +from music_assistant.helpers.ffmpeg import get_ffmpeg_stream + +LOGGER = logging.getLogger(__name__) + +# Minimum/target buffer retention time in seconds +# This 10s buffer is currently required since: +# - aioresonate currently uses a fixed 5s buffer to allow up to ~4s of network interruption +# - ~2s allows for ffmpeg processing time and some margin +# - ~3s are currently needed internally by aioresonate for initial buffering +MIN_BUFFER_DURATION = 10.0 +# Maximum buffer duration before raising an error (safety mechanism) +MAX_BUFFER_DURATION = MIN_BUFFER_DURATION + 5.0 + + +class TimedClientStream: + """Multi-client audio stream with timestamped chunks for synchronized playback.""" + + audio_source: AsyncGenerator[bytes, None] + """The source audio stream to read from.""" + audio_format: AudioFormat + """The audio format of the source stream.""" + chunk_buffer: deque[tuple[bytes, float]] + """Buffer storing chunks with their timestamps in seconds (chunk_data, timestamp_seconds).""" + subscriber_positions: dict[UUID, int] + """Subscriber positions: maps subscriber_id to position (index into chunk_buffer).""" + buffer_lock: asyncio.Lock + """Lock for buffer and shared state access.""" + source_read_lock: asyncio.Lock + """Lock to serialize audio source reads.""" + stream_ended: bool = False + """Track if stream has ended.""" + current_position: float = 0.0 + """Current position in seconds (from stream start).""" + + def __init__( + self, + audio_source: AsyncGenerator[bytes, None], + audio_format: AudioFormat, + ) -> None: + """Initialize TimedClientStream.""" + self.audio_source = audio_source + self.audio_format = audio_format + self.chunk_buffer = deque() + self.subscriber_positions = {} + self.buffer_lock = asyncio.Lock() + self.source_read_lock = asyncio.Lock() + + def _get_bytes_per_second(self) -> int: + """Get bytes per second for the audio format.""" + return ( + self.audio_format.sample_rate + * self.audio_format.channels + * (self.audio_format.bit_depth // 8) + ) + + def _bytes_to_seconds(self, num_bytes: int) -> float: + """Convert bytes to seconds based on audio format.""" + bytes_per_second = self._get_bytes_per_second() + if bytes_per_second == 0: + return 0.0 + return num_bytes / bytes_per_second + + def _get_buffer_duration(self) -> float: + """Calculate total duration of buffered chunks in seconds.""" + if not self.chunk_buffer: + return 0.0 + # Duration is from first chunk timestamp to current position + first_chunk_timestamp = self.chunk_buffer[0][1] + return self.current_position - first_chunk_timestamp + + def _cleanup_old_chunks(self) -> None: + """Remove old chunks when all subscribers read them and min duration exceeded.""" + # Find the oldest position still needed by any subscriber + if self.subscriber_positions: + min_position = min(self.subscriber_positions.values()) + else: + min_position = len(self.chunk_buffer) + + # Calculate target oldest timestamp + # This ensures buffer contains at least MIN_BUFFER_DURATION seconds of recent data + target_oldest = self.current_position - MIN_BUFFER_DURATION + + # Remove old chunks that meet both conditions: + # 1. Before min_position (no subscriber needs them) + # 2. Older than target_oldest (outside minimum retention window) + chunks_removed = 0 + while chunks_removed < min_position and self.chunk_buffer: + _chunk_bytes, chunk_timestamp = self.chunk_buffer[0] + if chunk_timestamp < target_oldest: + self.chunk_buffer.popleft() + chunks_removed += 1 + else: + # Stop when we reach chunks we want to keep + break + + # Adjust all subscriber positions to account for removed chunks + for sub_id in self.subscriber_positions: + self.subscriber_positions[sub_id] -= chunks_removed + + async def _read_chunk_from_source(self) -> None: + """Read next chunk from audio source and add to buffer.""" + try: + chunk = await anext(self.audio_source) + async with self.buffer_lock: + # Calculate timestamp for this chunk + chunk_timestamp = self.current_position + chunk_duration = self._bytes_to_seconds(len(chunk)) + + # Append chunk with its timestamp + self.chunk_buffer.append((chunk, chunk_timestamp)) + + # Update current position + self.current_position += chunk_duration + + # Safety check: ensure buffer doesn't grow unbounded + if self._get_buffer_duration() > MAX_BUFFER_DURATION: + msg = f"Buffer exceeded maximum duration ({MAX_BUFFER_DURATION}s)" + raise RuntimeError(msg) + except StopAsyncIteration: + # Source exhausted, add EOF marker + async with self.buffer_lock: + self.chunk_buffer.append((b"", self.current_position)) + self.stream_ended = True + except Exception: + # Source errored or was canceled, mark stream as ended + async with self.buffer_lock: + self.stream_ended = True + raise + + async def _check_buffer(self, subscriber_id: UUID) -> bool | None: + """ + Check if buffer has grown or stream ended. + + REQUIRES: Caller must hold self.source_read_lock before calling. + + Returns: + True if should continue reading loop (chunk found in buffer), + False if should break (stream ended), + None if should proceed to read from source. + """ + async with self.buffer_lock: + position = self.subscriber_positions[subscriber_id] + if position < len(self.chunk_buffer): + # Another subscriber already read the chunk + return True + if self.stream_ended: + # Stream ended while waiting for source lock + return False + return None # Continue to read from source + + async def _get_chunk_from_buffer(self, subscriber_id: UUID) -> bytes | None: + """ + Get next chunk from buffer for subscriber. + + Returns: + Chunk bytes if available, None if no chunk available, or empty bytes for EOF. + """ + async with self.buffer_lock: + position = self.subscriber_positions[subscriber_id] + + # Check if we have a chunk at this position + if position < len(self.chunk_buffer): + # Chunk available in buffer + chunk_data, _ = self.chunk_buffer[position] + + # Move to next position + self.subscriber_positions[subscriber_id] = position + 1 + + # Cleanup old chunks that no one needs + self._cleanup_old_chunks() + return chunk_data + if self.stream_ended: + # Stream ended and we've read all buffered chunks + return b"" + return None + + async def _cleanup_subscriber(self, subscriber_id: UUID) -> None: + """Clean up subscriber and close stream if no subscribers left.""" + async with self.buffer_lock: + if subscriber_id in self.subscriber_positions: + del self.subscriber_positions[subscriber_id] + + # If no subscribers left, close the stream + if not self.subscriber_positions and not self.stream_ended: + self.stream_ended = True + # Close the audio source generator to prevent resource leak + with suppress(Exception): + await self.audio_source.aclose() + + async def get_stream( + self, + output_format: AudioFormat, + filter_params: list[str] | None = None, + ) -> tuple[AsyncGenerator[bytes, None], float]: + """ + Get (client specific encoded) ffmpeg stream. + + Returns: + A tuple of (audio generator, actual position in seconds) + """ + audio_gen, position = await self.subscribe_raw() + + async def _stream_with_ffmpeg() -> AsyncGenerator[bytes, None]: + try: + async for chunk in get_ffmpeg_stream( + audio_input=audio_gen, + input_format=self.audio_format, + output_format=output_format, + filter_params=filter_params, + ): + yield chunk + finally: + # Ensure audio_gen cleanup runs immediately + with suppress(Exception): + await audio_gen.aclose() + + return _stream_with_ffmpeg(), position + + async def _generate(self, subscriber_id: UUID) -> AsyncGenerator[bytes, None]: + """ + Generate audio chunks for a subscriber. + + Yields chunks from the buffer until the stream ends, reading from the source + as needed. Automatically cleans up the subscriber on exit. + """ + try: + # Position already set above atomically with timestamp capture + while True: + # Try to get chunk from buffer + chunk_bytes = await self._get_chunk_from_buffer(subscriber_id) + + # Release lock before yielding to avoid deadlock + if chunk_bytes is not None: + if chunk_bytes == b"": + # End of stream marker + break + yield chunk_bytes + else: + # No chunk available, need to read from source + # Use source_read_lock to ensure only one subscriber reads at a time + async with self.source_read_lock: + # Check again if buffer has grown or stream ended while waiting + check_result = await self._check_buffer(subscriber_id) + if check_result is True: + # Another subscriber already read the chunk + continue + if check_result is False: + # Stream ended while waiting for source lock + break + + # Read next chunk from source (check_result is None) + # Note: This may block if the audio_source does synchronous I/O + await self._read_chunk_from_source() + + finally: + await self._cleanup_subscriber(subscriber_id) + + async def subscribe_raw(self) -> tuple[AsyncGenerator[bytes, None], float]: + """ + Subscribe to the raw/unaltered audio stream. + + Returns: + A tuple of (audio generator, actual position in seconds). + The position indicates where in the stream the first chunk will be from. + + Note: + Callers must properly consume or cancel the returned generator to prevent + resource leaks. + """ + subscriber_id = uuid4() + + # Atomically capture starting position and register subscriber while holding lock + async with self.buffer_lock: + if self.chunk_buffer: + _, starting_position = self.chunk_buffer[0] + # Log buffer time range for debugging + newest_ts = self.chunk_buffer[-1][1] + oldest_relative = starting_position - self.current_position + newest_relative = newest_ts - self.current_position + LOGGER.debug( + "New subscriber joining: buffer contains %.3fs (from %.3fs to %.3fs, " + "current_position=%.3fs)", + newest_ts - starting_position, + oldest_relative, + newest_relative, + self.current_position, + ) + else: + starting_position = self.current_position + LOGGER.debug( + "New subscriber joining: buffer is empty, starting at current_position=%.3fs", + self.current_position, + ) + # Register subscriber at position 0 (start of buffer) + self.subscriber_positions[subscriber_id] = 0 + + # Return generator and starting position in seconds + return self._generate(subscriber_id), starting_position diff --git a/requirements_all.txt b/requirements_all.txt index 6d5563f1..ca49a9f5 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -9,7 +9,7 @@ aiohttp_asyncmdnsresolver==0.1.1 aiohttp-fast-zlib==0.3.0 aiojellyfin==0.14.1 aiomusiccast==0.14.8 -aioresonate==0.11.0 +aioresonate==0.13.1 aiorun==2025.1.1 aioslimproto==3.1.1 aiosonos==0.1.9