Improve AirPlay late joiner support
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 17 Feb 2026 13:34:46 +0000 (14:34 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 17 Feb 2026 13:34:46 +0000 (14:34 +0100)
music_assistant/providers/airplay/stream_session.py

index 830d2448f32bd2f2a292cf631088c514eb1f9273..e7984d3291a60e6fee6ce92cfac91f67f43a3fb6 100644 (file)
@@ -4,6 +4,7 @@ from __future__ import annotations
 
 import asyncio
 import time
+from collections import deque
 from collections.abc import AsyncGenerator
 from contextlib import suppress
 from typing import TYPE_CHECKING
@@ -60,6 +61,10 @@ class AirPlayStreamSession:
         self.wait_start: float = 0.0
         self.seconds_streamed: float = 0
         self._first_chunk_received = asyncio.Event()
+        # Ring buffer for late joiners: stores (chunk_data, seconds_offset) tuples
+        # Chunks from streams controller are ~1 second each (pcm_sample_size bytes)
+        # Keep 8 seconds of buffer for late joiners (maxlen=10 for safety with variable sizes)
+        self._chunk_buffer: deque[tuple[bytes, float]] = deque(maxlen=10)
 
     async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
         """Initialize stream session for all players."""
@@ -124,9 +129,9 @@ class AirPlayStreamSession:
         """Add a sync client to the session as a late joiner.
 
         The late joiner will:
-        1. Start playing at a compensated NTP timestamp (start_ntp + offset)
-        2. Receive silence calculated dynamically based on how much audio has been sent
-        3. Then receive real audio chunks in sync with other players
+        1. Start with NTP timestamp accounting for buffered chunks we'll send
+        2. Receive buffered chunks immediately to prime the ffmpeg/CLI pipeline
+        3. Join the real-time stream in perfect sync with other players
         """
         sync_leader = self.sync_clients[0]
         if not sync_leader.stream or not sync_leader.stream.running:
@@ -146,9 +151,35 @@ class AirPlayStreamSession:
             return
 
         async with self._lock:
-            skip_seconds = self.seconds_streamed
-            start_at = self.start_time + skip_seconds
+            # Get all buffered chunks to send
+            buffered_chunks = list(self._chunk_buffer)
+
+            if buffered_chunks:
+                # Calculate how much buffer we're sending
+                first_chunk_position = buffered_chunks[0][1]
+                buffer_duration = self.seconds_streamed - first_chunk_position
+
+                # Set start NTP to account for the buffer we're about to send
+                # Device will start at (current_position - buffer_duration) and catch up
+                start_at = self.start_time + (self.seconds_streamed - buffer_duration)
+
+                self.prov.logger.debug(
+                    "Late joiner %s: sending %.2fs of buffered audio, start at %.2fs",
+                    airplay_player.player_id,
+                    buffer_duration,
+                    self.seconds_streamed - buffer_duration,
+                )
+            else:
+                # No buffer available, start from current position
+                start_at = self.start_time + self.seconds_streamed
+                self.prov.logger.debug(
+                    "Late joiner %s: no buffered chunks available, starting at %.2fs",
+                    airplay_player.player_id,
+                    self.seconds_streamed,
+                )
+
             start_ntp = unix_time_to_ntp(start_at)
+
             if airplay_player not in self.sync_clients:
                 self.sync_clients.append(airplay_player)
 
@@ -156,6 +187,11 @@ class AirPlayStreamSession:
             if airplay_player.stream:
                 await airplay_player.stream.wait_for_connection()
 
+            # Feed buffered chunks INSIDE the lock to prevent race conditions
+            # This ensures we don't send a new real-time chunk while feeding the buffer
+            if buffered_chunks:
+                await self._feed_buffered_chunks(airplay_player, buffered_chunks)
+
     async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
         """Stream audio to all players."""
         pcm_sample_size = self.pcm_format.pcm_sample_size
@@ -217,6 +253,10 @@ class AirPlayStreamSession:
             if not sync_clients:
                 return
 
+            # Add chunk to ring buffer for late joiners (before seconds_streamed is updated)
+            chunk_position = self.seconds_streamed
+            self._chunk_buffer.append((chunk, chunk_position))
+
             # Write chunk to all players
             write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream]
             results = await asyncio.gather(*write_tasks, return_exceptions=True)
@@ -253,6 +293,28 @@ class AirPlayStreamSession:
                 return
             await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)
 
+    async def _feed_buffered_chunks(
+        self,
+        airplay_player: AirPlayPlayer,
+        buffered_chunks: list[tuple[bytes, float]],
+    ) -> None:
+        """Feed buffered chunks to a late joiner to prime the ffmpeg pipeline.
+
+        :param airplay_player: The late joiner player.
+        :param buffered_chunks: List of (chunk_data, position) tuples to send.
+        """
+        try:
+            for chunk, _position in buffered_chunks:
+                await self._write_chunk_to_player(airplay_player, chunk)
+        except Exception as err:
+            self.prov.logger.warning(
+                "Failed to feed buffered chunks to late joiner %s: %s",
+                airplay_player.player_id,
+                err,
+            )
+            # Remove the client if feeding buffered chunks fails
+            self.mass.create_task(self.remove_client(airplay_player))
+
     async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
         """Write EOF to a specific player."""
         if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):