Swap initialization order of AirPlay
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 3 Nov 2025 17:32:13 +0000 (18:32 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 3 Nov 2025 17:32:13 +0000 (18:32 +0100)
only start streaming to airplay if we have received the first audio chunk

music_assistant/providers/airplay/player.py
music_assistant/providers/airplay/protocols/raop.py
music_assistant/providers/airplay/stream_session.py

index 924a8f52275f069c1dbf4ee946702c5a63483d81..20e0fe10f17b1bd0a2bc98ddaf77a1138253f5a6 100644 (file)
@@ -448,10 +448,8 @@ class AirPlayPlayer(Player):
         # setup StreamSession for player (and its sync childs if any)
         sync_clients = self._get_sync_clients()
         provider = cast("AirPlayProvider", self.provider)
-        stream_session = AirPlayStreamSession(
-            provider, sync_clients, AIRPLAY_PCM_FORMAT, audio_source
-        )
-        await stream_session.start()
+        stream_session = AirPlayStreamSession(provider, sync_clients, AIRPLAY_PCM_FORMAT)
+        await stream_session.start(audio_source)
 
     async def volume_set(self, volume_level: int) -> None:
         """Send VOLUME_SET command to given player."""
index 64ca16727d5280717f21110d3b91aa2169bf610a..954ad293bf3edf704c58d30b9a8670b8c55a1df7 100644 (file)
@@ -2,7 +2,6 @@
 
 from __future__ import annotations
 
-import asyncio
 import logging
 from typing import TYPE_CHECKING, cast
 
@@ -37,7 +36,6 @@ class RaopStream(AirPlayProtocol):
     """
 
     supports_pairing = True
-    _stderr_reader_task: asyncio.Task[None] | None = None
 
     async def start(self, start_ntp: int) -> None:
         """Initialize CLIRaop process for a player."""
@@ -117,7 +115,7 @@ class RaopStream(AirPlayProtocol):
                 raise PlayerCommandFailed("Cannot connect to AirPlay device")
 
         # start reading the stderr of the cliraop process from another task
-        self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
+        self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))
 
     async def start_pairing(self) -> None:
         """Start pairing process for this protocol (if supported)."""
index 5ebb9c2fb39d55f5b70d67a2f76f13b084fdce70..60ace8f44ca28b66b68b101817d6e363ae4c82b7 100644 (file)
@@ -35,7 +35,6 @@ class AirPlayStreamSession:
         airplay_provider: AirPlayProvider,
         sync_clients: list[AirPlayPlayer],
         pcm_format: AudioFormat,
-        audio_source: AsyncGenerator[bytes, None],
     ) -> None:
         """Initialize AirPlayStreamSession.
 
@@ -50,7 +49,6 @@ class AirPlayStreamSession:
         self.mass = airplay_provider.mass
         self.pcm_format = pcm_format
         self.sync_clients = sync_clients
-        self._audio_source = audio_source
         self._audio_source_task: asyncio.Task[None] | None = None
         self._player_ffmpeg: dict[str, FFMpeg] = {}
         self._lock = asyncio.Lock()
@@ -62,71 +60,57 @@ class AirPlayStreamSession:
         # we need to track when the last stream was started
         self.last_stream_started: float = 0.0
         self._clients_ready = asyncio.Event()
+        self._first_chunk_received = asyncio.Event()
 
-    async def start(self) -> None:
+    async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
         """Initialize stream session for all players."""
+        self.prov.logger.debug(
+            "Starting stream session with %d clients",
+            len(self.sync_clients),
+        )
+        # Prepare all clients
+        # this will create the stream objects, named pipes and start ffmpeg
+        async with TaskManager(self.mass) as tm:
+            for _airplay_player in self.sync_clients:
+                tm.create_task(self._prepare_client(_airplay_player))
+
+        # Start audio source streamer task
+        # this will read from the audio source and distribute
+        # to all player-specific ffmpeg processes
+        # we start this task early because some streams (especially radio)
+        # may need more time to buffer - this way we ensure we have audio ready
+        # when the players should start playing
+        self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
+        # wait until the first chunk is received before starting clients
+        await self._first_chunk_received.wait()
+
+        # Start all clients
         # Get current NTP timestamp and calculate wait time
         cur_time = time.time()
-        wait_start = 1750 + (250 * len(self.sync_clients))  # in milliseconds
+        wait_start = 1500 + (250 * len(self.sync_clients))  # in milliseconds
         wait_start_seconds = wait_start / 1000
         self.wait_start = wait_start_seconds  # in seconds
         self.start_time = cur_time + wait_start_seconds
         self.last_stream_started = self.start_time
         self.start_ntp = unix_time_to_ntp(self.start_time)
-        self.prov.logger.debug(
-            "Starting stream session with %d clients",
-            len(self.sync_clients),
-        )
-        # Start audio source streamer task
-        # this will read from the audio source and distribute to all players
-        # we start this task early so it can buffer audio while players are starting
-        self._audio_source_task = asyncio.create_task(self._audio_streamer())
-
-        async def _start_client(airplay_player: AirPlayPlayer) -> None:
-            """Start stream for a single client."""
-            # Stop existing stream if running
-            if airplay_player.stream and airplay_player.stream.running:
-                await airplay_player.stream.stop()
-
-            # Create appropriate stream type based on protocol
-            if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
-                airplay_player.stream = AirPlay2Stream(airplay_player)
-            else:
-                airplay_player.stream = RaopStream(airplay_player)
-
-            # Link stream session to player stream
-            airplay_player.stream.session = self
-            # create the named pipes
-            await airplay_player.stream.audio_pipe.create()
-            await airplay_player.stream.commands_pipe.create()
-            # start the (player-specific) ffmpeg process
-            # note that ffmpeg will open the named pipe for writing
-            await self._start_client_ffmpeg(airplay_player)
-
-            # start the stream
-            await airplay_player.stream.start(self.start_ntp)
-
-            # repeat sending the volume level to the player because some players seem
-            # to ignore it the first time
-            # https://github.com/music-assistant/support/issues/3330
-            await airplay_player.stream.send_cli_command(f"VOLUME={airplay_player.volume_level}\n")
 
         async with TaskManager(self.mass) as tm:
             for _airplay_player in self.sync_clients:
-                tm.create_task(_start_client(_airplay_player))
+                tm.create_task(self._start_client(_airplay_player, self.start_ntp))
+
         # All clients started
         self._clients_ready.set()
 
     async def stop(self) -> None:
         """Stop playback and cleanup."""
-        if self._audio_source_task and not self._audio_source_task.done():
-            self._audio_source_task.cancel()
-            with suppress(asyncio.CancelledError):
-                await self._audio_source_task
         await asyncio.gather(
             *[self.remove_client(x) for x in self.sync_clients],
             return_exceptions=True,
         )
+        if self._audio_source_task and not self._audio_source_task.done():
+            self._audio_source_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._audio_source_task
 
     async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
         """Remove a sync client from the session."""
@@ -136,12 +120,9 @@ class AirPlayStreamSession:
         assert airplay_player.stream.session == self
         async with self._lock:
             self.sync_clients.remove(airplay_player)
+        await airplay_player.stream.stop()
         if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
-            await ffmpeg.write_eof()
             await ffmpeg.close()
-            del ffmpeg
-        await airplay_player.stream.stop()
-        airplay_player.stream = None
         # If this was the last client, stop the session
         if not self.sync_clients:
             await self.stop()
@@ -175,22 +156,8 @@ class AirPlayStreamSession:
                 )
             return
 
-        # Stop existing stream if the player is already streaming
-        # should not happen, but guard just in case
-        if airplay_player.stream and airplay_player.stream.running:
-            await airplay_player.stream.stop()
-
-        # Create appropriate stream type based on protocol
-        if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
-            airplay_player.stream = AirPlay2Stream(airplay_player)
-        else:
-            airplay_player.stream = RaopStream(airplay_player)
-
-        # Link stream session to player stream
-        airplay_player.stream.session = self
-        # create the named pipes
-        await airplay_player.stream.audio_pipe.create()
-        await airplay_player.stream.commands_pipe.create()
+        # Prepare the new client for streaming
+        await self._prepare_client(airplay_player)
 
         # Snapshot seconds_streamed inside lock to prevent race conditions
         # Keep lock held during stream.start() to ensure player doesn't miss any chunks
@@ -209,44 +176,44 @@ class AirPlayStreamSession:
             if airplay_player not in self.sync_clients:
                 self.sync_clients.append(airplay_player)
 
-            # start the player specific ffmpeg process
-            await self._start_client_ffmpeg(airplay_player)
-            await airplay_player.stream.start(start_ntp)
+        await self._start_client(airplay_player, start_ntp)
 
     async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
         """Replace the audio source of the stream."""
+        self._first_chunk_received.clear()
+        new_audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
+        await self._first_chunk_received.wait()
         async with self._lock:
             # Cancel the current audio source task
             assert self._audio_source_task  # for type checker
-            self._audio_source_task.cancel()
-            # Set new audio source and restart the stream
-            self._audio_source = audio_source
-            self._audio_source_task = asyncio.create_task(self._audio_streamer())
+            old_audio_source_task = self._audio_source_task
+            old_audio_source_task.cancel()
+            self._audio_source_task = new_audio_source_task
         self.last_stream_started = time.time() + self.wait_start
         for sync_client in self.sync_clients:
             sync_client.set_state_from_stream(state=None, elapsed_time=0)
         # ensure we cleanly wait for the old audio source task to finish
         with suppress(asyncio.CancelledError):
-            await self._audio_source_task
+            await old_audio_source_task
 
-    async def _audio_streamer(self) -> None:
+    async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
         """Stream audio to all players."""
         _last_metadata: str | None = None
         pcm_sample_size = self.pcm_format.pcm_sample_size
         stream_start_time = time.time()
         first_chunk_received = False
         # each chunk is exactly one second of audio data based on the pcm format.
-        async for chunk in self._audio_source:
+        async for chunk in audio_source:
             if first_chunk_received is False:
                 first_chunk_received = True
                 self.prov.logger.debug(
-                    "First audio chunk received after %.3fs, "
-                    "which is %.3fs before scheduled start time",
+                    "First audio chunk received after %.3fs",
                     time.time() - stream_start_time,
-                    self.last_stream_started - time.time(),
                 )
-                # wait until the clients are ready to receive audio
-                await asyncio.wait_for(self._clients_ready.wait(), timeout=10)
+                self._first_chunk_received.set()
+            # Wait until all clients are ready
+            await self._clients_ready.wait()
+            # Send chunk to all players
             async with self._lock:
                 sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
                 if not sync_clients:
@@ -320,6 +287,7 @@ class AirPlayStreamSession:
                     prev_progress_report = now
                     self.mass.create_task(self._send_metadata(progress, None))
         # Entire stream consumed: send EOF
+        self.prov.logger.debug("Audio source stream exhausted")
         async with self._lock:
             await asyncio.gather(
                 *[
@@ -372,9 +340,11 @@ class AirPlayStreamSession:
         # cleanup any associated FFMpeg instance first
         if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
             await ffmpeg.write_eof()
-            await ffmpeg.close()
-        if airplay_player.stream:
-            await airplay_player.stream.stop()
+            await ffmpeg.wait_with_timeout(30)
+            del ffmpeg
+        assert airplay_player.stream  # for type checker
+        # then stop the player stream
+        await airplay_player.stream.stop()
 
     async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
         """Send metadata to all players."""
@@ -388,6 +358,36 @@ class AirPlayStreamSession:
                 return_exceptions=True,
             )
 
+    async def _prepare_client(self, airplay_player: AirPlayPlayer) -> None:
+        """Prepare stream for a single client."""
+        # Stop existing stream if running
+        if airplay_player.stream and airplay_player.stream.running:
+            await airplay_player.stream.stop()
+        # Create appropriate stream type based on protocol
+        if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
+            airplay_player.stream = AirPlay2Stream(airplay_player)
+        else:
+            airplay_player.stream = RaopStream(airplay_player)
+        # Link stream session to player stream
+        airplay_player.stream.session = self
+        # create the named pipes
+        await airplay_player.stream.audio_pipe.create()
+        await airplay_player.stream.commands_pipe.create()
+        # start the (player-specific) ffmpeg process
+        # note that ffmpeg will open the named pipe for writing
+        await self._start_client_ffmpeg(airplay_player)
+        await asyncio.sleep(0.05)  # allow ffmpeg to open the pipe properly
+
+    async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
+        """Start stream for a single client."""
+        # start the stream
+        assert airplay_player.stream  # for type checker
+        await airplay_player.stream.start(start_ntp)
+        # repeat sending the volume level to the player because some players seem
+        # to ignore it the first time
+        # https://github.com/music-assistant/support/issues/3330
+        await airplay_player.stream.send_cli_command(f"VOLUME={airplay_player.volume_level}\n")
+
     async def _start_client_ffmpeg(self, airplay_player: AirPlayPlayer) -> None:
         """Start or restart the player's ffmpeg stream."""
         # Clean up any existing FFmpeg instance for this player