From: Marcel van der Veldt Date: Wed, 12 Mar 2025 21:37:41 +0000 (+0100) Subject: Some small improvements to the Airplay provider (#2032) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=e1f7bbed4c64aec08166ea7be82c5bf09c04c0bf;p=music-assistant-server.git Some small improvements to the Airplay provider (#2032) --- diff --git a/music_assistant/providers/airplay/provider.py b/music_assistant/providers/airplay/provider.py index 0d405953..1c7395ae 100644 --- a/music_assistant/providers/airplay/provider.py +++ b/music_assistant/providers/airplay/provider.py @@ -309,9 +309,6 @@ class AirplayProvider(PlayerProvider): # this accounts for syncgroups and linked players (e.g. sonos) player.active_source = media.queue_id player.current_media = media - # always stop existing stream first - if airplay_player.raop_stream and airplay_player.raop_stream.running: - await airplay_player.cmd_stop() # select audio source if media.media_type == MediaType.ANNOUNCEMENT: @@ -370,6 +367,12 @@ class AirplayProvider(PlayerProvider): input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)), output_format=AIRPLAY_PCM_FORMAT, ) + + # if an existing stream session is running, replace it with the new stream + if airplay_player.raop_stream and airplay_player.raop_stream.running: + await airplay_player.raop_stream.session.replace_stream(audio_source) + return + # setup RaopStreamSession for player (and its sync childs if any) sync_clients = self._get_sync_clients(player_id) raop_stream_session = RaopStreamSession(self, sync_clients, input_format, audio_source) @@ -423,14 +426,17 @@ class AirplayProvider(PlayerProvider): parent_player.group_childs.append(parent_player.player_id) parent_player.group_childs.append(child_player.player_id) child_player.synced_to = parent_player.player_id + # check if we should (re)start or join a stream session active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id) if active_queue.state == PlayerState.PLAYING: # playback needs to be restarted to form a new multi client stream session + # TODO: allow late joining to existing stream + await self.mass.player_queues.stop(active_queue.queue_id) # this could potentially be called by multiple players at the exact same time # so we debounce the resync a bit here with a timer self.mass.call_later( - 1, + 0.5, self.mass.player_queues.resume, active_queue.queue_id, fade_in=False, diff --git a/music_assistant/providers/airplay/raop.py b/music_assistant/providers/airplay/raop.py index a0183eb6..62cae6f4 100644 --- a/music_assistant/providers/airplay/raop.py +++ b/music_assistant/providers/airplay/raop.py @@ -16,7 +16,7 @@ from music_assistant_models.enums import PlayerState from music_assistant_models.errors import PlayerCommandFailed from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL -from music_assistant.helpers.audio import get_player_filter_params +from music_assistant.helpers.audio import get_chunksize, get_player_filter_params from music_assistant.helpers.ffmpeg import FFMpeg from music_assistant.helpers.process import AsyncProcess, check_output from music_assistant.helpers.util import TaskManager, close_async_generator @@ -62,48 +62,6 @@ class RaopStreamSession: """Initialize RaopStreamSession.""" # initialize raop stream for all players - async def audio_streamer() -> None: - """Stream audio to all players.""" - generator_exhausted = False - try: - async for chunk in self._audio_source: - async with self._lock: - sync_clients = [ - x for x in self._sync_clients if x.raop_stream and x.raop_stream.running - ] - if not sync_clients: - return - await asyncio.gather( - *[ - x.raop_stream.write_chunk(chunk) - for x in sync_clients - if x.raop_stream - ], - return_exceptions=True, - ) - # entire stream consumed: send EOF - generator_exhausted = True - async with self._lock: - await asyncio.gather( - *[ - x.raop_stream.write_eof() - for x in self._sync_clients - if x.raop_stream and x.raop_stream.running - ], - return_exceptions=True, - ) - except Exception as err: - logger = self.prov.logger - logger.error( - "Stream error: %s", - str(err) or err.__class__.__name__, - exc_info=err if logger.isEnabledFor(logging.DEBUG) else None, - ) - raise - finally: - if not generator_exhausted: - await close_async_generator(self._audio_source) - # get current ntp and start RaopStream per player assert self.prov.cliraop_bin _, stdout = await check_output(self.prov.cliraop_bin, "-ntp") @@ -121,7 +79,7 @@ class RaopStreamSession: async with TaskManager(self.mass) as tm: for _raop_player in self._sync_clients: tm.create_task(_start_client(_raop_player)) - self._audio_source_task = asyncio.create_task(audio_streamer()) + self._audio_source_task = asyncio.create_task(self._audio_streamer()) async def stop(self) -> None: """Stop playback and cleanup.""" @@ -140,7 +98,8 @@ class RaopStreamSession: return assert airplay_player.raop_stream assert airplay_player.raop_stream.session == self - self._sync_clients.remove(airplay_player) + async with self._lock: + self._sync_clients.remove(airplay_player) await airplay_player.raop_stream.stop() airplay_player.raop_stream = None @@ -150,6 +109,67 @@ class RaopStreamSession: # e.g. by counting the number of frames sent etc. raise NotImplementedError("Adding clients to a session is not yet supported") + async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None: + """Replace the audio source of the stream.""" + # cancel the per-player ffmpeg reader tasks + for _raop_player in self._sync_clients: + assert _raop_player.raop_stream # for type checker + assert _raop_player.raop_stream.ffmpeg_reader_task # for type checker + _raop_player.raop_stream.ffmpeg_reader_task.cancel() + with suppress(asyncio.CancelledError): + await _raop_player.raop_stream.ffmpeg_reader_task + # cancel the current audio source task + assert self._audio_source_task # for type checker + self._audio_source_task.cancel() + with suppress(asyncio.CancelledError): + await self._audio_source_task + # set new audio source and restart the stream + self._audio_source = audio_source + self._audio_source_task = asyncio.create_task(self._audio_streamer()) + for _raop_player in self._sync_clients: + assert _raop_player.raop_stream # for type checker + _raop_player.raop_stream.ffmpeg_reader_task = self.mass.create_task( + _raop_player.raop_stream.ffmpeg_reader() + ) + + async def _audio_streamer(self) -> None: + """Stream audio to all players.""" + generator_exhausted = False + try: + async for chunk in self._audio_source: + async with self._lock: + sync_clients = [ + x for x in self._sync_clients if x.raop_stream and x.raop_stream.running + ] + if not sync_clients: + return + await asyncio.gather( + *[x.raop_stream.write_chunk(chunk) for x in sync_clients if x.raop_stream], + return_exceptions=True, + ) + # entire stream consumed: send EOF + generator_exhausted = True + async with self._lock: + await asyncio.gather( + *[ + x.raop_stream.write_eof() + for x in self._sync_clients + if x.raop_stream and x.raop_stream.running + ], + return_exceptions=True, + ) + except Exception as err: + logger = self.prov.logger + logger.error( + "Stream error: %s", + str(err) or err.__class__.__name__, + exc_info=err if logger.isEnabledFor(logging.DEBUG) else None, + ) + raise + finally: + if not generator_exhausted: + await close_async_generator(self._audio_source) + class RaopStream: """ @@ -179,8 +199,11 @@ class RaopStream: self._stderr_reader_task: asyncio.Task[None] | None = None self._cliraop_proc: AsyncProcess | None = None self._ffmpeg_proc: AsyncProcess | None = None + self.ffmpeg_reader_task: asyncio.Task[None] | None = None self._started = asyncio.Event() self._stopped = False + self._total_bytes_sent = 0 + self._stream_bytes_sent = 0 @property def running(self) -> bool: @@ -226,21 +249,9 @@ class RaopStream: read_ahead = await self.mass.config.get_player_config_value( player_id, CONF_READ_AHEAD_BUFFER ) - # create os pipes to pipe ffmpeg to cliraop - read, write = await asyncio.to_thread(os.pipe) # ffmpeg handles the player specific stream + filters and pipes # audio to the cliraop process - self._ffmpeg_proc = FFMpeg( - audio_input="-", - input_format=self.session.input_format, - output_format=AIRPLAY_PCM_FORMAT, - filter_params=get_player_filter_params( - self.mass, player_id, self.session.input_format, AIRPLAY_PCM_FORMAT - ), - audio_output=write, - ) - await self._ffmpeg_proc.start() - await asyncio.to_thread(os.close, write) + self.ffmpeg_reader_task = self.mass.create_task(self.ffmpeg_reader()) # cliraop is the binary that handles the actual raop streaming to the player # this is a slightly modified bversion of philippe44's libraop @@ -270,11 +281,10 @@ class RaopStream: self.airplay_player.address, "-", ] - self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop") + self._cliraop_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop") if platform.system() == "Darwin": os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" await self._cliraop_proc.start() - await asyncio.to_thread(os.close, read) # read first 10 lines of stderr to get the initial status for _ in range(10): line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore") @@ -283,6 +293,7 @@ class RaopStream: self._started.set() break if "Cannot connect to AirPlay device" in line: + self.ffmpeg_reader_task.cancel() raise PlayerCommandFailed("Cannot connect to AirPlay device") # start reading the stderr of the cliraop process from another task self._stderr_reader_task = self.mass.create_task(self._stderr_reader()) @@ -298,12 +309,10 @@ class RaopStream: await self._cliraop_proc.wait_with_timeout(2) if self._stderr_reader_task and not self._stderr_reader_task.done(): self._stderr_reader_task.cancel() + if self.ffmpeg_reader_task and not self.ffmpeg_reader_task.done(): + self.ffmpeg_reader_task.cancel() if self._cliraop_proc.proc and not self._cliraop_proc.closed: await self._cliraop_proc.close(True) - if self._ffmpeg_proc and not self._ffmpeg_proc.closed: - await self._ffmpeg_proc.close(True) - self._cliraop_proc = None - self._ffmpeg_proc = None async def write_chunk(self, chunk: bytes) -> None: """Write a (pcm) audio chunk.""" @@ -339,7 +348,46 @@ class RaopStream: self.airplay_player.last_command_sent = time.time() await asyncio.to_thread(send_data) - async def _stderr_reader(self) -> None: # noqa: PLR0915 + async def ffmpeg_reader(self) -> None: + """Read audio from the audio source and pipe it to the CLIRaop process.""" + self._ffmpeg_proc = FFMpeg( + audio_input="-", + input_format=self.session.input_format, + output_format=AIRPLAY_PCM_FORMAT, + filter_params=get_player_filter_params( + self.mass, + self.airplay_player.player_id, + self.session.input_format, + AIRPLAY_PCM_FORMAT, + ), + ) + self._stream_bytes_sent = 0 + mass_player = self.mass.players.get(self.airplay_player.player_id) + assert mass_player # for type checker + try: + await self._ffmpeg_proc.start() + chunksize = get_chunksize(AIRPLAY_PCM_FORMAT) + # wait for cliraop to be ready + await asyncio.wait_for(self._started.wait(), 20) + async for chunk in self._ffmpeg_proc.iter_chunked(chunksize): + if self._stopped: + break + if not self._cliraop_proc or self._cliraop_proc.closed: + break + await self._cliraop_proc.write(chunk) + self._stream_bytes_sent += len(chunk) + self._total_bytes_sent += len(chunk) + del chunk + # we base elapsed time on the amount of bytes sent + # so we can account for reusing the same session for multiple streams + mass_player.elapsed_time = self._stream_bytes_sent / chunksize + mass_player.elapsed_time_last_updated = time.time() + if self._cliraop_proc and not self._cliraop_proc.closed: + await self._cliraop_proc.write_eof() + finally: + await self._ffmpeg_proc.close() + + async def _stderr_reader(self) -> None: """Monitor stderr for the running CLIRaop process.""" airplay_player = self.airplay_player mass_player = self.mass.players.get(airplay_player.player_id) @@ -355,14 +403,14 @@ class RaopStream: async for line in self._cliraop_proc.iter_stderr(): if "elapsed milliseconds:" in line: # this is received more or less every second while playing - millis = int(line.split("elapsed milliseconds: ")[1]) - mass_player.elapsed_time = millis / 1000 - mass_player.elapsed_time_last_updated = time.time() + # millis = int(line.split("elapsed milliseconds: ")[1]) + # mass_player.elapsed_time = (millis / 1000) - self.elapsed_time_correction + # mass_player.elapsed_time_last_updated = time.time() # send metadata to player(s) if needed # NOTE: this must all be done in separate tasks to not disturb audio now = time.time() if ( - mass_player.elapsed_time > 2 + (mass_player.elapsed_time or 0) > 2 and queue and queue.current_item and queue.current_item.streamdetails