return
async with self._lock:
- # Get all buffered chunks to send
- buffered_chunks = list(self._chunk_buffer)
+ # Get buffered chunks to send, but limit to ~5 seconds to avoid
+ # blocking real-time streaming to other players (causes packet loss)
+ max_late_join_buffer_seconds = 5.0
+ all_buffered = list(self._chunk_buffer)
+
+ # Filter to only include chunks within the time limit
+ if all_buffered:
+ min_position = self.seconds_streamed - max_late_join_buffer_seconds
+ buffered_chunks = [
+ (chunk, pos) for chunk, pos in all_buffered if pos >= min_position
+ ]
+ else:
+ buffered_chunks = []
if buffered_chunks:
# Calculate how much buffer we're sending
if not self.sync_clients:
break
- await self._write_chunk_to_all_players(chunk)
+ has_running_clients = await self._write_chunk_to_all_players(chunk)
+ if not has_running_clients:
+ self.prov.logger.debug("No running clients remaining, stopping audio streamer")
+ break
self.seconds_streamed += len(chunk) / pcm_sample_size
finally:
if not watchdog_task.done():
silence_duration = 0.1
silence_bytes = int(pcm_sample_size * silence_duration)
silence_chunk = bytes(silence_bytes)
- await self._write_chunk_to_all_players(silence_chunk)
+ has_running_clients = await self._write_chunk_to_all_players(silence_chunk)
+ if not has_running_clients:
+ break
self.seconds_streamed += silence_duration
silence_inserted += silence_duration
await asyncio.sleep(0.05)
silence_inserted,
)
- async def _write_chunk_to_all_players(self, chunk: bytes) -> None:
- """Write a chunk to all connected players."""
+ async def _write_chunk_to_all_players(self, chunk: bytes) -> bool:
+ """Write a chunk to all connected players.
+
+ :return: True if there are still running clients, False otherwise.
+ """
async with self._lock:
sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
if not sync_clients:
- return
+ return False
# Add chunk to ring buffer for late joiners (before seconds_streamed is updated)
chunk_position = self.seconds_streamed
for player in players_to_remove:
self.mass.create_task(self.remove_client(player))
+ # Return False if all clients were removed (or scheduled for removal)
+ remaining_clients = len(sync_clients) - len(players_to_remove)
+ return remaining_clients > 0
+
async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
"""Write audio chunk to a player's ffmpeg process."""
player_id = airplay_player.player_id