Add late joining support to AirPlay
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 1 Nov 2025 23:34:27 +0000 (00:34 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 1 Nov 2025 23:34:27 +0000 (00:34 +0100)
music_assistant/providers/airplay/__init__.py
music_assistant/providers/airplay/constants.py
music_assistant/providers/airplay/protocols/_protocol.py
music_assistant/providers/airplay/protocols/raop.py
music_assistant/providers/airplay/stream_session.py

index 753700fe41195313cc13568521713ddc5aee4ec5..9413a4ea3902a2611a1158389de7c4e5b791520d 100644 (file)
@@ -9,7 +9,10 @@ from music_assistant_models.enums import ConfigEntryType, ProviderFeature
 from music_assistant_models.provider import ProviderManifest
 
 from music_assistant.mass import MusicAssistant
-from music_assistant.providers.airplay.constants import CONF_ENABLE_LATE_JOIN
+from music_assistant.providers.airplay.constants import (
+    CONF_ENABLE_LATE_JOIN,
+    ENABLE_LATE_JOIN_DEFAULT,
+)
 
 from .provider import AirPlayProvider
 
@@ -42,12 +45,13 @@ async def get_config_entries(
         ConfigEntry(
             key=CONF_ENABLE_LATE_JOIN,
             type=ConfigEntryType.BOOLEAN,
-            default_value=False,
+            default_value=ENABLE_LATE_JOIN_DEFAULT,
             label="Enable late joining",
             description=(
                 "Allow the player to join an existing AirPlay stream instead of "
-                "starting a new one. \n NOTE: may not work in all conditions. "
-                "If you experience issues or players are not fully in sync, disable this option."
+                "restarting the whole stream. \n NOTE: may not work in all conditions. "
+                "If you experience issues or players are not fully in sync, disable this option. \n"
+                "Also note that a late joining player may take a few seconds to catch up."
             ),
             category="airplay",
         ),
index 473604b1440e2cbd3a574de53f5aea68449547ab..b03dcf0613e4458610c6570a5a5100724e981214 100644 (file)
@@ -48,6 +48,7 @@ CONF_ENABLE_LATE_JOIN: Final[str] = "enable_late_join"
 
 BACKOFF_TIME_LOWER_LIMIT: Final[int] = 15  # seconds
 BACKOFF_TIME_UPPER_LIMIT: Final[int] = 300  # Five minutes
+ENABLE_LATE_JOIN_DEFAULT: Final[bool] = True
 
 FALLBACK_VOLUME: Final[int] = 20
 
index 456363519a73b17c5e84932bc1fa2ed7010ac197..cc72940f24dd4bad126b8fcc1f5f654b195fe4e6 100644 (file)
@@ -81,12 +81,11 @@ class AirPlayProtocol(ABC):
         )
 
     @abstractmethod
-    async def start(self, start_ntp: int, skip: int = 0) -> None:
+    async def start(self, start_ntp: int) -> None:
         """Initialize streaming process for the player.
 
         Args:
             start_ntp: NTP timestamp to start streaming
-            skip: Number of seconds to skip (for late joiners)
         """
 
     async def stop(self) -> None:
index 968d5869a75b823a50f4dde6dbcf94e43c7bd291..24bf3a49396d845cb6b3052506f20841efd90265 100644 (file)
@@ -49,7 +49,7 @@ class RaopStream(AirPlayProtocol):
             and not self._cli_proc.closed
         )
 
-    async def start(self, start_ntp: int, skip: int = 0) -> None:
+    async def start(self, start_ntp: int) -> None:
         """Initialize CLIRaop process for a player."""
         assert self.player.discovery_info is not None  # for type checker
         cli_binary = await get_cli_binary(self.player.protocol)
@@ -63,8 +63,6 @@ class RaopStream(AirPlayProtocol):
         for prop in ("et", "md", "am", "pk", "pw"):
             if prop_value := self.player.discovery_info.decoded_properties.get(prop):
                 extra_args += [f"-{prop}", prop_value]
-        if skip > 0:
-            extra_args += ["-skip", str(skip)]
         sync_adjust = self.player.config.get_value(CONF_SYNC_ADJUST, 0)
         assert isinstance(sync_adjust, int)
         if device_password := self.mass.config.get_raw_player_config_value(
@@ -81,7 +79,6 @@ class RaopStream(AirPlayProtocol):
         read_ahead = await self.mass.config.get_player_config_value(
             player_id, CONF_READ_AHEAD_BUFFER
         )
-        self.player.logger.info("Starting cliraop with latency buffer: %dms", read_ahead)
 
         # cliraop is the binary that handles the actual raop streaming to the player
         # this is a slightly modified version of philippe44's libraop
index bddd1f32499b0c41291a5ec62a4a1bc112dd151c..22778b74765527367c578cfaba5a5db02a38e80e 100644 (file)
@@ -16,7 +16,7 @@ from music_assistant.helpers.ffmpeg import FFMpeg
 from music_assistant.helpers.util import TaskManager, close_async_generator
 from music_assistant.providers.airplay.helpers import unix_time_to_ntp
 
-from .constants import CONF_ENABLE_LATE_JOIN, StreamingProtocol
+from .constants import CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT, StreamingProtocol
 from .protocols.airplay2 import AirPlay2Stream
 from .protocols.raop import RaopStream
 
@@ -61,6 +61,7 @@ class AirPlayStreamSession:
         # because we reuse an existing stream session for new play_media requests,
         # we need to track when the last stream was started
         self.last_stream_started: float = 0.0
+        self._clients_ready_event = asyncio.Event()
 
     async def start(self) -> None:
         """Initialize stream session for all players."""
@@ -71,10 +72,14 @@ class AirPlayStreamSession:
         self.wait_start = wait_start_seconds  # in seconds
         self.start_time = cur_time + wait_start_seconds
         self.start_ntp = unix_time_to_ntp(self.start_time)
-        self.prov.logger.info(
+        self.prov.logger.debug(
             "Starting stream session with %d clients",
             len(self.sync_clients),
         )
+        # Start audio source streamer task
+        # this will read from the audio source and distribute to all players
+        # we start this task early so it can buffer audio while players are starting
+        self._audio_source_task = asyncio.create_task(self._audio_streamer())
 
         async def _start_client(airplay_player: AirPlayPlayer) -> None:
             """Start stream for a single client."""
@@ -108,9 +113,8 @@ class AirPlayStreamSession:
         async with TaskManager(self.mass) as tm:
             for _airplay_player in self.sync_clients:
                 tm.create_task(_start_client(_airplay_player))
-        # Start audio source streamer task
-        # this will read from the audio source and distribute to all players
-        self._audio_source_task = asyncio.create_task(self._audio_streamer())
+        # All clients started
+        self._clients_ready_event.set()
 
     async def stop(self) -> None:
         """Stop playback and cleanup."""
@@ -153,8 +157,9 @@ class AirPlayStreamSession:
         if not sync_leader.stream or not sync_leader.stream.running:
             return
 
-        allow_late_join = self.prov.config.get_value(CONF_ENABLE_LATE_JOIN, False)
-        allow_late_join = False  # TODO: disable late join for now until we can test it properly
+        allow_late_join = self.prov.config.get_value(
+            CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
+        )
         if not allow_late_join:
             # Late joining is not allowed - restart the session for all players
             await self.stop()  # we need to stop the current session to add a new client
@@ -166,8 +171,10 @@ class AirPlayStreamSession:
                     self.mass.players.cmd_resume(sync_leader.player_id),
                     task_id=f"resync_session_{sync_leader.player_id}",
                 )
+            return
 
         # Stop existing stream if the player is already streaming
+        # should not happen, but guard just in case
         if airplay_player.stream and airplay_player.stream.running:
             await airplay_player.stream.stop()
 
@@ -188,12 +195,19 @@ class AirPlayStreamSession:
 
             # Calculate skip_seconds based on how many chunks have been sent
             skip_seconds = self.chunks_streamed
-
+            # Start the stream at compensated NTP timestamp
+            start_at = self.start_time + skip_seconds
+            start_ntp = unix_time_to_ntp(start_at)
+            self.prov.logger.debug(
+                "Adding late joiner %s to session, playback starts %.3fs from now",
+                airplay_player.player_id,
+                start_at - time.time(),
+            )
             # Add player to sync clients list
             if airplay_player not in self.sync_clients:
                 self.sync_clients.append(airplay_player)
 
-            await airplay_player.stream.start(self.start_ntp, skip_seconds)
+            await airplay_player.stream.start(start_ntp)
 
     async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
         """Replace the audio source of the stream."""
@@ -223,6 +237,8 @@ class AirPlayStreamSession:
         generator_exhausted = False
         _last_metadata: str | None = None
         chunk_size = self.pcm_format.pcm_sample_size
+        stream_start_time = time.time()
+        first_chunk_received = False
         try:
             # each chunk is exactly one second of audio data based on the pcm format.
             async for chunk in self._audio_source:
@@ -233,6 +249,15 @@ class AirPlayStreamSession:
                         len(chunk),
                         chunk_size,
                     )
+                if first_chunk_received is False:
+                    first_chunk_received = True
+                    self.prov.logger.debug(
+                        "First audio chunk received after %.3fs, which is %.3fs before scheduled start time",
+                        time.time() - stream_start_time,
+                        time.time() - self.start_time,
+                    )
+                    # wait until the clients are ready to receive audio
+                    await asyncio.wait_for(self._clients_ready_event.wait(), timeout=10)
                 async with self._lock:
                     sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
                     if not sync_clients: