from music_assistant_models.provider import ProviderManifest
from music_assistant.mass import MusicAssistant
-from music_assistant.providers.airplay.constants import CONF_ENABLE_LATE_JOIN
+from music_assistant.providers.airplay.constants import (
+ CONF_ENABLE_LATE_JOIN,
+ ENABLE_LATE_JOIN_DEFAULT,
+)
from .provider import AirPlayProvider
ConfigEntry(
key=CONF_ENABLE_LATE_JOIN,
type=ConfigEntryType.BOOLEAN,
- default_value=False,
+ default_value=ENABLE_LATE_JOIN_DEFAULT,
label="Enable late joining",
description=(
"Allow the player to join an existing AirPlay stream instead of "
- "starting a new one. \n NOTE: may not work in all conditions. "
- "If you experience issues or players are not fully in sync, disable this option."
+ "restarting the whole stream. \n NOTE: may not work in all conditions. "
+ "If you experience issues or players are not fully in sync, disable this option. \n"
+ "Also note that a late joining player may take a few seconds to catch up."
),
category="airplay",
),
and not self._cli_proc.closed
)
- async def start(self, start_ntp: int, skip: int = 0) -> None:
+ async def start(self, start_ntp: int) -> None:
"""Initialize CLIRaop process for a player."""
assert self.player.discovery_info is not None # for type checker
cli_binary = await get_cli_binary(self.player.protocol)
for prop in ("et", "md", "am", "pk", "pw"):
if prop_value := self.player.discovery_info.decoded_properties.get(prop):
extra_args += [f"-{prop}", prop_value]
- if skip > 0:
- extra_args += ["-skip", str(skip)]
sync_adjust = self.player.config.get_value(CONF_SYNC_ADJUST, 0)
assert isinstance(sync_adjust, int)
if device_password := self.mass.config.get_raw_player_config_value(
read_ahead = await self.mass.config.get_player_config_value(
player_id, CONF_READ_AHEAD_BUFFER
)
- self.player.logger.info("Starting cliraop with latency buffer: %dms", read_ahead)
# cliraop is the binary that handles the actual raop streaming to the player
# this is a slightly modified version of philippe44's libraop
from music_assistant.helpers.util import TaskManager, close_async_generator
from music_assistant.providers.airplay.helpers import unix_time_to_ntp
-from .constants import CONF_ENABLE_LATE_JOIN, StreamingProtocol
+from .constants import CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT, StreamingProtocol
from .protocols.airplay2 import AirPlay2Stream
from .protocols.raop import RaopStream
# because we reuse an existing stream session for new play_media requests,
# we need to track when the last stream was started
self.last_stream_started: float = 0.0
+ self._clients_ready_event = asyncio.Event()
async def start(self) -> None:
"""Initialize stream session for all players."""
self.wait_start = wait_start_seconds # in seconds
self.start_time = cur_time + wait_start_seconds
self.start_ntp = unix_time_to_ntp(self.start_time)
- self.prov.logger.info(
+ self.prov.logger.debug(
"Starting stream session with %d clients",
len(self.sync_clients),
)
+ # Start audio source streamer task
+ # this will read from the audio source and distribute to all players
+ # we start this task early so it can buffer audio while players are starting
+ self._audio_source_task = asyncio.create_task(self._audio_streamer())
async def _start_client(airplay_player: AirPlayPlayer) -> None:
"""Start stream for a single client."""
async with TaskManager(self.mass) as tm:
for _airplay_player in self.sync_clients:
tm.create_task(_start_client(_airplay_player))
- # Start audio source streamer task
- # this will read from the audio source and distribute to all players
- self._audio_source_task = asyncio.create_task(self._audio_streamer())
+ # All clients started
+ self._clients_ready_event.set()
async def stop(self) -> None:
"""Stop playback and cleanup."""
if not sync_leader.stream or not sync_leader.stream.running:
return
- allow_late_join = self.prov.config.get_value(CONF_ENABLE_LATE_JOIN, False)
- allow_late_join = False # TODO: disable late join for now until we can test it properly
+ allow_late_join = self.prov.config.get_value(
+ CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
+ )
if not allow_late_join:
# Late joining is not allowed - restart the session for all players
await self.stop() # we need to stop the current session to add a new client
self.mass.players.cmd_resume(sync_leader.player_id),
task_id=f"resync_session_{sync_leader.player_id}",
)
+ return
# Stop existing stream if the player is already streaming
+ # should not happen, but guard just in case
if airplay_player.stream and airplay_player.stream.running:
await airplay_player.stream.stop()
# Calculate skip_seconds based on how many chunks have been sent
skip_seconds = self.chunks_streamed
-
+ # Start the stream at compensated NTP timestamp
+ start_at = self.start_time + skip_seconds
+ start_ntp = unix_time_to_ntp(start_at)
+ self.prov.logger.debug(
+ "Adding late joiner %s to session, playback starts %.3fs from now",
+ airplay_player.player_id,
+ start_at - time.time(),
+ )
# Add player to sync clients list
if airplay_player not in self.sync_clients:
self.sync_clients.append(airplay_player)
- await airplay_player.stream.start(self.start_ntp, skip_seconds)
+ await airplay_player.stream.start(start_ntp)
async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Replace the audio source of the stream."""
generator_exhausted = False
_last_metadata: str | None = None
chunk_size = self.pcm_format.pcm_sample_size
+ stream_start_time = time.time()
+ first_chunk_received = False
try:
# each chunk is exactly one second of audio data based on the pcm format.
async for chunk in self._audio_source:
len(chunk),
chunk_size,
)
+ if first_chunk_received is False:
+ first_chunk_received = True
+ self.prov.logger.debug(
+ "First audio chunk received after %.3fs, which is %.3fs before scheduled start time",
+ time.time() - stream_start_time,
+ time.time() - self.start_time,
+ )
+ # wait until the clients are ready to receive audio
+ await asyncio.wait_for(self._clients_ready_event.wait(), timeout=10)
async with self._lock:
sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
if not sync_clients: