From: Marcel van der Veldt Date: Sun, 24 Mar 2024 16:01:47 +0000 (+0100) Subject: Increase Airplay playback buffer (#1172) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=b69fcc00827270f821b98f8547eb9a85b905f8a2;p=music-assistant-server.git Increase Airplay playback buffer (#1172) * Some optimizations for realtime streaming to airplay * implement buffer between ffmpeg and cliraop * Update process.py --- diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 1d701fb1..a1887daf 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -828,7 +828,7 @@ class StreamsController(CoreController): queue.queue_id, CONF_CROSSFADE_DURATION, 8 ) crossfade_size = int(pcm_sample_size * crossfade_duration) - buffer_size = int(pcm_sample_size * 2) # 2 seconds + buffer_size = int(pcm_sample_size * 5) # 5 seconds if use_crossfade: buffer_size += crossfade_size bytes_written = 0 @@ -872,7 +872,7 @@ class StreamsController(CoreController): buffer = b"" #### OTHER: enough data in buffer, feed to output - else: + while len(buffer) > buffer_size: yield buffer[:pcm_sample_size] bytes_written += pcm_sample_size buffer = buffer[pcm_sample_size:] @@ -1011,7 +1011,8 @@ class StreamsController(CoreController): filter_params=filter_params, extra_args=extra_args, input_path=input_path, - loglevel="info", # needed for loudness measurement + # loglevel info is needed for loudness measurement + loglevel="info", ) async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]): @@ -1119,7 +1120,7 @@ class StreamsController(CoreController): # we did not receive any data, somethinh wet wrong # raise here to prevent an endless loop elsewhere if state_data["bytes_sent"] == 0: - raise AudioError("stream error on %s", streamdetails.uri) + raise AudioError(f"stream error on {streamdetails.uri}") # all chunks received, strip silence of last part if needed and yield remaining bytes if strip_silence_end and prev_chunk: diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index 17863538..18ff1fd0 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -608,7 +608,7 @@ async def get_ffmpeg_stream( according to player preferences. """ if loglevel is None: - loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet" + loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "fatal" use_stdin = not isinstance(audio_input, str) ffmpeg_args = get_ffmpeg_args( input_format=input_format, @@ -662,7 +662,7 @@ async def get_preview_stream( "ffmpeg", "-hide_banner", "-loglevel", - "info", + "quiet", "-ignore_unknown", ] if streamdetails.direct: @@ -798,14 +798,11 @@ def get_ffmpeg_args( extra_args: list[str] | None = None, input_path: str = "-", output_path: str = "-", - loglevel: str | None = None, + loglevel: str = "info", ) -> list[str]: """Collect all args to send to the ffmpeg process.""" - if loglevel is None: - loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet" if extra_args is None: extra_args = [] - extra_args += ["-bufsize", "32M"] ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support") if not ffmpeg_present: msg = ( diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 61e528a7..83d1b1cf 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -100,10 +100,10 @@ class AsyncProcess: stdin=stdin if self._enable_stdin else None, stdout=stdout if self._enable_stdout else None, stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, - # setting the buffer limit is important to prevent exceeding the limit - # when reading lines from stderr (e.g. with long running ffmpeg process) - # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit - limit=1024 * 1024, + # setting the buffer limit somewhat high because we're working with large (PCM) + # audio chunks sent between (ffmpeg) processes. We'd rather consume a bit + # more memory than cpu cycles. + limit=1024000, ) LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid) @@ -227,9 +227,13 @@ class AsyncProcess: yield line except ValueError as err: # we're waiting for a line (separator found), but the line was too big + # this may happen with ffmpeg during a long (radio) stream where progress + # gets outputted to the stderr but no newline + # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit # NOTE: this consumes the line that was too big if "chunk exceed the limit" in str(err): continue + # raise for all other (value) errors raise async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None: diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 7150ac6c..d8448fca 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -18,7 +18,7 @@ from zeroconf import IPVersion, ServiceStateChange from zeroconf.asyncio import AsyncServiceInfo from music_assistant.common.helpers.datetime import utc -from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port +from music_assistant.common.helpers.util import get_ip_pton, select_free_port from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION, @@ -61,6 +61,8 @@ CONF_ALAC_ENCODE = "alac_encode" CONF_VOLUME_START = "volume_start" CONF_PASSWORD = "password" +REQUIRED_BUFFER = int(44100 * (16 / 8) * 2) * 10 # 10 seconds + PLAYER_CONFIG_ENTRIES = ( CONF_ENTRY_CROSSFADE, @@ -199,7 +201,6 @@ class AirplayStream: self._cliraop_proc: AsyncProcess | None = None self._ffmpeg_proc: AsyncProcess | None = None self._stop_requested = False - self.buffer = asyncio.Queue(5) @property def running(self) -> bool: @@ -256,9 +257,6 @@ class AirplayStream: if platform.system() == "Darwin": os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" - # connect cliraop stdin with ffmpeg stdout using os pipes - read, write = os.pipe() - # launch ffmpeg, feeding (player specific) audio chunks on stdout # one could argue that the intermediate ffmpeg towards cliraop is not needed # when there are no player specific filters or extras but in this case @@ -267,25 +265,13 @@ class AirplayStream: input_format=self.input_format, output_format=AIRPLAY_PCM_FORMAT, filter_params=get_player_filter_params(self.mass, player_id), + loglevel="fatal", ) - - async def get_chunks() -> AsyncGenerator[bytes, None]: - while True: - chunk = await self.buffer.get() - if chunk == b"EOF": - break - yield chunk - - # launch ffmpeg process with player specific settings - # the stream_job_runner will start pushing pcm chunks to the stdin - # the ffmpeg process will send the output directly to the stdin of cliraop self._ffmpeg_proc = AsyncProcess( ffmpeg_args, enable_stdin=True, enable_stdout=True, enable_stderr=False, - custom_stdin=get_chunks(), - custom_stdout=write, name="cliraop_ffmpeg", ) await self._ffmpeg_proc.start() @@ -294,7 +280,7 @@ class AirplayStream: enable_stdin=True, enable_stdout=False, enable_stderr=True, - custom_stdin=read, + custom_stdin=self._audio_feeder(), name="cliraop", ) await self._cliraop_proc.start() @@ -305,7 +291,6 @@ class AirplayStream: if self._cliraop_proc.closed and self._ffmpeg_proc.closed: return self._stop_requested = True - empty_queue(self.buffer) async def _stop() -> None: # ffmpeg MUST be stopped before cliraop due to the chained pipes @@ -321,6 +306,16 @@ class AirplayStream: if wait: await task + async def write_chunk(self, chunk: bytes) -> None: + """Write a (pcm) audio chunk to ffmpeg.""" + await self._ffmpeg_proc.write(chunk) + + async def write_eof(self) -> None: + """Write EOF to the ffmpeg stdin.""" + await self._ffmpeg_proc.write_eof() + await self._ffmpeg_proc.wait() + await self.stop() + async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" if not self._cliraop_proc or self._cliraop_proc.closed: @@ -386,8 +381,10 @@ class AirplayStream: if "lost packet out of backlog" in line: lost_packets += 1 if lost_packets == 100: - logger.warning("High packet loss detected, stopping playback...") + logger.error("High packet loss detected, stopping playback...") await self.stop(False) + elif lost_packets % 10 == 0: + logger.warning("Packet loss detected!") logger.log(VERBOSE_LOG_LEVEL, line) @@ -447,6 +444,24 @@ class AirplayStream: progress = int(queue.corrected_elapsed_time) await self.send_cli_command(f"PROGRESS={progress}\n") + async def _audio_feeder(self) -> AsyncGenerator[bytes, None]: + """Read chunks from ffmpeg and feed (buffered) to cliraop.""" + buffer = b"" + async for chunk in self._ffmpeg_proc.iter_any(): + if self._stop_requested: + break + buffer += chunk + chunksize = len(chunk) + del chunk + while len(buffer) > REQUIRED_BUFFER: + yield buffer[:chunksize] + buffer = buffer[chunksize:] + # end of stream + if not self._stop_requested: + yield buffer + await self._cliraop_proc.write_eof() + del buffer + @dataclass class AirPlayPlayer: @@ -643,7 +658,7 @@ class AirplayProvider(PlayerProvider): """Handle streaming of audio to one or more airplay players.""" # Python is not suitable for realtime audio streaming so we do the actual streaming # of (RAOP) audio using a small executable written in C based on libraop to do the actual - # timestamped playback, whicj reads pcm audio from stdin + # timestamped playback, which reads pcm audio from stdin # and we can send some interactive commands using a named pipe. # get current ntp before we start @@ -668,8 +683,9 @@ class AirplayProvider(PlayerProvider): if airplay_player.active_stream.start_ntp != start_ntp: # checksum mismatch continue - tg.create_task(airplay_player.active_stream.buffer.put(chunk)) + tg.create_task(airplay_player.active_stream.write_chunk(chunk)) active_clients += 1 + if active_clients == 0: # no more clients return @@ -681,7 +697,7 @@ class AirplayProvider(PlayerProvider): or airplay_player.active_stream.start_ntp != start_ntp ): continue - tg.create_task(airplay_player.active_stream.buffer.put(b"EOF")) + tg.create_task(airplay_player.active_stream.write_eof()) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 5cdf4b88..0a903ca4 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -360,6 +360,7 @@ class SnapCastProvider(PlayerProvider): output_format=DEFAULT_SNAPCAST_FORMAT, filter_params=get_player_filter_params(self.mass, player_id), output_path=f"tcp://{host}:{port}", + loglevel="fatal", ) try: async with AsyncProcess(