From: Marcel van der Veldt Date: Sun, 2 Nov 2025 23:12:01 +0000 (+0100) Subject: Cleanup and fix start time calculation X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=c36cbc5d101451c3091fb3788839126ef8703800;p=music-assistant-server.git Cleanup and fix start time calculation --- diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index 29c039be..76604877 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -3,7 +3,6 @@ from __future__ import annotations import asyncio -import logging import time from collections.abc import AsyncGenerator from contextlib import suppress @@ -13,7 +12,7 @@ from music_assistant_models.enums import PlaybackState from music_assistant.helpers.audio import get_player_filter_params from music_assistant.helpers.ffmpeg import FFMpeg -from music_assistant.helpers.util import TaskManager, close_async_generator +from music_assistant.helpers.util import TaskManager from music_assistant.providers.airplay.helpers import unix_time_to_ntp from .constants import CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT, StreamingProtocol @@ -57,6 +56,7 @@ class AirPlayStreamSession: self._lock = asyncio.Lock() self.start_ntp: int = 0 self.start_time: float = 0.0 + self.wait_start: float = 0.0 # in seconds self.seconds_streamed: float = 0 # Total seconds sent to session # because we reuse an existing stream session for new play_media requests, # we need to track when the last stream was started @@ -71,6 +71,7 @@ class AirPlayStreamSession: wait_start_seconds = wait_start / 1000 self.wait_start = wait_start_seconds # in seconds self.start_time = cur_time + wait_start_seconds + self.last_stream_started = self.start_time self.start_ntp = unix_time_to_ntp(self.start_time) self.prov.logger.debug( "Starting stream session with %d clients", @@ -213,144 +214,120 @@ class AirPlayStreamSession: async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None: """Replace the audio source of the stream.""" - # Cancel the current audio source task - assert self._audio_source_task # for type checker - self._audio_source_task.cancel() - with suppress(asyncio.CancelledError): - await self._audio_source_task - # Restart the (player-specific) ffmpeg stream for all players - # This is the easiest way to ensure the new audio source is used - # as quickly as possible, without waiting for the buffers to be drained - # It also allows changing the player settings such as DSP on the fly - async with self._lock, TaskManager(self.mass) as tm: - for sync_client in self.sync_clients: - if not sync_client.stream: - continue # guard - tm.create_task(self._start_client_ffmpeg(sync_client)) - # Set new audio source and restart the stream - self._audio_source = audio_source - self._audio_source_task = asyncio.create_task(self._audio_streamer()) - self.last_stream_started = time.time() + async with self._lock: + # Cancel the current audio source task + assert self._audio_source_task # for type checker + self._audio_source_task.cancel() + # Set new audio source and restart the stream + self._audio_source = audio_source + self._audio_source_task = asyncio.create_task(self._audio_streamer()) + self.last_stream_started = time.time() + self.wait_start for sync_client in self.sync_clients: sync_client.set_state_from_stream(state=None, elapsed_time=0) + # ensure we cleanly wait for the old audio source task to finish + with suppress(asyncio.CancelledError): + await self._audio_source_task - async def _audio_streamer(self) -> None: # noqa: PLR0915 + async def _audio_streamer(self) -> None: """Stream audio to all players.""" - generator_exhausted = False _last_metadata: str | None = None pcm_sample_size = self.pcm_format.pcm_sample_size stream_start_time = time.time() first_chunk_received = False - try: - # each chunk is exactly one second of audio data based on the pcm format. - async for chunk in self._audio_source: - if first_chunk_received is False: - first_chunk_received = True + # each chunk is exactly one second of audio data based on the pcm format. + async for chunk in self._audio_source: + if first_chunk_received is False: + first_chunk_received = True + self.prov.logger.debug( + "First audio chunk received after %.3fs, " + "which is %.3fs before scheduled start time", + time.time() - stream_start_time, + self.last_stream_started - time.time(), + ) + # wait until the clients are ready to receive audio + await asyncio.wait_for(self._clients_ready.wait(), timeout=10) + async with self._lock: + sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running] + if not sync_clients: self.prov.logger.debug( - "First audio chunk received after %.3fs, " - "which is %.3fs before scheduled start time", - time.time() - stream_start_time, - time.time() - self.start_time, + "Audio streamer exiting: No running clients left in session" ) - # wait until the clients are ready to receive audio - await asyncio.wait_for(self._clients_ready.wait(), timeout=10) - async with self._lock: - sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running] - if not sync_clients: - self.prov.logger.debug( - "Audio streamer exiting: No running clients left in session" + return + + # Write chunk to all players + write_tasks = [ + self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream + ] + results = await asyncio.gather(*write_tasks, return_exceptions=True) + + # Check for write errors or timeouts + players_to_remove: list[AirPlayPlayer] = [] + for i, result in enumerate(results): + if i >= len(sync_clients): + continue + player = sync_clients[i] + + if isinstance(result, asyncio.TimeoutError): + self.prov.logger.error( + "TIMEOUT writing chunk to player %s - REMOVING from sync group!", + player.player_id, ) - return - - # Write chunk to all players - write_tasks = [ - self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream - ] - results = await asyncio.gather(*write_tasks, return_exceptions=True) - - # Check for write errors or timeouts - players_to_remove: list[AirPlayPlayer] = [] - for i, result in enumerate(results): - if i >= len(sync_clients): - continue - player = sync_clients[i] - - if isinstance(result, asyncio.TimeoutError): - self.prov.logger.error( - "TIMEOUT writing chunk to player %s - REMOVING from sync group!", - player.player_id, - ) - players_to_remove.append(player) - elif isinstance(result, Exception): - self.prov.logger.error( - ( - "Error writing chunk to player %s: %s - " - "REMOVING from sync group!" - ), - player.player_id, - result, - ) - players_to_remove.append(player) - - # Remove failed/timed-out players from sync group - for player in players_to_remove: - if player in self.sync_clients: - self.sync_clients.remove(player) - self.prov.logger.warning( - "Player %s removed from sync group due to write failure/timeout", - player.player_id, - ) - # Stop the player's stream - if player.stream: - self.mass.create_task(player.stream.stop()) - - # Update chunk counter (each chunk is exactly one second of audio) - chunk_seconds = len(chunk) / pcm_sample_size - self.seconds_streamed += chunk_seconds - - # send metadata if changed - # do this in a separate task to not disturb audio streaming - # NOTE: we should probably move this out of the audio stream task into it's own task - metadata: PlayerMedia | None - if ( - self.sync_clients - and (_leader := self.sync_clients[0]) - and (_leader.corrected_elapsed_time or 0) > 2 - and (metadata := _leader.current_media) is not None - ): - now = time.time() - metadata_checksum = f"{metadata.uri}.{metadata.title}.{metadata.image_url}" - progress = int(metadata.corrected_elapsed_time or 0) - if _last_metadata != metadata_checksum: - _last_metadata = metadata_checksum - prev_progress_report = now - self.mass.create_task(self._send_metadata(progress, metadata)) - # send the progress report every 5 seconds - elif now - prev_progress_report >= 5: - prev_progress_report = now - self.mass.create_task(self._send_metadata(progress, None)) - # Entire stream consumed: send EOF - generator_exhausted = True - async with self._lock: - await asyncio.gather( - *[ - self._write_eof_to_player(x) - for x in self.sync_clients - if x.stream and x.stream.running - ], - return_exceptions=True, - ) - except Exception as err: - logger = self.prov.logger - logger.error( - "Stream error: %s", - str(err) or err.__class__.__name__, - exc_info=err if logger.isEnabledFor(logging.DEBUG) else None, + players_to_remove.append(player) + elif isinstance(result, Exception): + self.prov.logger.error( + ("Error writing chunk to player %s: %s - REMOVING from sync group!"), + player.player_id, + result, + ) + players_to_remove.append(player) + + # Remove failed/timed-out players from sync group + for player in players_to_remove: + if player in self.sync_clients: + self.sync_clients.remove(player) + self.prov.logger.warning( + "Player %s removed from sync group due to write failure/timeout", + player.player_id, + ) + # Stop the player's stream + if player.stream: + self.mass.create_task(player.stream.stop()) + + # Update chunk counter (each chunk is exactly one second of audio) + chunk_seconds = len(chunk) / pcm_sample_size + self.seconds_streamed += chunk_seconds + + # send metadata if changed + # do this in a separate task to not disturb audio streaming + # NOTE: we should probably move this out of the audio stream task into it's own task + metadata: PlayerMedia | None + if ( + self.sync_clients + and (_leader := self.sync_clients[0]) + and (_leader.corrected_elapsed_time or 0) > 2 + and (metadata := _leader.current_media) is not None + ): + now = time.time() + metadata_checksum = f"{metadata.uri}.{metadata.title}.{metadata.image_url}" + progress = int(metadata.corrected_elapsed_time or 0) + if _last_metadata != metadata_checksum: + _last_metadata = metadata_checksum + prev_progress_report = now + self.mass.create_task(self._send_metadata(progress, metadata)) + # send the progress report every 5 seconds + elif now - prev_progress_report >= 5: + prev_progress_report = now + self.mass.create_task(self._send_metadata(progress, None)) + # Entire stream consumed: send EOF + async with self._lock: + await asyncio.gather( + *[ + self._write_eof_to_player(x) + for x in self.sync_clients + if x.stream and x.stream.running + ], + return_exceptions=True, ) - raise - finally: - if not generator_exhausted: - await close_async_generator(self._audio_source) async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None: """ @@ -431,7 +408,7 @@ class AirPlayStreamSession: output_format=airplay_player.stream.pcm_format, filter_params=filter_params, audio_output=airplay_player.stream.audio_pipe.path, - extra_input_args=["-y"], + extra_input_args=["-y", "-readrate", "1.0", "-readrate_initial_burst", "1.2"], ) await ffmpeg.start() self._player_ffmpeg[airplay_player.player_id] = ffmpeg