Increase Airplay playback buffer (#1172)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 24 Mar 2024 16:01:47 +0000 (17:01 +0100)
committerGitHub <noreply@github.com>
Sun, 24 Mar 2024 16:01:47 +0000 (17:01 +0100)
* Some optimizations for realtime streaming to airplay

* implement buffer between ffmpeg and cliraop

* Update process.py

music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/process.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/snapcast/__init__.py

index 1d701fb119176c8a7d3d6613686d8490d2d35cf4..a1887daf9706b1f68203d207d415aca6afef7383 100644 (file)
@@ -828,7 +828,7 @@ class StreamsController(CoreController):
                 queue.queue_id, CONF_CROSSFADE_DURATION, 8
             )
             crossfade_size = int(pcm_sample_size * crossfade_duration)
-            buffer_size = int(pcm_sample_size * 2)  # 2 seconds
+            buffer_size = int(pcm_sample_size * 5)  # 5 seconds
             if use_crossfade:
                 buffer_size += crossfade_size
             bytes_written = 0
@@ -872,7 +872,7 @@ class StreamsController(CoreController):
                     buffer = b""
 
                 #### OTHER: enough data in buffer, feed to output
-                else:
+                while len(buffer) > buffer_size:
                     yield buffer[:pcm_sample_size]
                     bytes_written += pcm_sample_size
                     buffer = buffer[pcm_sample_size:]
@@ -1011,7 +1011,8 @@ class StreamsController(CoreController):
             filter_params=filter_params,
             extra_args=extra_args,
             input_path=input_path,
-            loglevel="info",  # needed for loudness measurement
+            # loglevel info is needed for loudness measurement
+            loglevel="info",
         )
 
         async def log_reader(ffmpeg_proc: AsyncProcess, state_data: dict[str, Any]):
@@ -1119,7 +1120,7 @@ class StreamsController(CoreController):
             # we did not receive any data, somethinh wet wrong
             # raise here to prevent an endless loop elsewhere
             if state_data["bytes_sent"] == 0:
-                raise AudioError("stream error on %s", streamdetails.uri)
+                raise AudioError(f"stream error on {streamdetails.uri}")
 
             # all chunks received, strip silence of last part if needed and yield remaining bytes
             if strip_silence_end and prev_chunk:
index 178635388f915bd9f7f1e67b94112a82494da511..18ff1fd0a73f70d9ae96f134ffaffa27bbe4d2de 100644 (file)
@@ -608,7 +608,7 @@ async def get_ffmpeg_stream(
     according to player preferences.
     """
     if loglevel is None:
-        loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet"
+        loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "fatal"
     use_stdin = not isinstance(audio_input, str)
     ffmpeg_args = get_ffmpeg_args(
         input_format=input_format,
@@ -662,7 +662,7 @@ async def get_preview_stream(
         "ffmpeg",
         "-hide_banner",
         "-loglevel",
-        "info",
+        "quiet",
         "-ignore_unknown",
     ]
     if streamdetails.direct:
@@ -798,14 +798,11 @@ def get_ffmpeg_args(
     extra_args: list[str] | None = None,
     input_path: str = "-",
     output_path: str = "-",
-    loglevel: str | None = None,
+    loglevel: str = "info",
 ) -> list[str]:
     """Collect all args to send to the ffmpeg process."""
-    if loglevel is None:
-        loglevel = "info" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "quiet"
     if extra_args is None:
         extra_args = []
-    extra_args += ["-bufsize", "32M"]
     ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
     if not ffmpeg_present:
         msg = (
index 61e528a74918155d2d8db63b5725228289b8c3a0..83d1b1cf0ec1caffc15d506946a4fffb19b5c05b 100644 (file)
@@ -100,10 +100,10 @@ class AsyncProcess:
             stdin=stdin if self._enable_stdin else None,
             stdout=stdout if self._enable_stdout else None,
             stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
-            # setting the buffer limit is important to prevent exceeding the limit
-            # when reading lines from stderr (e.g. with long running ffmpeg process)
-            # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
-            limit=1024 * 1024,
+            # setting the buffer limit somewhat high because we're working with large (PCM)
+            # audio chunks sent between (ffmpeg) processes. We'd rather consume a bit
+            # more memory than cpu cycles.
+            limit=1024000,
         )
         LOGGER.debug("Started %s with PID %s", self._name, self.proc.pid)
 
@@ -227,9 +227,13 @@ class AsyncProcess:
                         yield line
             except ValueError as err:
                 # we're waiting for a line (separator found), but the line was too big
+                # this may happen with ffmpeg during a long (radio) stream where progress
+                # gets outputted to the stderr but no newline
+                # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
                 # NOTE: this consumes the line that was too big
                 if "chunk exceed the limit" in str(err):
                     continue
+                # raise for all other (value) errors
                 raise
 
     async def _feed_stdin(self, custom_stdin: AsyncGenerator[bytes, None]) -> None:
index 7150ac6c41e89ac9eff71dc15c0f9bb514e5d738..d8448fca256bf43c76e9aff4e02b862ac79df9ca 100644 (file)
@@ -18,7 +18,7 @@ from zeroconf import IPVersion, ServiceStateChange
 from zeroconf.asyncio import AsyncServiceInfo
 
 from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import get_ip_pton, select_free_port
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE,
     CONF_ENTRY_CROSSFADE_DURATION,
@@ -61,6 +61,8 @@ CONF_ALAC_ENCODE = "alac_encode"
 CONF_VOLUME_START = "volume_start"
 CONF_PASSWORD = "password"
 
+REQUIRED_BUFFER = int(44100 * (16 / 8) * 2) * 10  # 10 seconds
+
 
 PLAYER_CONFIG_ENTRIES = (
     CONF_ENTRY_CROSSFADE,
@@ -199,7 +201,6 @@ class AirplayStream:
         self._cliraop_proc: AsyncProcess | None = None
         self._ffmpeg_proc: AsyncProcess | None = None
         self._stop_requested = False
-        self.buffer = asyncio.Queue(5)
 
     @property
     def running(self) -> bool:
@@ -256,9 +257,6 @@ class AirplayStream:
         if platform.system() == "Darwin":
             os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
 
-        # connect cliraop stdin with ffmpeg stdout using os pipes
-        read, write = os.pipe()
-
         # launch ffmpeg, feeding (player specific) audio chunks on stdout
         # one could argue that the intermediate ffmpeg towards cliraop is not needed
         # when there are no player specific filters or extras but in this case
@@ -267,25 +265,13 @@ class AirplayStream:
             input_format=self.input_format,
             output_format=AIRPLAY_PCM_FORMAT,
             filter_params=get_player_filter_params(self.mass, player_id),
+            loglevel="fatal",
         )
-
-        async def get_chunks() -> AsyncGenerator[bytes, None]:
-            while True:
-                chunk = await self.buffer.get()
-                if chunk == b"EOF":
-                    break
-                yield chunk
-
-        # launch ffmpeg process with player specific settings
-        # the stream_job_runner will start pushing pcm chunks to the stdin
-        # the ffmpeg process will send the output directly to the stdin of cliraop
         self._ffmpeg_proc = AsyncProcess(
             ffmpeg_args,
             enable_stdin=True,
             enable_stdout=True,
             enable_stderr=False,
-            custom_stdin=get_chunks(),
-            custom_stdout=write,
             name="cliraop_ffmpeg",
         )
         await self._ffmpeg_proc.start()
@@ -294,7 +280,7 @@ class AirplayStream:
             enable_stdin=True,
             enable_stdout=False,
             enable_stderr=True,
-            custom_stdin=read,
+            custom_stdin=self._audio_feeder(),
             name="cliraop",
         )
         await self._cliraop_proc.start()
@@ -305,7 +291,6 @@ class AirplayStream:
         if self._cliraop_proc.closed and self._ffmpeg_proc.closed:
             return
         self._stop_requested = True
-        empty_queue(self.buffer)
 
         async def _stop() -> None:
             # ffmpeg MUST be stopped before cliraop due to the chained pipes
@@ -321,6 +306,16 @@ class AirplayStream:
         if wait:
             await task
 
+    async def write_chunk(self, chunk: bytes) -> None:
+        """Write a (pcm) audio chunk to ffmpeg."""
+        await self._ffmpeg_proc.write(chunk)
+
+    async def write_eof(self) -> None:
+        """Write EOF to the ffmpeg stdin."""
+        await self._ffmpeg_proc.write_eof()
+        await self._ffmpeg_proc.wait()
+        await self.stop()
+
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
         if not self._cliraop_proc or self._cliraop_proc.closed:
@@ -386,8 +381,10 @@ class AirplayStream:
             if "lost packet out of backlog" in line:
                 lost_packets += 1
                 if lost_packets == 100:
-                    logger.warning("High packet loss detected, stopping playback...")
+                    logger.error("High packet loss detected, stopping playback...")
                     await self.stop(False)
+                elif lost_packets % 10 == 0:
+                    logger.warning("Packet loss detected!")
 
             logger.log(VERBOSE_LOG_LEVEL, line)
 
@@ -447,6 +444,24 @@ class AirplayStream:
         progress = int(queue.corrected_elapsed_time)
         await self.send_cli_command(f"PROGRESS={progress}\n")
 
+    async def _audio_feeder(self) -> AsyncGenerator[bytes, None]:
+        """Read chunks from ffmpeg and feed (buffered) to cliraop."""
+        buffer = b""
+        async for chunk in self._ffmpeg_proc.iter_any():
+            if self._stop_requested:
+                break
+            buffer += chunk
+            chunksize = len(chunk)
+            del chunk
+            while len(buffer) > REQUIRED_BUFFER:
+                yield buffer[:chunksize]
+                buffer = buffer[chunksize:]
+        # end of stream
+        if not self._stop_requested:
+            yield buffer
+            await self._cliraop_proc.write_eof()
+        del buffer
+
 
 @dataclass
 class AirPlayPlayer:
@@ -643,7 +658,7 @@ class AirplayProvider(PlayerProvider):
         """Handle streaming of audio to one or more airplay players."""
         # Python is not suitable for realtime audio streaming so we do the actual streaming
         # of (RAOP) audio using a small executable written in C based on libraop to do the actual
-        # timestamped playback, whicj reads pcm audio from stdin
+        # timestamped playback, which reads pcm audio from stdin
         # and we can send some interactive commands using a named pipe.
 
         # get current ntp before we start
@@ -668,8 +683,9 @@ class AirplayProvider(PlayerProvider):
                     if airplay_player.active_stream.start_ntp != start_ntp:
                         # checksum mismatch
                         continue
-                    tg.create_task(airplay_player.active_stream.buffer.put(chunk))
+                    tg.create_task(airplay_player.active_stream.write_chunk(chunk))
                     active_clients += 1
+
             if active_clients == 0:
                 # no more clients
                 return
@@ -681,7 +697,7 @@ class AirplayProvider(PlayerProvider):
                     or airplay_player.active_stream.start_ntp != start_ntp
                 ):
                     continue
-                tg.create_task(airplay_player.active_stream.buffer.put(b"EOF"))
+                tg.create_task(airplay_player.active_stream.write_eof())
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
index 5cdf4b880af4e9e26a7459e71ef6bc25a33adc80..0a903ca44c4a9d9b1cb9e4c93f2c0d160dd17d99 100644 (file)
@@ -360,6 +360,7 @@ class SnapCastProvider(PlayerProvider):
                 output_format=DEFAULT_SNAPCAST_FORMAT,
                 filter_params=get_player_filter_params(self.mass, player_id),
                 output_path=f"tcp://{host}:{port}",
+                loglevel="fatal",
             )
             try:
                 async with AsyncProcess(