Some more cleanup to airplay provider
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 31 Oct 2025 03:10:21 +0000 (04:10 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 31 Oct 2025 03:10:21 +0000 (04:10 +0100)
music_assistant/providers/airplay/stream_session.py

index 5e89793c49e21385ee5a0140846ab1c32a67b54f..2d1c1ccc50f0a8cb851840a981c12f02afb0be13 100644 (file)
@@ -52,11 +52,13 @@ class AirPlayStreamSession:
         self._audio_source = audio_source
         self._audio_source_task: asyncio.Task[None] | None = None
         self._player_ffmpeg: dict[str, FFMpeg] = {}
-        self._player_start_chunk: dict[str, int] = {}  # Chunk number when player joined
         self._lock = asyncio.Lock()
         self.start_ntp: int = 0
         self.start_time: float = 0.0
         self.chunks_streamed: int = 0  # Total chunks sent to session (each chunk = 1 second)
+        # because we reuse an existing stream session for new play_media requests,
+        # we need to track when the last stream was started
+        self.last_stream_started: float = 0.0
 
     async def start(self) -> None:
         """Initialize stream session for all players."""
@@ -78,11 +80,6 @@ class AirPlayStreamSession:
             # Stop existing stream if running
             if airplay_player.stream and airplay_player.stream.running:
                 await airplay_player.stream.stop()
-            if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
-                await ffmpeg.close()
-                del ffmpeg
-
-            self._player_start_chunk[airplay_player.player_id] = 1
 
             # Create appropriate stream type based on protocol
             if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
@@ -90,28 +87,11 @@ class AirPlayStreamSession:
             else:
                 airplay_player.stream = RaopStream(self, airplay_player)
 
-            # create optional FFMpeg instance per player if needed
-            # this is used to do any optional DSP processing/filtering
-            filter_params = get_player_filter_params(
-                self.mass,
-                airplay_player.player_id,
-                self.pcm_format,
-                airplay_player.stream.pcm_format,
-            )
-            if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
-                ffmpeg = FFMpeg(
-                    audio_input="-",
-                    input_format=self.pcm_format,
-                    output_format=airplay_player.stream.pcm_format,
-                    filter_params=filter_params,
-                )
-                await ffmpeg.start()
-                self._player_ffmpeg[airplay_player.player_id] = ffmpeg
+            await self._start_client_ffmpeg(airplay_player)
 
+            # start the stream
             await airplay_player.stream.start(self.start_ntp)
 
-            # Tracking will be initialized on first write
-
         async with TaskManager(self.mass) as tm:
             for _airplay_player in self.sync_clients:
                 tm.create_task(_start_client(_airplay_player))
@@ -141,8 +121,6 @@ class AirPlayStreamSession:
         if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
             await ffmpeg.close()
             del ffmpeg
-        # Clean up player tracking
-        self._player_start_chunk.pop(airplay_player.player_id, None)
         await airplay_player.stream.stop()
         airplay_player.stream = None
         # If this was the last client, stop the session
@@ -179,37 +157,18 @@ class AirPlayStreamSession:
         if airplay_player.stream and airplay_player.stream.running:
             await airplay_player.stream.stop()
 
-        # Clean up any existing FFmpeg instance for this player
-        if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
-            await ffmpeg.close()
-            del ffmpeg
-
         # Create appropriate stream type based on protocol
         if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
             airplay_player.stream = AirPlay2Stream(self, airplay_player)
         else:
             airplay_player.stream = RaopStream(self, airplay_player)
 
-        # Create optional FFMpeg instance per player if needed
-        filter_params = get_player_filter_params(
-            self.mass,
-            airplay_player.player_id,
-            self.pcm_format,
-            airplay_player.stream.pcm_format,
-        )
-        if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
-            ffmpeg = FFMpeg(
-                audio_input="-",
-                input_format=self.pcm_format,
-                output_format=airplay_player.stream.pcm_format,
-                filter_params=filter_params,
-            )
-            await ffmpeg.start()
-            self._player_ffmpeg[airplay_player.player_id] = ffmpeg
-
         # Snapshot chunks_streamed inside lock to prevent race conditions
         # Keep lock held during stream.start() to ensure player doesn't miss any chunks
         async with self._lock:
+            # (re)start the player specific ffmpeg process
+            await self._start_client_ffmpeg(airplay_player)
+
             # Calculate skip_seconds based on how many chunks have been sent
             skip_seconds = self.chunks_streamed
 
@@ -226,17 +185,22 @@ class AirPlayStreamSession:
         self._audio_source_task.cancel()
         with suppress(asyncio.CancelledError):
             await self._audio_source_task
-        # Set new audio source and restart the stream
-        self._audio_source = audio_source
-        self._audio_source_task = asyncio.create_task(self._audio_streamer())
         # Restart the (player-specific) ffmpeg stream for all players
         # This is the easiest way to ensure the new audio source is used
         # as quickly as possible, without waiting for the buffers to be drained
         # It also allows changing the player settings such as DSP on the fly
-        # for sync_client in self.sync_clients:
-        #     if not sync_client.stream:
-        #         continue  # guard
-        #     sync_client.stream.start_ffmpeg_stream()
+        async with self._lock, TaskManager(self.mass) as tm:
+            for sync_client in self.sync_clients:
+                if not sync_client.stream:
+                    continue  # guard
+                tm.create_task(self._start_client_ffmpeg(sync_client))
+        # Set new audio source and restart the stream
+        self._audio_source = audio_source
+        self._audio_source_task = asyncio.create_task(self._audio_streamer())
+
+        self.last_stream_started = time.time()
+        for sync_client in self.sync_clients:
+            sync_client.set_state_from_stream(state=None, elapsed_time=0)
 
     async def _audio_streamer(self) -> None:
         """Stream audio to all players."""
@@ -368,9 +332,6 @@ class AirPlayStreamSession:
         chunk_number = self.chunks_streamed + 1
         player_id = airplay_player.player_id
 
-        # Calculate chunk offset based on actual time vs start time
-        self._player_start_chunk.pop(player_id, None)
-
         # if the player has an associated FFMpeg instance, use that first
         if ffmpeg := self._player_ffmpeg.get(player_id):
             await ffmpeg.write(chunk)
@@ -416,3 +377,27 @@ class AirPlayStreamSession:
                 ],
                 return_exceptions=True,
             )
+
+    async def _start_client_ffmpeg(self, airplay_player: AirPlayPlayer) -> None:
+        """Start or restart the player's ffmpeg stream."""
+        # Clean up any existing FFmpeg instance for this player
+        if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+            await ffmpeg.close()
+            del ffmpeg
+        assert airplay_player.stream  # for type checker
+        # Create optional FFMpeg instance per player if needed
+        filter_params = get_player_filter_params(
+            self.mass,
+            airplay_player.player_id,
+            self.pcm_format,
+            airplay_player.stream.pcm_format,
+        )
+        if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
+            ffmpeg = FFMpeg(
+                audio_input="-",
+                input_format=self.pcm_format,
+                output_format=airplay_player.stream.pcm_format,
+                filter_params=filter_params,
+            )
+            await ffmpeg.start()
+            self._player_ffmpeg[airplay_player.player_id] = ffmpeg