From: Marcel van der Veldt Date: Mon, 3 Nov 2025 17:32:13 +0000 (+0100) Subject: Swap initialization order of AirPlay X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=38c790c26109ba9a40c9aef6f4af61ee65679bcc;p=music-assistant-server.git Swap initialization order of AirPlay only start streaming to airplay if we have received the first audio chunk --- diff --git a/music_assistant/providers/airplay/player.py b/music_assistant/providers/airplay/player.py index 924a8f52..20e0fe10 100644 --- a/music_assistant/providers/airplay/player.py +++ b/music_assistant/providers/airplay/player.py @@ -448,10 +448,8 @@ class AirPlayPlayer(Player): # setup StreamSession for player (and its sync childs if any) sync_clients = self._get_sync_clients() provider = cast("AirPlayProvider", self.provider) - stream_session = AirPlayStreamSession( - provider, sync_clients, AIRPLAY_PCM_FORMAT, audio_source - ) - await stream_session.start() + stream_session = AirPlayStreamSession(provider, sync_clients, AIRPLAY_PCM_FORMAT) + await stream_session.start(audio_source) async def volume_set(self, volume_level: int) -> None: """Send VOLUME_SET command to given player.""" diff --git a/music_assistant/providers/airplay/protocols/raop.py b/music_assistant/providers/airplay/protocols/raop.py index 64ca1672..954ad293 100644 --- a/music_assistant/providers/airplay/protocols/raop.py +++ b/music_assistant/providers/airplay/protocols/raop.py @@ -2,7 +2,6 @@ from __future__ import annotations -import asyncio import logging from typing import TYPE_CHECKING, cast @@ -37,7 +36,6 @@ class RaopStream(AirPlayProtocol): """ supports_pairing = True - _stderr_reader_task: asyncio.Task[None] | None = None async def start(self, start_ntp: int) -> None: """Initialize CLIRaop process for a player.""" @@ -117,7 +115,7 @@ class RaopStream(AirPlayProtocol): raise PlayerCommandFailed("Cannot connect to AirPlay device") # start reading the stderr of the cliraop process from another task - self._stderr_reader_task = self.mass.create_task(self._stderr_reader()) + self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader())) async def start_pairing(self) -> None: """Start pairing process for this protocol (if supported).""" diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index 5ebb9c2f..60ace8f4 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -35,7 +35,6 @@ class AirPlayStreamSession: airplay_provider: AirPlayProvider, sync_clients: list[AirPlayPlayer], pcm_format: AudioFormat, - audio_source: AsyncGenerator[bytes, None], ) -> None: """Initialize AirPlayStreamSession. @@ -50,7 +49,6 @@ class AirPlayStreamSession: self.mass = airplay_provider.mass self.pcm_format = pcm_format self.sync_clients = sync_clients - self._audio_source = audio_source self._audio_source_task: asyncio.Task[None] | None = None self._player_ffmpeg: dict[str, FFMpeg] = {} self._lock = asyncio.Lock() @@ -62,71 +60,57 @@ class AirPlayStreamSession: # we need to track when the last stream was started self.last_stream_started: float = 0.0 self._clients_ready = asyncio.Event() + self._first_chunk_received = asyncio.Event() - async def start(self) -> None: + async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None: """Initialize stream session for all players.""" + self.prov.logger.debug( + "Starting stream session with %d clients", + len(self.sync_clients), + ) + # Prepare all clients + # this will create the stream objects, named pipes and start ffmpeg + async with TaskManager(self.mass) as tm: + for _airplay_player in self.sync_clients: + tm.create_task(self._prepare_client(_airplay_player)) + + # Start audio source streamer task + # this will read from the audio source and distribute + # to all player-specific ffmpeg processes + # we start this task early because some streams (especially radio) + # may need more time to buffer - this way we ensure we have audio ready + # when the players should start playing + self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source)) + # wait until the first chunk is received before starting clients + await self._first_chunk_received.wait() + + # Start all clients # Get current NTP timestamp and calculate wait time cur_time = time.time() - wait_start = 1750 + (250 * len(self.sync_clients)) # in milliseconds + wait_start = 1500 + (250 * len(self.sync_clients)) # in milliseconds wait_start_seconds = wait_start / 1000 self.wait_start = wait_start_seconds # in seconds self.start_time = cur_time + wait_start_seconds self.last_stream_started = self.start_time self.start_ntp = unix_time_to_ntp(self.start_time) - self.prov.logger.debug( - "Starting stream session with %d clients", - len(self.sync_clients), - ) - # Start audio source streamer task - # this will read from the audio source and distribute to all players - # we start this task early so it can buffer audio while players are starting - self._audio_source_task = asyncio.create_task(self._audio_streamer()) - - async def _start_client(airplay_player: AirPlayPlayer) -> None: - """Start stream for a single client.""" - # Stop existing stream if running - if airplay_player.stream and airplay_player.stream.running: - await airplay_player.stream.stop() - - # Create appropriate stream type based on protocol - if airplay_player.protocol == StreamingProtocol.AIRPLAY2: - airplay_player.stream = AirPlay2Stream(airplay_player) - else: - airplay_player.stream = RaopStream(airplay_player) - - # Link stream session to player stream - airplay_player.stream.session = self - # create the named pipes - await airplay_player.stream.audio_pipe.create() - await airplay_player.stream.commands_pipe.create() - # start the (player-specific) ffmpeg process - # note that ffmpeg will open the named pipe for writing - await self._start_client_ffmpeg(airplay_player) - - # start the stream - await airplay_player.stream.start(self.start_ntp) - - # repeat sending the volume level to the player because some players seem - # to ignore it the first time - # https://github.com/music-assistant/support/issues/3330 - await airplay_player.stream.send_cli_command(f"VOLUME={airplay_player.volume_level}\n") async with TaskManager(self.mass) as tm: for _airplay_player in self.sync_clients: - tm.create_task(_start_client(_airplay_player)) + tm.create_task(self._start_client(_airplay_player, self.start_ntp)) + # All clients started self._clients_ready.set() async def stop(self) -> None: """Stop playback and cleanup.""" - if self._audio_source_task and not self._audio_source_task.done(): - self._audio_source_task.cancel() - with suppress(asyncio.CancelledError): - await self._audio_source_task await asyncio.gather( *[self.remove_client(x) for x in self.sync_clients], return_exceptions=True, ) + if self._audio_source_task and not self._audio_source_task.done(): + self._audio_source_task.cancel() + with suppress(asyncio.CancelledError): + await self._audio_source_task async def remove_client(self, airplay_player: AirPlayPlayer) -> None: """Remove a sync client from the session.""" @@ -136,12 +120,9 @@ class AirPlayStreamSession: assert airplay_player.stream.session == self async with self._lock: self.sync_clients.remove(airplay_player) + await airplay_player.stream.stop() if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): - await ffmpeg.write_eof() await ffmpeg.close() - del ffmpeg - await airplay_player.stream.stop() - airplay_player.stream = None # If this was the last client, stop the session if not self.sync_clients: await self.stop() @@ -175,22 +156,8 @@ class AirPlayStreamSession: ) return - # Stop existing stream if the player is already streaming - # should not happen, but guard just in case - if airplay_player.stream and airplay_player.stream.running: - await airplay_player.stream.stop() - - # Create appropriate stream type based on protocol - if airplay_player.protocol == StreamingProtocol.AIRPLAY2: - airplay_player.stream = AirPlay2Stream(airplay_player) - else: - airplay_player.stream = RaopStream(airplay_player) - - # Link stream session to player stream - airplay_player.stream.session = self - # create the named pipes - await airplay_player.stream.audio_pipe.create() - await airplay_player.stream.commands_pipe.create() + # Prepare the new client for streaming + await self._prepare_client(airplay_player) # Snapshot seconds_streamed inside lock to prevent race conditions # Keep lock held during stream.start() to ensure player doesn't miss any chunks @@ -209,44 +176,44 @@ class AirPlayStreamSession: if airplay_player not in self.sync_clients: self.sync_clients.append(airplay_player) - # start the player specific ffmpeg process - await self._start_client_ffmpeg(airplay_player) - await airplay_player.stream.start(start_ntp) + await self._start_client(airplay_player, start_ntp) async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None: """Replace the audio source of the stream.""" + self._first_chunk_received.clear() + new_audio_source_task = asyncio.create_task(self._audio_streamer(audio_source)) + await self._first_chunk_received.wait() async with self._lock: # Cancel the current audio source task assert self._audio_source_task # for type checker - self._audio_source_task.cancel() - # Set new audio source and restart the stream - self._audio_source = audio_source - self._audio_source_task = asyncio.create_task(self._audio_streamer()) + old_audio_source_task = self._audio_source_task + old_audio_source_task.cancel() + self._audio_source_task = new_audio_source_task self.last_stream_started = time.time() + self.wait_start for sync_client in self.sync_clients: sync_client.set_state_from_stream(state=None, elapsed_time=0) # ensure we cleanly wait for the old audio source task to finish with suppress(asyncio.CancelledError): - await self._audio_source_task + await old_audio_source_task - async def _audio_streamer(self) -> None: + async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None: """Stream audio to all players.""" _last_metadata: str | None = None pcm_sample_size = self.pcm_format.pcm_sample_size stream_start_time = time.time() first_chunk_received = False # each chunk is exactly one second of audio data based on the pcm format. - async for chunk in self._audio_source: + async for chunk in audio_source: if first_chunk_received is False: first_chunk_received = True self.prov.logger.debug( - "First audio chunk received after %.3fs, " - "which is %.3fs before scheduled start time", + "First audio chunk received after %.3fs", time.time() - stream_start_time, - self.last_stream_started - time.time(), ) - # wait until the clients are ready to receive audio - await asyncio.wait_for(self._clients_ready.wait(), timeout=10) + self._first_chunk_received.set() + # Wait until all clients are ready + await self._clients_ready.wait() + # Send chunk to all players async with self._lock: sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running] if not sync_clients: @@ -320,6 +287,7 @@ class AirPlayStreamSession: prev_progress_report = now self.mass.create_task(self._send_metadata(progress, None)) # Entire stream consumed: send EOF + self.prov.logger.debug("Audio source stream exhausted") async with self._lock: await asyncio.gather( *[ @@ -372,9 +340,11 @@ class AirPlayStreamSession: # cleanup any associated FFMpeg instance first if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): await ffmpeg.write_eof() - await ffmpeg.close() - if airplay_player.stream: - await airplay_player.stream.stop() + await ffmpeg.wait_with_timeout(30) + del ffmpeg + assert airplay_player.stream # for type checker + # then stop the player stream + await airplay_player.stream.stop() async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None: """Send metadata to all players.""" @@ -388,6 +358,36 @@ class AirPlayStreamSession: return_exceptions=True, ) + async def _prepare_client(self, airplay_player: AirPlayPlayer) -> None: + """Prepare stream for a single client.""" + # Stop existing stream if running + if airplay_player.stream and airplay_player.stream.running: + await airplay_player.stream.stop() + # Create appropriate stream type based on protocol + if airplay_player.protocol == StreamingProtocol.AIRPLAY2: + airplay_player.stream = AirPlay2Stream(airplay_player) + else: + airplay_player.stream = RaopStream(airplay_player) + # Link stream session to player stream + airplay_player.stream.session = self + # create the named pipes + await airplay_player.stream.audio_pipe.create() + await airplay_player.stream.commands_pipe.create() + # start the (player-specific) ffmpeg process + # note that ffmpeg will open the named pipe for writing + await self._start_client_ffmpeg(airplay_player) + await asyncio.sleep(0.05) # allow ffmpeg to open the pipe properly + + async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None: + """Start stream for a single client.""" + # start the stream + assert airplay_player.stream # for type checker + await airplay_player.stream.start(start_ntp) + # repeat sending the volume level to the player because some players seem + # to ignore it the first time + # https://github.com/music-assistant/support/issues/3330 + await airplay_player.stream.send_cli_command(f"VOLUME={airplay_player.volume_level}\n") + async def _start_client_ffmpeg(self, airplay_player: AirPlayPlayer) -> None: """Start or restart the player's ffmpeg stream.""" # Clean up any existing FFmpeg instance for this player