From: Marcel van der Veldt Date: Sun, 7 Apr 2024 21:48:42 +0000 (+0200) Subject: Some optimizations to Airplay buffering (#1208) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=8fc075646031e3390f908dcd917c0b3e7a6a89fc;p=music-assistant-server.git Some optimizations to Airplay buffering (#1208) --- diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 9dc88511..ca6e7139 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -23,7 +23,8 @@ from music_assistant.constants import MASS_LOGGER_NAME LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process") -DEFAULT_CHUNKSIZE = 128000 + +DEFAULT_CHUNKSIZE = 64000 # pylint: disable=invalid-name @@ -42,7 +43,7 @@ class AsyncProcess: args: list[str], stdin: bool | int | AsyncGenerator[bytes, None] | None = None, stdout: bool | int | None = None, - stderr: bool | int | None = False, + stderr: bool | int | None = None, name: str | None = None, ) -> None: """Initialize AsyncProcess.""" @@ -53,12 +54,9 @@ class AsyncProcess: self.attached_tasks: list[asyncio.Task] = [] self.logger = LOGGER.getChild(name) self._args = args - self._stdin = stdin - self._stdout = stdout - self._stderr = stderr - self._stdin_enabled = stdin not in (None, False) - self._stdout_enabled = stdout not in (None, False) - self._stderr_enabled = stderr not in (None, False) + self._stdin = None if stdin is False else stdin + self._stdout = None if stdout is False else stdout + self._stderr = asyncio.subprocess.DEVNULL if stderr is False else stderr self._close_called = False self._returncode: bool | None = None @@ -103,6 +101,10 @@ class AsyncProcess: else self._stdin, stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout, stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr, + # because we're exchanging big amounts of (audio) data with pipes + # it makes sense to extend the pipe size and (buffer) limits a bit + limit=1000000, + pipesize=1000000, ) self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid) if isinstance(self._stdin, AsyncGenerator): @@ -172,12 +174,16 @@ class AsyncProcess: for task in self.attached_tasks: if not task.done(): task.cancel() - with suppress(asyncio.CancelledError): - await task if send_signal and self.returncode is None: self.proc.send_signal(SIGINT) - # allow the process a bit of time to respond to the signal before we go nuclear - await asyncio.sleep(0.5) + + # abort existing readers on stderr/stdout first before we send communicate + if self.proc.stdout and self.proc.stdout._waiter is not None: + self.proc.stdout._waiter.set_exception(asyncio.CancelledError()) + self.proc.stdout._waiter = None + if self.proc.stderr and self.proc.stderr._waiter is not None: + self.proc.stderr._waiter.set_exception(asyncio.CancelledError()) + self.proc.stderr._waiter = None # make sure the process is really cleaned up. # especially with pipes this can cause deadlocks if not properly guarded @@ -185,13 +191,6 @@ class AsyncProcess: while True: try: async with asyncio.timeout(5): - # abort existing readers on stderr/stdout first before we send communicate - if self.proc.stdout and self.proc.stdout._waiter is not None: - self.proc.stdout._waiter.set_exception(asyncio.CancelledError()) - self.proc.stdout._waiter = None - if self.proc.stderr and self.proc.stderr._waiter is not None: - self.proc.stderr._waiter.set_exception(asyncio.CancelledError()) - self.proc.stderr._waiter = None # use communicate to flush all pipe buffers await self.proc.communicate() if self.returncode is not None: @@ -213,9 +212,8 @@ class AsyncProcess: async def wait(self) -> int: """Wait for the process and return the returncode.""" - if self.returncode is not None: - return self.returncode - self._returncode = await self.proc.wait() + if self._returncode is None: + self._returncode = await self.proc.wait() return self._returncode async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]: diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 3cc9acec..a5ea4e8e 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -3,13 +3,11 @@ from __future__ import annotations import asyncio -import fcntl import logging import os import platform import socket import time -from collections.abc import AsyncGenerator from contextlib import suppress from dataclasses import dataclass from random import randint, randrange @@ -19,7 +17,7 @@ from zeroconf import IPVersion, ServiceStateChange from zeroconf.asyncio import AsyncServiceInfo from music_assistant.common.helpers.datetime import utc -from music_assistant.common.helpers.util import get_ip_pton, select_free_port +from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION, @@ -197,17 +195,16 @@ class AirplayStream: # always generate a new active remote id to prevent race conditions # with the named pipe used to send audio self.active_remote_id: str = str(randint(1000, 8000)) - self.start_ntp: int | None = None # use as checksum self.prevent_playback: bool = False self.running = True + # audio_source_task will only exist for the main player in a sync group + self.audio_source_task: asyncio.Task | None = None self._log_reader_task: asyncio.Task | None = None - self._audio_reader_task: asyncio.Task | None = None self._cliraop_proc: AsyncProcess | None = None self._ffmpeg_proc: AsyncProcess | None = None - async def start(self, start_ntp: int) -> None: + async def start(self, start_ntp: int, wait_start: int = 1000) -> None: """Initialize CLIRaop process for a player.""" - self.start_ntp = start_ntp extra_args = [] player_id = self.airplay_player.player_id mass_player = self.mass.players.get(player_id) @@ -235,7 +232,7 @@ class AirplayStream: "-port", str(self.airplay_player.discovery_info.port), "-wait", - str(2500 - sync_adjust), + str(wait_start - sync_adjust), "-volume", str(mass_player.volume_level), *extra_args, @@ -251,26 +248,14 @@ class AirplayStream: if platform.system() == "Darwin": os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" - # launch ffmpeg, feeding (player specific) audio chunks on stdout - # one could argue that the intermediate ffmpeg towards cliraop is not needed - # when there are no player specific filters or extras but in this case - # ffmpeg serves as a small buffer towards the realtime cliraop streamer - - # create pipes to interconnect ffmpeg with cliraop - def create_pipes() -> tuple[int, int]: - read, write = os.pipe() - if platform.system() == "Linux": - # extend the pipe buffer a bit for smoother playback - fcntl.fcntl(read, 1031, 1000000) - fcntl.fcntl(write, 1031, 1000000) - return (read, write) - - read, write = await asyncio.to_thread(create_pipes) - + # ffmpeg handles the player specific stream + filters and pipes + # audio to the cliraop process + read, write = await asyncio.to_thread(os.pipe) ffmpeg_args = get_ffmpeg_args( input_format=self.input_format, output_format=AIRPLAY_PCM_FORMAT, filter_params=get_player_filter_params(self.mass, player_id), + loglevel="fatal", ) self._ffmpeg_proc = AsyncProcess( ffmpeg_args, @@ -279,41 +264,35 @@ class AirplayStream: name="cliraop_ffmpeg", ) await self._ffmpeg_proc.start() - os.close(write) + await asyncio.to_thread(os.close, write) self._cliraop_proc = AsyncProcess( cliraop_args, stdin=read, stdout=False, stderr=True, name="cliraop" ) await self._cliraop_proc.start() - os.close(read) + await asyncio.to_thread(os.close, read) self._log_reader_task = asyncio.create_task(self._log_watcher()) - async def stop(self, wait: bool = True): + async def stop(self): """Stop playback and cleanup.""" self.running = False + if self.audio_source_task and not self.audio_source_task.done(): + self.audio_source_task.cancel() + if not self._cliraop_proc.closed: + await self.send_cli_command("ACTION=STOP") + await self._cliraop_proc.wait() - async def _stop() -> None: - # ffmpeg MUST be stopped before cliraop due to the chained pipes - if not self._ffmpeg_proc.closed: - await self._ffmpeg_proc.close(True) - # allow the cliraop process to stop gracefully first - if not self._cliraop_proc.closed: - await self.send_cli_command("ACTION=STOP") - with suppress(TimeoutError): - await asyncio.wait_for(self._cliraop_proc.wait(), 5) - # send regular close anyway (which also logs the returncode) - await self._cliraop_proc.close(True) - - task = self.mass.create_task(_stop()) - if wait: - await task + # ffmpeg can sometimes hang due to the connected pipes + # we handle closing it but it can be a bit slow so do that in the background + if not self._ffmpeg_proc.closed: + self.mass.create_task(self._ffmpeg_proc.close(True)) async def write_chunk(self, chunk: bytes) -> None: - """Write a (pcm) audio chunk to ffmpeg.""" + """Write a (pcm) audio chunk.""" await self._ffmpeg_proc.write(chunk) async def write_eof(self) -> None: - """Write EOF to the ffmpeg stdin.""" + """Write EOF.""" await self._ffmpeg_proc.write_eof() async def send_cli_command(self, command: str) -> None: @@ -321,7 +300,6 @@ class AirplayStream: if not self._cliraop_proc or self._cliraop_proc.closed: return - named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108 if not command.endswith("\n"): command += "\n" @@ -329,6 +307,7 @@ class AirplayStream: with suppress(BrokenPipeError), open(named_pipe, "w") as f: f.write(command) + named_pipe = f"/tmp/raop-{self.active_remote_id}" # noqa: S108 self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) await self.mass.create_task(send_data) @@ -383,9 +362,9 @@ class AirplayStream: if "lost packet out of backlog" in line: lost_packets += 1 if lost_packets == 100: - logger.error("High packet loss detected, stopping playback...") - await self.stop(False) - elif lost_packets % 10 == 0: + logger.error("High packet loss detected, restarting playback...") + self.mass.create_task(self.mass.player_queues.resume(queue.queue_id)) + else: logger.warning("Packet loss detected!") logger.log(VERBOSE_LOG_LEVEL, line) @@ -395,9 +374,8 @@ class AirplayStream: if airplay_player.active_stream == self: mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) - # ensure we're cleaned up afterwards - if self._ffmpeg_proc.returncode is None or self._cliraop_proc.returncode is None: - await self.stop() + # ensure we're cleaned up afterwards (this also logs the returncode) + await self.stop() async def _send_metadata(self, queue: PlayerQueue) -> None: """Send metadata to player (and connected sync childs).""" @@ -605,7 +583,7 @@ class AirplayProvider(PlayerProvider): async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): if airplay_player.active_stream and airplay_player.active_stream.running: - tg.create_task(airplay_player.active_stream.stop(wait=False)) + tg.create_task(airplay_player.active_stream.stop()) # select audio source if media.media_type == MediaType.ANNOUNCEMENT: # special case: stream announcement @@ -640,48 +618,62 @@ class AirplayProvider(PlayerProvider): input_format=AudioFormat(ContentType.try_parse(media.uri)), output_format=AIRPLAY_PCM_FORMAT, ) - self.mass.create_task(self._handle_stream_audio, player_id, audio_source, input_format) - async def _handle_stream_audio( - self, player_id: str, audio_source: AsyncGenerator[bytes, None], input_format: AudioFormat - ) -> None: - """Handle streaming of audio to one or more airplay players.""" # Python is not suitable for realtime audio streaming so we do the actual streaming # of (RAOP) audio using a small executable written in C based on libraop to do the actual # timestamped playback, which reads pcm audio from stdin # and we can send some interactive commands using a named pipe. - # get current ntp before we start - _, stdout = await check_output(f"{self.cliraop_bin} -ntp") - start_ntp = int(stdout.strip()) - - # setup Raop process for player and its sync childs - for airplay_player in self._get_sync_clients(player_id): + # setup AirplayStream for player and its sync childs + sync_clients = self._get_sync_clients(player_id) + for airplay_player in sync_clients: airplay_player.active_stream = AirplayStream( self, airplay_player, input_format=input_format ) - self.mass.create_task(airplay_player.active_stream.start(start_ntp)) - - async for chunk in audio_source: - active_clients = 0 - async with asyncio.TaskGroup() as tg: - for airplay_player in self._get_sync_clients(player_id): - if not airplay_player.active_stream or not airplay_player.active_stream.running: - # player stopped or switched to a new stream - continue - if airplay_player.active_stream.start_ntp != start_ntp: - # checksum mismatch - continue - tg.create_task(airplay_player.active_stream.write_chunk(chunk)) - active_clients += 1 - - if active_clients == 0: - # no more clients - return - # entire stream consumed: send EOF (empty chunk) + + # use a buffer here to consume small hiccups as the + # raop streaming is pretty much realtime and without a buffer to stdin + buffer: asyncio.Queue[bytes] = asyncio.Queue(10) + + async def fill_buffer() -> None: + async for chunk in audio_source: + await buffer.put(chunk) + await buffer.put(b"EOF") + + fill_buffer_task = asyncio.create_task(fill_buffer()) + + async def audio_streamer() -> None: + try: + while True: + chunk = await buffer.get() + if chunk == b"EOF": + break + async with asyncio.TaskGroup() as tg: + for airplay_player in sync_clients: + tg.create_task(airplay_player.active_stream.write_chunk(chunk)) + + # entire stream consumed: send EOF + for airplay_player in sync_clients: + self.mass.create_task(airplay_player.active_stream.write_eof()) + finally: + if not fill_buffer_task.done(): + fill_buffer_task.cancel() + # make sure the stdin generator is also properly closed + # by propagating a cancellederror within + task = asyncio.create_task(audio_source.__anext__()) + task.cancel() + empty_queue(buffer) + + # get current ntp and start cliraop + _, stdout = await check_output(f"{self.cliraop_bin} -ntp") + start_ntp = int(stdout.strip()) + wait_start = 1000 + (500 * len(sync_clients)) async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): - tg.create_task(airplay_player.active_stream.write_eof()) + tg.create_task(airplay_player.active_stream.start(start_ntp, wait_start)) + self._players[player_id].active_stream.audio_source_task = asyncio.create_task( + audio_streamer() + ) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. @@ -971,14 +963,14 @@ class AirplayProvider(PlayerProvider): count = 0 if not (airplay_player := self._players.get(player_id)): return - prev_ntp = airplay_player.active_stream.start_ntp + prev_active_remote_id = airplay_player.active_stream.active_remote_id while count < 40: count += 1 if not (airplay_player := self._players.get(player_id)): return if not (active_stream := airplay_player.active_stream): return - if active_stream.start_ntp != prev_ntp: + if active_stream.active_remote_id != prev_active_remote_id: # checksum return if not active_stream.prevent_playback: diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 index 2e0a8e32..5de82f58 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 index ed7ddfbc..dada71fa 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 index fbd633a0..2bd3bfb4 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ