From e9bd7854ea7114b5450e335a1a1e17a6d11c972e Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 17 Feb 2026 14:34:46 +0100 Subject: [PATCH] Improve AirPlay late joiner support --- .../providers/airplay/stream_session.py | 72 +++++++++++++++++-- 1 file changed, 67 insertions(+), 5 deletions(-) diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index 830d2448..e7984d32 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import time +from collections import deque from collections.abc import AsyncGenerator from contextlib import suppress from typing import TYPE_CHECKING @@ -60,6 +61,10 @@ class AirPlayStreamSession: self.wait_start: float = 0.0 self.seconds_streamed: float = 0 self._first_chunk_received = asyncio.Event() + # Ring buffer for late joiners: stores (chunk_data, seconds_offset) tuples + # Chunks from streams controller are ~1 second each (pcm_sample_size bytes) + # Keep 8 seconds of buffer for late joiners (maxlen=10 for safety with variable sizes) + self._chunk_buffer: deque[tuple[bytes, float]] = deque(maxlen=10) async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None: """Initialize stream session for all players.""" @@ -124,9 +129,9 @@ class AirPlayStreamSession: """Add a sync client to the session as a late joiner. The late joiner will: - 1. Start playing at a compensated NTP timestamp (start_ntp + offset) - 2. Receive silence calculated dynamically based on how much audio has been sent - 3. Then receive real audio chunks in sync with other players + 1. Start with NTP timestamp accounting for buffered chunks we'll send + 2. Receive buffered chunks immediately to prime the ffmpeg/CLI pipeline + 3. Join the real-time stream in perfect sync with other players """ sync_leader = self.sync_clients[0] if not sync_leader.stream or not sync_leader.stream.running: @@ -146,9 +151,35 @@ class AirPlayStreamSession: return async with self._lock: - skip_seconds = self.seconds_streamed - start_at = self.start_time + skip_seconds + # Get all buffered chunks to send + buffered_chunks = list(self._chunk_buffer) + + if buffered_chunks: + # Calculate how much buffer we're sending + first_chunk_position = buffered_chunks[0][1] + buffer_duration = self.seconds_streamed - first_chunk_position + + # Set start NTP to account for the buffer we're about to send + # Device will start at (current_position - buffer_duration) and catch up + start_at = self.start_time + (self.seconds_streamed - buffer_duration) + + self.prov.logger.debug( + "Late joiner %s: sending %.2fs of buffered audio, start at %.2fs", + airplay_player.player_id, + buffer_duration, + self.seconds_streamed - buffer_duration, + ) + else: + # No buffer available, start from current position + start_at = self.start_time + self.seconds_streamed + self.prov.logger.debug( + "Late joiner %s: no buffered chunks available, starting at %.2fs", + airplay_player.player_id, + self.seconds_streamed, + ) + start_ntp = unix_time_to_ntp(start_at) + if airplay_player not in self.sync_clients: self.sync_clients.append(airplay_player) @@ -156,6 +187,11 @@ class AirPlayStreamSession: if airplay_player.stream: await airplay_player.stream.wait_for_connection() + # Feed buffered chunks INSIDE the lock to prevent race conditions + # This ensures we don't send a new real-time chunk while feeding the buffer + if buffered_chunks: + await self._feed_buffered_chunks(airplay_player, buffered_chunks) + async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None: """Stream audio to all players.""" pcm_sample_size = self.pcm_format.pcm_sample_size @@ -217,6 +253,10 @@ class AirPlayStreamSession: if not sync_clients: return + # Add chunk to ring buffer for late joiners (before seconds_streamed is updated) + chunk_position = self.seconds_streamed + self._chunk_buffer.append((chunk, chunk_position)) + # Write chunk to all players write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream] results = await asyncio.gather(*write_tasks, return_exceptions=True) @@ -253,6 +293,28 @@ class AirPlayStreamSession: return await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0) + async def _feed_buffered_chunks( + self, + airplay_player: AirPlayPlayer, + buffered_chunks: list[tuple[bytes, float]], + ) -> None: + """Feed buffered chunks to a late joiner to prime the ffmpeg pipeline. + + :param airplay_player: The late joiner player. + :param buffered_chunks: List of (chunk_data, position) tuples to send. + """ + try: + for chunk, _position in buffered_chunks: + await self._write_chunk_to_player(airplay_player, chunk) + except Exception as err: + self.prov.logger.warning( + "Failed to feed buffered chunks to late joiner %s: %s", + airplay_player.player_id, + err, + ) + # Remove the client if feeding buffered chunks fails + self.mass.create_task(self.remove_client(airplay_player)) + async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None: """Write EOF to a specific player.""" if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): -- 2.34.1