From: Marcel van der Veldt Date: Sat, 1 Nov 2025 23:34:27 +0000 (+0100) Subject: Add late joining support to AirPlay X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=0d7e2b418de10ad623f080c48b784336a06fbdca;p=music-assistant-server.git Add late joining support to AirPlay --- diff --git a/music_assistant/providers/airplay/__init__.py b/music_assistant/providers/airplay/__init__.py index 753700fe..9413a4ea 100644 --- a/music_assistant/providers/airplay/__init__.py +++ b/music_assistant/providers/airplay/__init__.py @@ -9,7 +9,10 @@ from music_assistant_models.enums import ConfigEntryType, ProviderFeature from music_assistant_models.provider import ProviderManifest from music_assistant.mass import MusicAssistant -from music_assistant.providers.airplay.constants import CONF_ENABLE_LATE_JOIN +from music_assistant.providers.airplay.constants import ( + CONF_ENABLE_LATE_JOIN, + ENABLE_LATE_JOIN_DEFAULT, +) from .provider import AirPlayProvider @@ -42,12 +45,13 @@ async def get_config_entries( ConfigEntry( key=CONF_ENABLE_LATE_JOIN, type=ConfigEntryType.BOOLEAN, - default_value=False, + default_value=ENABLE_LATE_JOIN_DEFAULT, label="Enable late joining", description=( "Allow the player to join an existing AirPlay stream instead of " - "starting a new one. \n NOTE: may not work in all conditions. " - "If you experience issues or players are not fully in sync, disable this option." + "restarting the whole stream. \n NOTE: may not work in all conditions. " + "If you experience issues or players are not fully in sync, disable this option. \n" + "Also note that a late joining player may take a few seconds to catch up." ), category="airplay", ), diff --git a/music_assistant/providers/airplay/constants.py b/music_assistant/providers/airplay/constants.py index 473604b1..b03dcf06 100644 --- a/music_assistant/providers/airplay/constants.py +++ b/music_assistant/providers/airplay/constants.py @@ -48,6 +48,7 @@ CONF_ENABLE_LATE_JOIN: Final[str] = "enable_late_join" BACKOFF_TIME_LOWER_LIMIT: Final[int] = 15 # seconds BACKOFF_TIME_UPPER_LIMIT: Final[int] = 300 # Five minutes +ENABLE_LATE_JOIN_DEFAULT: Final[bool] = True FALLBACK_VOLUME: Final[int] = 20 diff --git a/music_assistant/providers/airplay/protocols/_protocol.py b/music_assistant/providers/airplay/protocols/_protocol.py index 45636351..cc72940f 100644 --- a/music_assistant/providers/airplay/protocols/_protocol.py +++ b/music_assistant/providers/airplay/protocols/_protocol.py @@ -81,12 +81,11 @@ class AirPlayProtocol(ABC): ) @abstractmethod - async def start(self, start_ntp: int, skip: int = 0) -> None: + async def start(self, start_ntp: int) -> None: """Initialize streaming process for the player. Args: start_ntp: NTP timestamp to start streaming - skip: Number of seconds to skip (for late joiners) """ async def stop(self) -> None: diff --git a/music_assistant/providers/airplay/protocols/raop.py b/music_assistant/providers/airplay/protocols/raop.py index 968d5869..24bf3a49 100644 --- a/music_assistant/providers/airplay/protocols/raop.py +++ b/music_assistant/providers/airplay/protocols/raop.py @@ -49,7 +49,7 @@ class RaopStream(AirPlayProtocol): and not self._cli_proc.closed ) - async def start(self, start_ntp: int, skip: int = 0) -> None: + async def start(self, start_ntp: int) -> None: """Initialize CLIRaop process for a player.""" assert self.player.discovery_info is not None # for type checker cli_binary = await get_cli_binary(self.player.protocol) @@ -63,8 +63,6 @@ class RaopStream(AirPlayProtocol): for prop in ("et", "md", "am", "pk", "pw"): if prop_value := self.player.discovery_info.decoded_properties.get(prop): extra_args += [f"-{prop}", prop_value] - if skip > 0: - extra_args += ["-skip", str(skip)] sync_adjust = self.player.config.get_value(CONF_SYNC_ADJUST, 0) assert isinstance(sync_adjust, int) if device_password := self.mass.config.get_raw_player_config_value( @@ -81,7 +79,6 @@ class RaopStream(AirPlayProtocol): read_ahead = await self.mass.config.get_player_config_value( player_id, CONF_READ_AHEAD_BUFFER ) - self.player.logger.info("Starting cliraop with latency buffer: %dms", read_ahead) # cliraop is the binary that handles the actual raop streaming to the player # this is a slightly modified version of philippe44's libraop diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index bddd1f32..22778b74 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -16,7 +16,7 @@ from music_assistant.helpers.ffmpeg import FFMpeg from music_assistant.helpers.util import TaskManager, close_async_generator from music_assistant.providers.airplay.helpers import unix_time_to_ntp -from .constants import CONF_ENABLE_LATE_JOIN, StreamingProtocol +from .constants import CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT, StreamingProtocol from .protocols.airplay2 import AirPlay2Stream from .protocols.raop import RaopStream @@ -61,6 +61,7 @@ class AirPlayStreamSession: # because we reuse an existing stream session for new play_media requests, # we need to track when the last stream was started self.last_stream_started: float = 0.0 + self._clients_ready_event = asyncio.Event() async def start(self) -> None: """Initialize stream session for all players.""" @@ -71,10 +72,14 @@ class AirPlayStreamSession: self.wait_start = wait_start_seconds # in seconds self.start_time = cur_time + wait_start_seconds self.start_ntp = unix_time_to_ntp(self.start_time) - self.prov.logger.info( + self.prov.logger.debug( "Starting stream session with %d clients", len(self.sync_clients), ) + # Start audio source streamer task + # this will read from the audio source and distribute to all players + # we start this task early so it can buffer audio while players are starting + self._audio_source_task = asyncio.create_task(self._audio_streamer()) async def _start_client(airplay_player: AirPlayPlayer) -> None: """Start stream for a single client.""" @@ -108,9 +113,8 @@ class AirPlayStreamSession: async with TaskManager(self.mass) as tm: for _airplay_player in self.sync_clients: tm.create_task(_start_client(_airplay_player)) - # Start audio source streamer task - # this will read from the audio source and distribute to all players - self._audio_source_task = asyncio.create_task(self._audio_streamer()) + # All clients started + self._clients_ready_event.set() async def stop(self) -> None: """Stop playback and cleanup.""" @@ -153,8 +157,9 @@ class AirPlayStreamSession: if not sync_leader.stream or not sync_leader.stream.running: return - allow_late_join = self.prov.config.get_value(CONF_ENABLE_LATE_JOIN, False) - allow_late_join = False # TODO: disable late join for now until we can test it properly + allow_late_join = self.prov.config.get_value( + CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT + ) if not allow_late_join: # Late joining is not allowed - restart the session for all players await self.stop() # we need to stop the current session to add a new client @@ -166,8 +171,10 @@ class AirPlayStreamSession: self.mass.players.cmd_resume(sync_leader.player_id), task_id=f"resync_session_{sync_leader.player_id}", ) + return # Stop existing stream if the player is already streaming + # should not happen, but guard just in case if airplay_player.stream and airplay_player.stream.running: await airplay_player.stream.stop() @@ -188,12 +195,19 @@ class AirPlayStreamSession: # Calculate skip_seconds based on how many chunks have been sent skip_seconds = self.chunks_streamed - + # Start the stream at compensated NTP timestamp + start_at = self.start_time + skip_seconds + start_ntp = unix_time_to_ntp(start_at) + self.prov.logger.debug( + "Adding late joiner %s to session, playback starts %.3fs from now", + airplay_player.player_id, + start_at - time.time(), + ) # Add player to sync clients list if airplay_player not in self.sync_clients: self.sync_clients.append(airplay_player) - await airplay_player.stream.start(self.start_ntp, skip_seconds) + await airplay_player.stream.start(start_ntp) async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None: """Replace the audio source of the stream.""" @@ -223,6 +237,8 @@ class AirPlayStreamSession: generator_exhausted = False _last_metadata: str | None = None chunk_size = self.pcm_format.pcm_sample_size + stream_start_time = time.time() + first_chunk_received = False try: # each chunk is exactly one second of audio data based on the pcm format. async for chunk in self._audio_source: @@ -233,6 +249,15 @@ class AirPlayStreamSession: len(chunk), chunk_size, ) + if first_chunk_received is False: + first_chunk_received = True + self.prov.logger.debug( + "First audio chunk received after %.3fs, which is %.3fs before scheduled start time", + time.time() - stream_start_time, + time.time() - self.start_time, + ) + # wait until the clients are ready to receive audio + await asyncio.wait_for(self._clients_ready_event.wait(), timeout=10) async with self._lock: sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running] if not sync_clients: