Update Resonate provider with `MULTI_DEVICE_DSP` support and other improvements ...
authorMaxim Raznatovski <nda.mr43@gmail.com>
Fri, 14 Nov 2025 15:02:16 +0000 (16:02 +0100)
committerGitHub <noreply@github.com>
Fri, 14 Nov 2025 15:02:16 +0000 (10:02 -0500)
* Update `MediaStream` usage

* Support `MULTI_DEVICE_DSP` for resonate

* Rename `stream` to `channel` in `MediaStream`

* Initial working implementation for per player DSP support

* Refactors, fixes, and comments

* Rename `shared_buffer` to `chunk_buffer`

* Use `UUID` instead of `str`

* Use seconds in `multi_client_stream`

* Move attribute definitions to class level

* Remove dead code

* Refactor `_cleanup_old_chunks`

* Rename `MultiClientStream` to `TimedClientStream`

* Refactor `_read_chunk_from`

* Convert main channel to 16-bit PCM for aioresonate compatibility

* Use separate host and advertise_host

Pass bind_ip for socket binding and publish_ip for mDNS advertising to
correctly handle scenarios where the server binds to all interfaces
(0.0.0.0) but advertises a specific IP address to clients.

* Bump `aioresonate`

* Add safety check to `TimedClientStream` to avoid memory leaks

* Update `MIN_BUFFER_DURATION` comment with findings

* Update music_assistant/providers/resonate/timed_client_stream.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
* Clean up `timed_client_stream` reference after playback

* Bump `aioresonate`

* Update call to the now async friendly `set_media_art`

* Update `playback_state` when joining a new group

* Update `group_members` based on `aioresonate` events

Instead of optimistically setting them.

* Move `MusicAssistantMediaStream` outside a method

* Run audio cleanup immediately

* Remove redundant type annotation

* Use `anext()`

* Remove unused async from `_cleanup_old_chunks`

* Simplify method name and clarify lock requirement

* Move `_generate` to class-level method

* Refactor `MusicAssistantMediaStream` parameters

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Paulus Schoutsen <balloob@gmail.com>
music_assistant/providers/resonate/manifest.json
music_assistant/providers/resonate/player.py
music_assistant/providers/resonate/provider.py
music_assistant/providers/resonate/timed_client_stream.py [new file with mode: 0644]
requirements_all.txt

index 57552aa27f857737d985a6f4c6431b2ec98dbcbf..ca71e27621c186dd4abcc029df5c2564946b413f 100644 (file)
@@ -5,5 +5,5 @@
   "name": "Resonate (WIP)",
   "description": "Resonate (working title) is the next generation streaming protocol built by the Open Home Foundation. Follow the development on Discord to see how you can get involved.",
   "codeowners": ["@music-assistant"],
-  "requirements": ["aioresonate==0.11.0"]
+  "requirements": ["aioresonate==0.13.1"]
 }
index 39becc4d876546f2d9f8276948269b06f775a55e..b6e082b5bd97c8a0616995c53aec37c8bb27cdec 100644 (file)
@@ -4,7 +4,7 @@ from __future__ import annotations
 
 import asyncio
 import time
-from collections.abc import Callable
+from collections.abc import AsyncGenerator, Callable
 from io import BytesIO
 from typing import TYPE_CHECKING, cast
 
@@ -49,9 +49,10 @@ from music_assistant.constants import (
     INTERNAL_PCM_FORMAT,
 )
 from music_assistant.helpers.audio import get_player_filter_params
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.models.player import Player, PlayerMedia
 
+from .timed_client_stream import TimedClientStream
+
 if TYPE_CHECKING:
     from aioresonate.server.client import ResonateClient
     from music_assistant_models.event import MassEvent
@@ -59,6 +60,103 @@ if TYPE_CHECKING:
     from .provider import ResonateProvider
 
 
+class MusicAssistantMediaStream(MediaStream):
+    """MediaStream implementation for Music Assistant with per-player DSP support."""
+
+    player_instance: ResonatePlayer
+    internal_format: AudioFormat
+    output_format: AudioFormat
+
+    def __init__(
+        self,
+        *,
+        main_channel_source: AsyncGenerator[bytes, None],
+        main_channel_format: ResonateAudioFormat,
+        player_instance: ResonatePlayer,
+        internal_format: AudioFormat,
+        output_format: AudioFormat,
+    ) -> None:
+        """
+        Initialise the media stream with audio source and format for main_channel().
+
+        Args:
+            main_channel_source: Audio source generator for the main channel.
+            main_channel_format: Audio format for the main channel (includes codec).
+            player_instance: The ResonatePlayer instance for accessing mass and streams.
+            internal_format: Internal processing format (float32 for headroom).
+            output_format: Output PCM format (16-bit for player output).
+        """
+        super().__init__(
+            main_channel_source=main_channel_source,
+            main_channel_format=main_channel_format,
+        )
+        self.player_instance = player_instance
+        self.internal_format = internal_format
+        self.output_format = output_format
+
+    async def player_channel(
+        self,
+        player_id: str,
+        preferred_format: ResonateAudioFormat | None = None,
+        position_us: int = 0,
+    ) -> tuple[AsyncGenerator[bytes, None], ResonateAudioFormat, int] | None:
+        """
+        Get a player-specific audio stream with per-player DSP.
+
+        Args:
+            player_id: Identifier for the player requesting the stream.
+            preferred_format: The player's preferred native format for the stream.
+                The implementation may return a different format; the library
+                will handle any necessary conversion.
+            position_us: Position in microseconds relative to the main_stream start.
+                Used for late-joining players to sync with the main stream.
+
+        Returns:
+            A tuple of (audio generator, audio format, actual position in microseconds)
+            or None if unavailable. If None, the main_stream is used as fallback.
+        """
+        mass = self.player_instance.mass
+        multi_client_stream = self.player_instance.timed_client_stream
+        assert multi_client_stream is not None
+
+        dsp = mass.config.get_player_dsp_config(player_id)
+        if not dsp.enabled:
+            # DSP is disabled for this player, use main_stream
+            return None
+
+        # Get per-player DSP filter parameters
+        # Convert from internal format to output format
+        filter_params = get_player_filter_params(
+            mass, player_id, self.internal_format, self.output_format
+        )
+
+        # Get the stream with position (in seconds)
+        stream_gen, actual_position = await multi_client_stream.get_stream(
+            output_format=self.output_format,
+            filter_params=filter_params,
+        )
+
+        # Convert position from seconds to microseconds for aioresonate API
+        actual_position_us = int(actual_position * 1_000_000)
+
+        # Return actual position in microseconds relative to main_stream start
+        self.player_instance.logger.debug(
+            "Providing channel stream for player %s at position %d us",
+            player_id,
+            actual_position_us,
+        )
+        return (
+            stream_gen,
+            ResonateAudioFormat(
+                sample_rate=self.output_format.sample_rate,
+                bit_depth=self.output_format.bit_depth,
+                channels=self.output_format.channels,
+                codec=self._main_channel_format.codec,
+            ),
+            actual_position_us,
+        )
+
+
 class ResonatePlayer(Player):
     """A resonate audio player in Music Assistant."""
 
@@ -67,6 +165,7 @@ class ResonatePlayer(Player):
     unsub_group_event_cb: Callable[[], None]
     last_sent_artwork_url: str | None = None
     _playback_task: asyncio.Task[None] | None = None
+    timed_client_stream: TimedClientStream | None = None
 
     def __init__(self, provider: ResonateProvider, player_id: str) -> None:
         """Initialize the Player."""
@@ -84,6 +183,7 @@ class ResonatePlayer(Player):
         self._attr_type = PlayerType.PLAYER
         self._attr_supported_features = {
             PlayerFeature.SET_MEMBERS,
+            PlayerFeature.MULTI_DEVICE_DSP,
         }
         self._attr_can_group_with = {provider.lookup_key}
         self._attr_power_control = PLAYER_CONTROL_NONE
@@ -110,6 +210,15 @@ class ResonatePlayer(Player):
             case ClientGroupChangedEvent(new_group=new_group):
                 self.unsub_group_event_cb()
                 self.unsub_group_event_cb = new_group.add_event_listener(self.group_event_cb)
+                # Sync playback state from the new group
+                match new_group.state:
+                    case PlaybackStateType.PLAYING:
+                        self._attr_playback_state = PlaybackState.PLAYING
+                    case PlaybackStateType.PAUSED:
+                        self._attr_playback_state = PlaybackState.PAUSED
+                    case PlaybackStateType.STOPPED:
+                        self._attr_playback_state = PlaybackState.IDLE
+                self.update_state()
 
     async def group_event_cb(self, event: GroupEvent) -> None:
         """Event callback registered to the resonate group this player belongs to."""
@@ -155,10 +264,16 @@ class ResonatePlayer(Player):
                         self._attr_elapsed_time = 0
                         self._attr_elapsed_time_last_updated = time.time()
                 self.update_state()
-            case GroupMemberAddedEvent(client_id=_):
-                pass
-            case GroupMemberRemovedEvent(client_id=_):
-                pass
+            case GroupMemberAddedEvent(client_id=client_id):
+                self.logger.debug("Group member added: %s", client_id)
+                if client_id not in self._attr_group_members:
+                    self._attr_group_members.append(client_id)
+                    self.update_state()
+            case GroupMemberRemovedEvent(client_id=client_id):
+                self.logger.debug("Group member removed: %s", client_id)
+                if client_id in self._attr_group_members:
+                    self._attr_group_members.remove(client_id)
+                    self.update_state()
             case GroupDeletedEvent():
                 pass
 
@@ -224,25 +339,35 @@ class ResonatePlayer(Player):
             # Convert string codec to AudioCodec enum
             audio_codec = AudioCodec(output_codec)
 
-            # Apply DSP and other audio filters
-            audio_source = get_ffmpeg_stream(
-                audio_input=self.mass.streams.get_stream(media, flow_pcm_format),
-                input_format=flow_pcm_format,
-                output_format=pcm_format,
-                filter_params=get_player_filter_params(
-                    self.mass, self.player_id, flow_pcm_format, pcm_format
-                ),
+            # Get clean audio source in flow format (high quality internal format)
+            # Format conversion and per-player DSP will be applied via player_channel
+            audio_source = self.mass.streams.get_stream(media, flow_pcm_format)
+
+            # Create TimedClientStream to wrap the clean audio source
+            # This distributes the audio to multiple subscribers without DSP
+            self.timed_client_stream = TimedClientStream(
+                audio_source=audio_source,
+                audio_format=flow_pcm_format,
             )
 
-            # Create MediaStream wrapping the audio source generator
-            media_stream = MediaStream(
-                source=audio_source,
-                audio_format=ResonateAudioFormat(
+            # Setup the main channel subscription
+            # aioresonate only really supports 16-bit for now TODO: upgrade later to 32-bit
+            main_channel_gen, main_position = await self.timed_client_stream.get_stream(
+                output_format=pcm_format,
+                filter_params=None,  # TODO: this should probably still include the safety limiter
+            )
+            assert main_position == 0.0  # first subscriber, should be zero
+            media_stream = MusicAssistantMediaStream(
+                main_channel_source=main_channel_gen,
+                main_channel_format=ResonateAudioFormat(
                     sample_rate=pcm_format.sample_rate,
                     bit_depth=pcm_format.bit_depth,
                     channels=pcm_format.channels,
                     codec=audio_codec,
                 ),
+                player_instance=self,
+                internal_format=flow_pcm_format,
+                output_format=pcm_format,
             )
 
             stop_time = await self.api.group.play_media(media_stream)
@@ -253,6 +378,8 @@ class ResonatePlayer(Player):
         except Exception:
             self.logger.exception("Error during playback for player %s", self.display_name)
             raise
+        finally:
+            self.timed_client_stream = None
 
     async def set_members(
         self,
@@ -268,18 +395,12 @@ class ResonatePlayer(Player):
             player = cast("ResonatePlayer", player)  # For type checking
             await self.api.group.remove_client(player.api)
             player.api.disconnect_behaviour = DisconnectBehaviour.STOP
-            self._attr_group_members.remove(player_id)
         for player_id in player_ids_to_add or []:
             player = self.mass.players.get(player_id, True)
             player = cast("ResonatePlayer", player)  # For type checking
             player.api.disconnect_behaviour = DisconnectBehaviour.UNGROUP
             await self.api.group.add_client(player.api)
-            self._attr_group_members.append(player_id)
-        self.update_state()
-
-    def _update_media_art(self, image_data: bytes) -> None:
-        image = Image.open(BytesIO(image_data))
-        self.api.group.set_media_art(image)
+        # self.group_members will be updated by the group event callback
 
     async def _on_queue_update(self, event: MassEvent) -> None:
         """Extract and send current media metadata to resonate players on queue updates."""
@@ -328,7 +449,8 @@ class ResonatePlayer(Player):
                     current_item.media_item
                 )
                 if image_data is not None:
-                    await asyncio.to_thread(self._update_media_art, image_data)
+                    image = await asyncio.to_thread(Image.open, BytesIO(image_data))
+                    await self.api.group.set_media_art(image)
             # TODO: null media art if not set?
 
         track_duration = current_item.duration
index d1367acbc3c5ad9a611a2106f11c2f8b0464b261..7e934fa4f3328bcccd5b7dd162a3affe60abd692 100644 (file)
@@ -66,7 +66,9 @@ class ResonateProvider(PlayerProvider):
         # Start server for handling incoming Resonate connections from clients
         # and mDNS discovery of new clients
         await self.server_api.start_server(
-            port=8927, host=cast("str", self.mass.streams.publish_ip)
+            port=8927,
+            host=self.mass.streams.bind_ip,
+            advertise_host=cast("str", self.mass.streams.publish_ip),
         )
 
     async def unload(self, is_removed: bool = False) -> None:
diff --git a/music_assistant/providers/resonate/timed_client_stream.py b/music_assistant/providers/resonate/timed_client_stream.py
new file mode 100644 (file)
index 0000000..c09b9ca
--- /dev/null
@@ -0,0 +1,316 @@
+"""
+Timestamped multi-client audio stream for position-aware playback.
+
+This module provides a multi-client streaming implementation optimized for
+aioresonate's synchronized multi-room audio playback. Each audio chunk is
+timestamped, allowing late-joining players to start at the correct position
+for synchronized playback across multiple devices.
+"""
+
+import asyncio
+import logging
+from collections import deque
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+from uuid import UUID, uuid4
+
+from music_assistant_models.media_items import AudioFormat
+
+from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
+
+LOGGER = logging.getLogger(__name__)
+
+# Minimum/target buffer retention time in seconds
+# This 10s buffer is currently required since:
+# - aioresonate currently uses a fixed 5s buffer to allow up to ~4s of network interruption
+# - ~2s allows for ffmpeg processing time and some margin
+# - ~3s are currently needed internally by aioresonate for initial buffering
+MIN_BUFFER_DURATION = 10.0
+# Maximum buffer duration before raising an error (safety mechanism)
+MAX_BUFFER_DURATION = MIN_BUFFER_DURATION + 5.0
+
+
+class TimedClientStream:
+    """Multi-client audio stream with timestamped chunks for synchronized playback."""
+
+    audio_source: AsyncGenerator[bytes, None]
+    """The source audio stream to read from."""
+    audio_format: AudioFormat
+    """The audio format of the source stream."""
+    chunk_buffer: deque[tuple[bytes, float]]
+    """Buffer storing chunks with their timestamps in seconds (chunk_data, timestamp_seconds)."""
+    subscriber_positions: dict[UUID, int]
+    """Subscriber positions: maps subscriber_id to position (index into chunk_buffer)."""
+    buffer_lock: asyncio.Lock
+    """Lock for buffer and shared state access."""
+    source_read_lock: asyncio.Lock
+    """Lock to serialize audio source reads."""
+    stream_ended: bool = False
+    """Track if stream has ended."""
+    current_position: float = 0.0
+    """Current position in seconds (from stream start)."""
+
+    def __init__(
+        self,
+        audio_source: AsyncGenerator[bytes, None],
+        audio_format: AudioFormat,
+    ) -> None:
+        """Initialize TimedClientStream."""
+        self.audio_source = audio_source
+        self.audio_format = audio_format
+        self.chunk_buffer = deque()
+        self.subscriber_positions = {}
+        self.buffer_lock = asyncio.Lock()
+        self.source_read_lock = asyncio.Lock()
+
+    def _get_bytes_per_second(self) -> int:
+        """Get bytes per second for the audio format."""
+        return (
+            self.audio_format.sample_rate
+            * self.audio_format.channels
+            * (self.audio_format.bit_depth // 8)
+        )
+
+    def _bytes_to_seconds(self, num_bytes: int) -> float:
+        """Convert bytes to seconds based on audio format."""
+        bytes_per_second = self._get_bytes_per_second()
+        if bytes_per_second == 0:
+            return 0.0
+        return num_bytes / bytes_per_second
+
+    def _get_buffer_duration(self) -> float:
+        """Calculate total duration of buffered chunks in seconds."""
+        if not self.chunk_buffer:
+            return 0.0
+        # Duration is from first chunk timestamp to current position
+        first_chunk_timestamp = self.chunk_buffer[0][1]
+        return self.current_position - first_chunk_timestamp
+
+    def _cleanup_old_chunks(self) -> None:
+        """Remove old chunks when all subscribers read them and min duration exceeded."""
+        # Find the oldest position still needed by any subscriber
+        if self.subscriber_positions:
+            min_position = min(self.subscriber_positions.values())
+        else:
+            min_position = len(self.chunk_buffer)
+
+        # Calculate target oldest timestamp
+        # This ensures buffer contains at least MIN_BUFFER_DURATION seconds of recent data
+        target_oldest = self.current_position - MIN_BUFFER_DURATION
+
+        # Remove old chunks that meet both conditions:
+        # 1. Before min_position (no subscriber needs them)
+        # 2. Older than target_oldest (outside minimum retention window)
+        chunks_removed = 0
+        while chunks_removed < min_position and self.chunk_buffer:
+            _chunk_bytes, chunk_timestamp = self.chunk_buffer[0]
+            if chunk_timestamp < target_oldest:
+                self.chunk_buffer.popleft()
+                chunks_removed += 1
+            else:
+                # Stop when we reach chunks we want to keep
+                break
+
+        # Adjust all subscriber positions to account for removed chunks
+        for sub_id in self.subscriber_positions:
+            self.subscriber_positions[sub_id] -= chunks_removed
+
+    async def _read_chunk_from_source(self) -> None:
+        """Read next chunk from audio source and add to buffer."""
+        try:
+            chunk = await anext(self.audio_source)
+            async with self.buffer_lock:
+                # Calculate timestamp for this chunk
+                chunk_timestamp = self.current_position
+                chunk_duration = self._bytes_to_seconds(len(chunk))
+
+                # Append chunk with its timestamp
+                self.chunk_buffer.append((chunk, chunk_timestamp))
+
+                # Update current position
+                self.current_position += chunk_duration
+
+                # Safety check: ensure buffer doesn't grow unbounded
+                if self._get_buffer_duration() > MAX_BUFFER_DURATION:
+                    msg = f"Buffer exceeded maximum duration ({MAX_BUFFER_DURATION}s)"
+                    raise RuntimeError(msg)
+        except StopAsyncIteration:
+            # Source exhausted, add EOF marker
+            async with self.buffer_lock:
+                self.chunk_buffer.append((b"", self.current_position))
+                self.stream_ended = True
+        except Exception:
+            # Source errored or was canceled, mark stream as ended
+            async with self.buffer_lock:
+                self.stream_ended = True
+            raise
+
+    async def _check_buffer(self, subscriber_id: UUID) -> bool | None:
+        """
+        Check if buffer has grown or stream ended.
+
+        REQUIRES: Caller must hold self.source_read_lock before calling.
+
+        Returns:
+            True if should continue reading loop (chunk found in buffer),
+            False if should break (stream ended),
+            None if should proceed to read from source.
+        """
+        async with self.buffer_lock:
+            position = self.subscriber_positions[subscriber_id]
+            if position < len(self.chunk_buffer):
+                # Another subscriber already read the chunk
+                return True
+            if self.stream_ended:
+                # Stream ended while waiting for source lock
+                return False
+        return None  # Continue to read from source
+
+    async def _get_chunk_from_buffer(self, subscriber_id: UUID) -> bytes | None:
+        """
+        Get next chunk from buffer for subscriber.
+
+        Returns:
+            Chunk bytes if available, None if no chunk available, or empty bytes for EOF.
+        """
+        async with self.buffer_lock:
+            position = self.subscriber_positions[subscriber_id]
+
+            # Check if we have a chunk at this position
+            if position < len(self.chunk_buffer):
+                # Chunk available in buffer
+                chunk_data, _ = self.chunk_buffer[position]
+
+                # Move to next position
+                self.subscriber_positions[subscriber_id] = position + 1
+
+                # Cleanup old chunks that no one needs
+                self._cleanup_old_chunks()
+                return chunk_data
+            if self.stream_ended:
+                # Stream ended and we've read all buffered chunks
+                return b""
+        return None
+
+    async def _cleanup_subscriber(self, subscriber_id: UUID) -> None:
+        """Clean up subscriber and close stream if no subscribers left."""
+        async with self.buffer_lock:
+            if subscriber_id in self.subscriber_positions:
+                del self.subscriber_positions[subscriber_id]
+
+            # If no subscribers left, close the stream
+            if not self.subscriber_positions and not self.stream_ended:
+                self.stream_ended = True
+                # Close the audio source generator to prevent resource leak
+                with suppress(Exception):
+                    await self.audio_source.aclose()
+
+    async def get_stream(
+        self,
+        output_format: AudioFormat,
+        filter_params: list[str] | None = None,
+    ) -> tuple[AsyncGenerator[bytes, None], float]:
+        """
+        Get (client specific encoded) ffmpeg stream.
+
+        Returns:
+            A tuple of (audio generator, actual position in seconds)
+        """
+        audio_gen, position = await self.subscribe_raw()
+
+        async def _stream_with_ffmpeg() -> AsyncGenerator[bytes, None]:
+            try:
+                async for chunk in get_ffmpeg_stream(
+                    audio_input=audio_gen,
+                    input_format=self.audio_format,
+                    output_format=output_format,
+                    filter_params=filter_params,
+                ):
+                    yield chunk
+            finally:
+                # Ensure audio_gen cleanup runs immediately
+                with suppress(Exception):
+                    await audio_gen.aclose()
+
+        return _stream_with_ffmpeg(), position
+
+    async def _generate(self, subscriber_id: UUID) -> AsyncGenerator[bytes, None]:
+        """
+        Generate audio chunks for a subscriber.
+
+        Yields chunks from the buffer until the stream ends, reading from the source
+        as needed. Automatically cleans up the subscriber on exit.
+        """
+        try:
+            # Position already set above atomically with timestamp capture
+            while True:
+                # Try to get chunk from buffer
+                chunk_bytes = await self._get_chunk_from_buffer(subscriber_id)
+
+                # Release lock before yielding to avoid deadlock
+                if chunk_bytes is not None:
+                    if chunk_bytes == b"":
+                        # End of stream marker
+                        break
+                    yield chunk_bytes
+                else:
+                    # No chunk available, need to read from source
+                    # Use source_read_lock to ensure only one subscriber reads at a time
+                    async with self.source_read_lock:
+                        # Check again if buffer has grown or stream ended while waiting
+                        check_result = await self._check_buffer(subscriber_id)
+                        if check_result is True:
+                            # Another subscriber already read the chunk
+                            continue
+                        if check_result is False:
+                            # Stream ended while waiting for source lock
+                            break
+
+                        # Read next chunk from source (check_result is None)
+                        # Note: This may block if the audio_source does synchronous I/O
+                        await self._read_chunk_from_source()
+
+        finally:
+            await self._cleanup_subscriber(subscriber_id)
+
+    async def subscribe_raw(self) -> tuple[AsyncGenerator[bytes, None], float]:
+        """
+        Subscribe to the raw/unaltered audio stream.
+
+        Returns:
+            A tuple of (audio generator, actual position in seconds).
+            The position indicates where in the stream the first chunk will be from.
+
+        Note:
+            Callers must properly consume or cancel the returned generator to prevent
+            resource leaks.
+        """
+        subscriber_id = uuid4()
+
+        # Atomically capture starting position and register subscriber while holding lock
+        async with self.buffer_lock:
+            if self.chunk_buffer:
+                _, starting_position = self.chunk_buffer[0]
+                # Log buffer time range for debugging
+                newest_ts = self.chunk_buffer[-1][1]
+                oldest_relative = starting_position - self.current_position
+                newest_relative = newest_ts - self.current_position
+                LOGGER.debug(
+                    "New subscriber joining: buffer contains %.3fs (from %.3fs to %.3fs, "
+                    "current_position=%.3fs)",
+                    newest_ts - starting_position,
+                    oldest_relative,
+                    newest_relative,
+                    self.current_position,
+                )
+            else:
+                starting_position = self.current_position
+                LOGGER.debug(
+                    "New subscriber joining: buffer is empty, starting at current_position=%.3fs",
+                    self.current_position,
+                )
+            # Register subscriber at position 0 (start of buffer)
+            self.subscriber_positions[subscriber_id] = 0
+
+        # Return generator and starting position in seconds
+        return self._generate(subscriber_id), starting_position
index 6d5563f18dc36f34675b75bc99224c82dffdde48..ca49a9f593f8246ece31d596b6994327d62eef2d 100644 (file)
@@ -9,7 +9,7 @@ aiohttp_asyncmdnsresolver==0.1.1
 aiohttp-fast-zlib==0.3.0
 aiojellyfin==0.14.1
 aiomusiccast==0.14.8
-aioresonate==0.11.0
+aioresonate==0.13.1
 aiorun==2025.1.1
 aioslimproto==3.1.1
 aiosonos==0.1.9