Cleanup and fix start time calculation
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 23:12:01 +0000 (00:12 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 23:12:01 +0000 (00:12 +0100)
music_assistant/providers/airplay/stream_session.py

index 29c039be0b88a54b81ddd05ab9024ad17cf40a35..7660487724f72135d05978d3ec9dc71da3ebb386 100644 (file)
@@ -3,7 +3,6 @@
 from __future__ import annotations
 
 import asyncio
-import logging
 import time
 from collections.abc import AsyncGenerator
 from contextlib import suppress
@@ -13,7 +12,7 @@ from music_assistant_models.enums import PlaybackState
 
 from music_assistant.helpers.audio import get_player_filter_params
 from music_assistant.helpers.ffmpeg import FFMpeg
-from music_assistant.helpers.util import TaskManager, close_async_generator
+from music_assistant.helpers.util import TaskManager
 from music_assistant.providers.airplay.helpers import unix_time_to_ntp
 
 from .constants import CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT, StreamingProtocol
@@ -57,6 +56,7 @@ class AirPlayStreamSession:
         self._lock = asyncio.Lock()
         self.start_ntp: int = 0
         self.start_time: float = 0.0
+        self.wait_start: float = 0.0  # in seconds
         self.seconds_streamed: float = 0  # Total seconds sent to session
         # because we reuse an existing stream session for new play_media requests,
         # we need to track when the last stream was started
@@ -71,6 +71,7 @@ class AirPlayStreamSession:
         wait_start_seconds = wait_start / 1000
         self.wait_start = wait_start_seconds  # in seconds
         self.start_time = cur_time + wait_start_seconds
+        self.last_stream_started = self.start_time
         self.start_ntp = unix_time_to_ntp(self.start_time)
         self.prov.logger.debug(
             "Starting stream session with %d clients",
@@ -213,144 +214,120 @@ class AirPlayStreamSession:
 
     async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
         """Replace the audio source of the stream."""
-        # Cancel the current audio source task
-        assert self._audio_source_task  # for type checker
-        self._audio_source_task.cancel()
-        with suppress(asyncio.CancelledError):
-            await self._audio_source_task
-        # Restart the (player-specific) ffmpeg stream for all players
-        # This is the easiest way to ensure the new audio source is used
-        # as quickly as possible, without waiting for the buffers to be drained
-        # It also allows changing the player settings such as DSP on the fly
-        async with self._lock, TaskManager(self.mass) as tm:
-            for sync_client in self.sync_clients:
-                if not sync_client.stream:
-                    continue  # guard
-                tm.create_task(self._start_client_ffmpeg(sync_client))
-        # Set new audio source and restart the stream
-        self._audio_source = audio_source
-        self._audio_source_task = asyncio.create_task(self._audio_streamer())
-        self.last_stream_started = time.time()
+        async with self._lock:
+            # Cancel the current audio source task
+            assert self._audio_source_task  # for type checker
+            self._audio_source_task.cancel()
+            # Set new audio source and restart the stream
+            self._audio_source = audio_source
+            self._audio_source_task = asyncio.create_task(self._audio_streamer())
+        self.last_stream_started = time.time() + self.wait_start
         for sync_client in self.sync_clients:
             sync_client.set_state_from_stream(state=None, elapsed_time=0)
+        # ensure we cleanly wait for the old audio source task to finish
+        with suppress(asyncio.CancelledError):
+            await self._audio_source_task
 
-    async def _audio_streamer(self) -> None:  # noqa: PLR0915
+    async def _audio_streamer(self) -> None:
         """Stream audio to all players."""
-        generator_exhausted = False
         _last_metadata: str | None = None
         pcm_sample_size = self.pcm_format.pcm_sample_size
         stream_start_time = time.time()
         first_chunk_received = False
-        try:
-            # each chunk is exactly one second of audio data based on the pcm format.
-            async for chunk in self._audio_source:
-                if first_chunk_received is False:
-                    first_chunk_received = True
+        # each chunk is exactly one second of audio data based on the pcm format.
+        async for chunk in self._audio_source:
+            if first_chunk_received is False:
+                first_chunk_received = True
+                self.prov.logger.debug(
+                    "First audio chunk received after %.3fs, "
+                    "which is %.3fs before scheduled start time",
+                    time.time() - stream_start_time,
+                    self.last_stream_started - time.time(),
+                )
+                # wait until the clients are ready to receive audio
+                await asyncio.wait_for(self._clients_ready.wait(), timeout=10)
+            async with self._lock:
+                sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
+                if not sync_clients:
                     self.prov.logger.debug(
-                        "First audio chunk received after %.3fs, "
-                        "which is %.3fs before scheduled start time",
-                        time.time() - stream_start_time,
-                        time.time() - self.start_time,
+                        "Audio streamer exiting: No running clients left in session"
                     )
-                    # wait until the clients are ready to receive audio
-                    await asyncio.wait_for(self._clients_ready.wait(), timeout=10)
-                async with self._lock:
-                    sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
-                    if not sync_clients:
-                        self.prov.logger.debug(
-                            "Audio streamer exiting: No running clients left in session"
+                    return
+
+                # Write chunk to all players
+                write_tasks = [
+                    self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream
+                ]
+                results = await asyncio.gather(*write_tasks, return_exceptions=True)
+
+                # Check for write errors or timeouts
+                players_to_remove: list[AirPlayPlayer] = []
+                for i, result in enumerate(results):
+                    if i >= len(sync_clients):
+                        continue
+                    player = sync_clients[i]
+
+                    if isinstance(result, asyncio.TimeoutError):
+                        self.prov.logger.error(
+                            "TIMEOUT writing chunk to player %s - REMOVING from sync group!",
+                            player.player_id,
                         )
-                        return
-
-                    # Write chunk to all players
-                    write_tasks = [
-                        self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream
-                    ]
-                    results = await asyncio.gather(*write_tasks, return_exceptions=True)
-
-                    # Check for write errors or timeouts
-                    players_to_remove: list[AirPlayPlayer] = []
-                    for i, result in enumerate(results):
-                        if i >= len(sync_clients):
-                            continue
-                        player = sync_clients[i]
-
-                        if isinstance(result, asyncio.TimeoutError):
-                            self.prov.logger.error(
-                                "TIMEOUT writing chunk to player %s - REMOVING from sync group!",
-                                player.player_id,
-                            )
-                            players_to_remove.append(player)
-                        elif isinstance(result, Exception):
-                            self.prov.logger.error(
-                                (
-                                    "Error writing chunk to player %s: %s - "
-                                    "REMOVING from sync group!"
-                                ),
-                                player.player_id,
-                                result,
-                            )
-                            players_to_remove.append(player)
-
-                    # Remove failed/timed-out players from sync group
-                    for player in players_to_remove:
-                        if player in self.sync_clients:
-                            self.sync_clients.remove(player)
-                            self.prov.logger.warning(
-                                "Player %s removed from sync group due to write failure/timeout",
-                                player.player_id,
-                            )
-                            # Stop the player's stream
-                            if player.stream:
-                                self.mass.create_task(player.stream.stop())
-
-                    # Update chunk counter (each chunk is exactly one second of audio)
-                    chunk_seconds = len(chunk) / pcm_sample_size
-                    self.seconds_streamed += chunk_seconds
-
-                # send metadata if changed
-                # do this in a separate task to not disturb audio streaming
-                # NOTE: we should probably move this out of the audio stream task into it's own task
-                metadata: PlayerMedia | None
-                if (
-                    self.sync_clients
-                    and (_leader := self.sync_clients[0])
-                    and (_leader.corrected_elapsed_time or 0) > 2
-                    and (metadata := _leader.current_media) is not None
-                ):
-                    now = time.time()
-                    metadata_checksum = f"{metadata.uri}.{metadata.title}.{metadata.image_url}"
-                    progress = int(metadata.corrected_elapsed_time or 0)
-                    if _last_metadata != metadata_checksum:
-                        _last_metadata = metadata_checksum
-                        prev_progress_report = now
-                        self.mass.create_task(self._send_metadata(progress, metadata))
-                    # send the progress report every 5 seconds
-                    elif now - prev_progress_report >= 5:
-                        prev_progress_report = now
-                        self.mass.create_task(self._send_metadata(progress, None))
-            # Entire stream consumed: send EOF
-            generator_exhausted = True
-            async with self._lock:
-                await asyncio.gather(
-                    *[
-                        self._write_eof_to_player(x)
-                        for x in self.sync_clients
-                        if x.stream and x.stream.running
-                    ],
-                    return_exceptions=True,
-                )
-        except Exception as err:
-            logger = self.prov.logger
-            logger.error(
-                "Stream error: %s",
-                str(err) or err.__class__.__name__,
-                exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
+                        players_to_remove.append(player)
+                    elif isinstance(result, Exception):
+                        self.prov.logger.error(
+                            ("Error writing chunk to player %s: %s - REMOVING from sync group!"),
+                            player.player_id,
+                            result,
+                        )
+                        players_to_remove.append(player)
+
+                # Remove failed/timed-out players from sync group
+                for player in players_to_remove:
+                    if player in self.sync_clients:
+                        self.sync_clients.remove(player)
+                        self.prov.logger.warning(
+                            "Player %s removed from sync group due to write failure/timeout",
+                            player.player_id,
+                        )
+                        # Stop the player's stream
+                        if player.stream:
+                            self.mass.create_task(player.stream.stop())
+
+                # Update chunk counter (each chunk is exactly one second of audio)
+                chunk_seconds = len(chunk) / pcm_sample_size
+                self.seconds_streamed += chunk_seconds
+
+            # send metadata if changed
+            # do this in a separate task to not disturb audio streaming
+            # NOTE: we should probably move this out of the audio stream task into it's own task
+            metadata: PlayerMedia | None
+            if (
+                self.sync_clients
+                and (_leader := self.sync_clients[0])
+                and (_leader.corrected_elapsed_time or 0) > 2
+                and (metadata := _leader.current_media) is not None
+            ):
+                now = time.time()
+                metadata_checksum = f"{metadata.uri}.{metadata.title}.{metadata.image_url}"
+                progress = int(metadata.corrected_elapsed_time or 0)
+                if _last_metadata != metadata_checksum:
+                    _last_metadata = metadata_checksum
+                    prev_progress_report = now
+                    self.mass.create_task(self._send_metadata(progress, metadata))
+                # send the progress report every 5 seconds
+                elif now - prev_progress_report >= 5:
+                    prev_progress_report = now
+                    self.mass.create_task(self._send_metadata(progress, None))
+        # Entire stream consumed: send EOF
+        async with self._lock:
+            await asyncio.gather(
+                *[
+                    self._write_eof_to_player(x)
+                    for x in self.sync_clients
+                    if x.stream and x.stream.running
+                ],
+                return_exceptions=True,
             )
-            raise
-        finally:
-            if not generator_exhausted:
-                await close_async_generator(self._audio_source)
 
     async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
         """
@@ -431,7 +408,7 @@ class AirPlayStreamSession:
             output_format=airplay_player.stream.pcm_format,
             filter_params=filter_params,
             audio_output=airplay_player.stream.audio_pipe.path,
-            extra_input_args=["-y"],
+            extra_input_args=["-y", "-readrate", "1.0", "-readrate_initial_burst", "1.2"],
         )
         await ffmpeg.start()
         self._player_ffmpeg[airplay_player.player_id] = ffmpeg