airplay_provider: AirPlayProvider,
sync_clients: list[AirPlayPlayer],
pcm_format: AudioFormat,
- audio_source: AsyncGenerator[bytes, None],
) -> None:
"""Initialize AirPlayStreamSession.
self.mass = airplay_provider.mass
self.pcm_format = pcm_format
self.sync_clients = sync_clients
- self._audio_source = audio_source
self._audio_source_task: asyncio.Task[None] | None = None
self._player_ffmpeg: dict[str, FFMpeg] = {}
self._lock = asyncio.Lock()
# we need to track when the last stream was started
self.last_stream_started: float = 0.0
self._clients_ready = asyncio.Event()
+ self._first_chunk_received = asyncio.Event()
- async def start(self) -> None:
+ async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Initialize stream session for all players."""
+ self.prov.logger.debug(
+ "Starting stream session with %d clients",
+ len(self.sync_clients),
+ )
+ # Prepare all clients
+ # this will create the stream objects, named pipes and start ffmpeg
+ async with TaskManager(self.mass) as tm:
+ for _airplay_player in self.sync_clients:
+ tm.create_task(self._prepare_client(_airplay_player))
+
+ # Start audio source streamer task
+ # this will read from the audio source and distribute
+ # to all player-specific ffmpeg processes
+ # we start this task early because some streams (especially radio)
+ # may need more time to buffer - this way we ensure we have audio ready
+ # when the players should start playing
+ self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
+ # wait until the first chunk is received before starting clients
+ await self._first_chunk_received.wait()
+
+ # Start all clients
# Get current NTP timestamp and calculate wait time
cur_time = time.time()
- wait_start = 1750 + (250 * len(self.sync_clients)) # in milliseconds
+ wait_start = 1500 + (250 * len(self.sync_clients)) # in milliseconds
wait_start_seconds = wait_start / 1000
self.wait_start = wait_start_seconds # in seconds
self.start_time = cur_time + wait_start_seconds
self.last_stream_started = self.start_time
self.start_ntp = unix_time_to_ntp(self.start_time)
- self.prov.logger.debug(
- "Starting stream session with %d clients",
- len(self.sync_clients),
- )
- # Start audio source streamer task
- # this will read from the audio source and distribute to all players
- # we start this task early so it can buffer audio while players are starting
- self._audio_source_task = asyncio.create_task(self._audio_streamer())
-
- async def _start_client(airplay_player: AirPlayPlayer) -> None:
- """Start stream for a single client."""
- # Stop existing stream if running
- if airplay_player.stream and airplay_player.stream.running:
- await airplay_player.stream.stop()
-
- # Create appropriate stream type based on protocol
- if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
- airplay_player.stream = AirPlay2Stream(airplay_player)
- else:
- airplay_player.stream = RaopStream(airplay_player)
-
- # Link stream session to player stream
- airplay_player.stream.session = self
- # create the named pipes
- await airplay_player.stream.audio_pipe.create()
- await airplay_player.stream.commands_pipe.create()
- # start the (player-specific) ffmpeg process
- # note that ffmpeg will open the named pipe for writing
- await self._start_client_ffmpeg(airplay_player)
-
- # start the stream
- await airplay_player.stream.start(self.start_ntp)
-
- # repeat sending the volume level to the player because some players seem
- # to ignore it the first time
- # https://github.com/music-assistant/support/issues/3330
- await airplay_player.stream.send_cli_command(f"VOLUME={airplay_player.volume_level}\n")
async with TaskManager(self.mass) as tm:
for _airplay_player in self.sync_clients:
- tm.create_task(_start_client(_airplay_player))
+ tm.create_task(self._start_client(_airplay_player, self.start_ntp))
+
# All clients started
self._clients_ready.set()
async def stop(self) -> None:
"""Stop playback and cleanup."""
- if self._audio_source_task and not self._audio_source_task.done():
- self._audio_source_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._audio_source_task
await asyncio.gather(
*[self.remove_client(x) for x in self.sync_clients],
return_exceptions=True,
)
+ if self._audio_source_task and not self._audio_source_task.done():
+ self._audio_source_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._audio_source_task
async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
"""Remove a sync client from the session."""
assert airplay_player.stream.session == self
async with self._lock:
self.sync_clients.remove(airplay_player)
+ await airplay_player.stream.stop()
if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
- await ffmpeg.write_eof()
await ffmpeg.close()
- del ffmpeg
- await airplay_player.stream.stop()
- airplay_player.stream = None
# If this was the last client, stop the session
if not self.sync_clients:
await self.stop()
)
return
- # Stop existing stream if the player is already streaming
- # should not happen, but guard just in case
- if airplay_player.stream and airplay_player.stream.running:
- await airplay_player.stream.stop()
-
- # Create appropriate stream type based on protocol
- if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
- airplay_player.stream = AirPlay2Stream(airplay_player)
- else:
- airplay_player.stream = RaopStream(airplay_player)
-
- # Link stream session to player stream
- airplay_player.stream.session = self
- # create the named pipes
- await airplay_player.stream.audio_pipe.create()
- await airplay_player.stream.commands_pipe.create()
+ # Prepare the new client for streaming
+ await self._prepare_client(airplay_player)
# Snapshot seconds_streamed inside lock to prevent race conditions
# Keep lock held during stream.start() to ensure player doesn't miss any chunks
if airplay_player not in self.sync_clients:
self.sync_clients.append(airplay_player)
- # start the player specific ffmpeg process
- await self._start_client_ffmpeg(airplay_player)
- await airplay_player.stream.start(start_ntp)
+ await self._start_client(airplay_player, start_ntp)
async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Replace the audio source of the stream."""
+ self._first_chunk_received.clear()
+ new_audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
+ await self._first_chunk_received.wait()
async with self._lock:
# Cancel the current audio source task
assert self._audio_source_task # for type checker
- self._audio_source_task.cancel()
- # Set new audio source and restart the stream
- self._audio_source = audio_source
- self._audio_source_task = asyncio.create_task(self._audio_streamer())
+ old_audio_source_task = self._audio_source_task
+ old_audio_source_task.cancel()
+ self._audio_source_task = new_audio_source_task
self.last_stream_started = time.time() + self.wait_start
for sync_client in self.sync_clients:
sync_client.set_state_from_stream(state=None, elapsed_time=0)
# ensure we cleanly wait for the old audio source task to finish
with suppress(asyncio.CancelledError):
- await self._audio_source_task
+ await old_audio_source_task
- async def _audio_streamer(self) -> None:
+ async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Stream audio to all players."""
_last_metadata: str | None = None
pcm_sample_size = self.pcm_format.pcm_sample_size
stream_start_time = time.time()
first_chunk_received = False
# each chunk is exactly one second of audio data based on the pcm format.
- async for chunk in self._audio_source:
+ async for chunk in audio_source:
if first_chunk_received is False:
first_chunk_received = True
self.prov.logger.debug(
- "First audio chunk received after %.3fs, "
- "which is %.3fs before scheduled start time",
+ "First audio chunk received after %.3fs",
time.time() - stream_start_time,
- self.last_stream_started - time.time(),
)
- # wait until the clients are ready to receive audio
- await asyncio.wait_for(self._clients_ready.wait(), timeout=10)
+ self._first_chunk_received.set()
+ # Wait until all clients are ready
+ await self._clients_ready.wait()
+ # Send chunk to all players
async with self._lock:
sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
if not sync_clients:
prev_progress_report = now
self.mass.create_task(self._send_metadata(progress, None))
# Entire stream consumed: send EOF
+ self.prov.logger.debug("Audio source stream exhausted")
async with self._lock:
await asyncio.gather(
*[
# cleanup any associated FFMpeg instance first
if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
await ffmpeg.write_eof()
- await ffmpeg.close()
- if airplay_player.stream:
- await airplay_player.stream.stop()
+ await ffmpeg.wait_with_timeout(30)
+ del ffmpeg
+ assert airplay_player.stream # for type checker
+ # then stop the player stream
+ await airplay_player.stream.stop()
async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
"""Send metadata to all players."""
return_exceptions=True,
)
+ async def _prepare_client(self, airplay_player: AirPlayPlayer) -> None:
+ """Prepare stream for a single client."""
+ # Stop existing stream if running
+ if airplay_player.stream and airplay_player.stream.running:
+ await airplay_player.stream.stop()
+ # Create appropriate stream type based on protocol
+ if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
+ airplay_player.stream = AirPlay2Stream(airplay_player)
+ else:
+ airplay_player.stream = RaopStream(airplay_player)
+ # Link stream session to player stream
+ airplay_player.stream.session = self
+ # create the named pipes
+ await airplay_player.stream.audio_pipe.create()
+ await airplay_player.stream.commands_pipe.create()
+ # start the (player-specific) ffmpeg process
+ # note that ffmpeg will open the named pipe for writing
+ await self._start_client_ffmpeg(airplay_player)
+ await asyncio.sleep(0.05) # allow ffmpeg to open the pipe properly
+
+ async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
+ """Start stream for a single client."""
+ # start the stream
+ assert airplay_player.stream # for type checker
+ await airplay_player.stream.start(start_ntp)
+ # repeat sending the volume level to the player because some players seem
+ # to ignore it the first time
+ # https://github.com/music-assistant/support/issues/3330
+ await airplay_player.stream.send_cli_command(f"VOLUME={airplay_player.volume_level}\n")
+
async def _start_client_ffmpeg(self, airplay_player: AirPlayPlayer) -> None:
"""Start or restart the player's ffmpeg stream."""
# Clean up any existing FFmpeg instance for this player