import asyncio
import time
+from collections import deque
from collections.abc import AsyncGenerator
from contextlib import suppress
from typing import TYPE_CHECKING
self.wait_start: float = 0.0
self.seconds_streamed: float = 0
self._first_chunk_received = asyncio.Event()
+ # Ring buffer for late joiners: stores (chunk_data, seconds_offset) tuples
+ # Chunks from streams controller are ~1 second each (pcm_sample_size bytes)
+ # Keep 8 seconds of buffer for late joiners (maxlen=10 for safety with variable sizes)
+ self._chunk_buffer: deque[tuple[bytes, float]] = deque(maxlen=10)
async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Initialize stream session for all players."""
"""Add a sync client to the session as a late joiner.
The late joiner will:
- 1. Start playing at a compensated NTP timestamp (start_ntp + offset)
- 2. Receive silence calculated dynamically based on how much audio has been sent
- 3. Then receive real audio chunks in sync with other players
+ 1. Start with NTP timestamp accounting for buffered chunks we'll send
+ 2. Receive buffered chunks immediately to prime the ffmpeg/CLI pipeline
+ 3. Join the real-time stream in perfect sync with other players
"""
sync_leader = self.sync_clients[0]
if not sync_leader.stream or not sync_leader.stream.running:
return
async with self._lock:
- skip_seconds = self.seconds_streamed
- start_at = self.start_time + skip_seconds
+ # Get all buffered chunks to send
+ buffered_chunks = list(self._chunk_buffer)
+
+ if buffered_chunks:
+ # Calculate how much buffer we're sending
+ first_chunk_position = buffered_chunks[0][1]
+ buffer_duration = self.seconds_streamed - first_chunk_position
+
+ # Set start NTP to account for the buffer we're about to send
+ # Device will start at (current_position - buffer_duration) and catch up
+ start_at = self.start_time + (self.seconds_streamed - buffer_duration)
+
+ self.prov.logger.debug(
+ "Late joiner %s: sending %.2fs of buffered audio, start at %.2fs",
+ airplay_player.player_id,
+ buffer_duration,
+ self.seconds_streamed - buffer_duration,
+ )
+ else:
+ # No buffer available, start from current position
+ start_at = self.start_time + self.seconds_streamed
+ self.prov.logger.debug(
+ "Late joiner %s: no buffered chunks available, starting at %.2fs",
+ airplay_player.player_id,
+ self.seconds_streamed,
+ )
+
start_ntp = unix_time_to_ntp(start_at)
+
if airplay_player not in self.sync_clients:
self.sync_clients.append(airplay_player)
if airplay_player.stream:
await airplay_player.stream.wait_for_connection()
+ # Feed buffered chunks INSIDE the lock to prevent race conditions
+ # This ensures we don't send a new real-time chunk while feeding the buffer
+ if buffered_chunks:
+ await self._feed_buffered_chunks(airplay_player, buffered_chunks)
+
async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Stream audio to all players."""
pcm_sample_size = self.pcm_format.pcm_sample_size
if not sync_clients:
return
+ # Add chunk to ring buffer for late joiners (before seconds_streamed is updated)
+ chunk_position = self.seconds_streamed
+ self._chunk_buffer.append((chunk, chunk_position))
+
# Write chunk to all players
write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream]
results = await asyncio.gather(*write_tasks, return_exceptions=True)
return
await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)
+ async def _feed_buffered_chunks(
+ self,
+ airplay_player: AirPlayPlayer,
+ buffered_chunks: list[tuple[bytes, float]],
+ ) -> None:
+ """Feed buffered chunks to a late joiner to prime the ffmpeg pipeline.
+
+ :param airplay_player: The late joiner player.
+ :param buffered_chunks: List of (chunk_data, position) tuples to send.
+ """
+ try:
+ for chunk, _position in buffered_chunks:
+ await self._write_chunk_to_player(airplay_player, chunk)
+ except Exception as err:
+ self.prov.logger.warning(
+ "Failed to feed buffered chunks to late joiner %s: %s",
+ airplay_player.player_id,
+ err,
+ )
+ # Remove the client if feeding buffered chunks fails
+ self.mass.create_task(self.remove_client(airplay_player))
+
async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
"""Write EOF to a specific player."""
if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):