From: Marcel van der Veldt Date: Fri, 31 Oct 2025 03:10:21 +0000 (+0100) Subject: Some more cleanup to airplay provider X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=139c0118b293c8a3b1c5799f700b8708f77ac03b;p=music-assistant-server.git Some more cleanup to airplay provider --- diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index 5e89793c..2d1c1ccc 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -52,11 +52,13 @@ class AirPlayStreamSession: self._audio_source = audio_source self._audio_source_task: asyncio.Task[None] | None = None self._player_ffmpeg: dict[str, FFMpeg] = {} - self._player_start_chunk: dict[str, int] = {} # Chunk number when player joined self._lock = asyncio.Lock() self.start_ntp: int = 0 self.start_time: float = 0.0 self.chunks_streamed: int = 0 # Total chunks sent to session (each chunk = 1 second) + # because we reuse an existing stream session for new play_media requests, + # we need to track when the last stream was started + self.last_stream_started: float = 0.0 async def start(self) -> None: """Initialize stream session for all players.""" @@ -78,11 +80,6 @@ class AirPlayStreamSession: # Stop existing stream if running if airplay_player.stream and airplay_player.stream.running: await airplay_player.stream.stop() - if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): - await ffmpeg.close() - del ffmpeg - - self._player_start_chunk[airplay_player.player_id] = 1 # Create appropriate stream type based on protocol if airplay_player.protocol == StreamingProtocol.AIRPLAY2: @@ -90,28 +87,11 @@ class AirPlayStreamSession: else: airplay_player.stream = RaopStream(self, airplay_player) - # create optional FFMpeg instance per player if needed - # this is used to do any optional DSP processing/filtering - filter_params = get_player_filter_params( - self.mass, - airplay_player.player_id, - self.pcm_format, - airplay_player.stream.pcm_format, - ) - if filter_params or self.pcm_format != airplay_player.stream.pcm_format: - ffmpeg = FFMpeg( - audio_input="-", - input_format=self.pcm_format, - output_format=airplay_player.stream.pcm_format, - filter_params=filter_params, - ) - await ffmpeg.start() - self._player_ffmpeg[airplay_player.player_id] = ffmpeg + await self._start_client_ffmpeg(airplay_player) + # start the stream await airplay_player.stream.start(self.start_ntp) - # Tracking will be initialized on first write - async with TaskManager(self.mass) as tm: for _airplay_player in self.sync_clients: tm.create_task(_start_client(_airplay_player)) @@ -141,8 +121,6 @@ class AirPlayStreamSession: if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): await ffmpeg.close() del ffmpeg - # Clean up player tracking - self._player_start_chunk.pop(airplay_player.player_id, None) await airplay_player.stream.stop() airplay_player.stream = None # If this was the last client, stop the session @@ -179,37 +157,18 @@ class AirPlayStreamSession: if airplay_player.stream and airplay_player.stream.running: await airplay_player.stream.stop() - # Clean up any existing FFmpeg instance for this player - if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): - await ffmpeg.close() - del ffmpeg - # Create appropriate stream type based on protocol if airplay_player.protocol == StreamingProtocol.AIRPLAY2: airplay_player.stream = AirPlay2Stream(self, airplay_player) else: airplay_player.stream = RaopStream(self, airplay_player) - # Create optional FFMpeg instance per player if needed - filter_params = get_player_filter_params( - self.mass, - airplay_player.player_id, - self.pcm_format, - airplay_player.stream.pcm_format, - ) - if filter_params or self.pcm_format != airplay_player.stream.pcm_format: - ffmpeg = FFMpeg( - audio_input="-", - input_format=self.pcm_format, - output_format=airplay_player.stream.pcm_format, - filter_params=filter_params, - ) - await ffmpeg.start() - self._player_ffmpeg[airplay_player.player_id] = ffmpeg - # Snapshot chunks_streamed inside lock to prevent race conditions # Keep lock held during stream.start() to ensure player doesn't miss any chunks async with self._lock: + # (re)start the player specific ffmpeg process + await self._start_client_ffmpeg(airplay_player) + # Calculate skip_seconds based on how many chunks have been sent skip_seconds = self.chunks_streamed @@ -226,17 +185,22 @@ class AirPlayStreamSession: self._audio_source_task.cancel() with suppress(asyncio.CancelledError): await self._audio_source_task - # Set new audio source and restart the stream - self._audio_source = audio_source - self._audio_source_task = asyncio.create_task(self._audio_streamer()) # Restart the (player-specific) ffmpeg stream for all players # This is the easiest way to ensure the new audio source is used # as quickly as possible, without waiting for the buffers to be drained # It also allows changing the player settings such as DSP on the fly - # for sync_client in self.sync_clients: - # if not sync_client.stream: - # continue # guard - # sync_client.stream.start_ffmpeg_stream() + async with self._lock, TaskManager(self.mass) as tm: + for sync_client in self.sync_clients: + if not sync_client.stream: + continue # guard + tm.create_task(self._start_client_ffmpeg(sync_client)) + # Set new audio source and restart the stream + self._audio_source = audio_source + self._audio_source_task = asyncio.create_task(self._audio_streamer()) + + self.last_stream_started = time.time() + for sync_client in self.sync_clients: + sync_client.set_state_from_stream(state=None, elapsed_time=0) async def _audio_streamer(self) -> None: """Stream audio to all players.""" @@ -368,9 +332,6 @@ class AirPlayStreamSession: chunk_number = self.chunks_streamed + 1 player_id = airplay_player.player_id - # Calculate chunk offset based on actual time vs start time - self._player_start_chunk.pop(player_id, None) - # if the player has an associated FFMpeg instance, use that first if ffmpeg := self._player_ffmpeg.get(player_id): await ffmpeg.write(chunk) @@ -416,3 +377,27 @@ class AirPlayStreamSession: ], return_exceptions=True, ) + + async def _start_client_ffmpeg(self, airplay_player: AirPlayPlayer) -> None: + """Start or restart the player's ffmpeg stream.""" + # Clean up any existing FFmpeg instance for this player + if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): + await ffmpeg.close() + del ffmpeg + assert airplay_player.stream # for type checker + # Create optional FFMpeg instance per player if needed + filter_params = get_player_filter_params( + self.mass, + airplay_player.player_id, + self.pcm_format, + airplay_player.stream.pcm_format, + ) + if filter_params or self.pcm_format != airplay_player.stream.pcm_format: + ffmpeg = FFMpeg( + audio_input="-", + input_format=self.pcm_format, + output_format=airplay_player.stream.pcm_format, + filter_params=filter_params, + ) + await ffmpeg.start() + self._player_ffmpeg[airplay_player.player_id] = ffmpeg