Some optimizations to Airplay buffering (#1208)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 7 Apr 2024 21:48:42 +0000 (23:48 +0200)
committerGitHub <noreply@github.com>
Sun, 7 Apr 2024 21:48:42 +0000 (23:48 +0200)
music_assistant/server/helpers/process.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64
music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64
music_assistant/server/providers/airplay/bin/cliraop-macos-arm64

index 9dc88511acc52558355df5e34adae5a2aef607f5..ca6e7139b4532a553069e046dbc5b312dce59de6 100644 (file)
@@ -23,7 +23,8 @@ from music_assistant.constants import MASS_LOGGER_NAME
 
 LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.helpers.process")
 
-DEFAULT_CHUNKSIZE = 128000
+
+DEFAULT_CHUNKSIZE = 64000
 
 # pylint: disable=invalid-name
 
@@ -42,7 +43,7 @@ class AsyncProcess:
         args: list[str],
         stdin: bool | int | AsyncGenerator[bytes, None] | None = None,
         stdout: bool | int | None = None,
-        stderr: bool | int | None = False,
+        stderr: bool | int | None = None,
         name: str | None = None,
     ) -> None:
         """Initialize AsyncProcess."""
@@ -53,12 +54,9 @@ class AsyncProcess:
         self.attached_tasks: list[asyncio.Task] = []
         self.logger = LOGGER.getChild(name)
         self._args = args
-        self._stdin = stdin
-        self._stdout = stdout
-        self._stderr = stderr
-        self._stdin_enabled = stdin not in (None, False)
-        self._stdout_enabled = stdout not in (None, False)
-        self._stderr_enabled = stderr not in (None, False)
+        self._stdin = None if stdin is False else stdin
+        self._stdout = None if stdout is False else stdout
+        self._stderr = asyncio.subprocess.DEVNULL if stderr is False else stderr
         self._close_called = False
         self._returncode: bool | None = None
 
@@ -103,6 +101,10 @@ class AsyncProcess:
             else self._stdin,
             stdout=asyncio.subprocess.PIPE if self._stdout is True else self._stdout,
             stderr=asyncio.subprocess.PIPE if self._stderr is True else self._stderr,
+            # because we're exchanging big amounts of (audio) data with pipes
+            # it makes sense to extend the pipe size and (buffer) limits a bit
+            limit=1000000,
+            pipesize=1000000,
         )
         self.logger.debug("Process %s started with PID %s", self.name, self.proc.pid)
         if isinstance(self._stdin, AsyncGenerator):
@@ -172,12 +174,16 @@ class AsyncProcess:
         for task in self.attached_tasks:
             if not task.done():
                 task.cancel()
-                with suppress(asyncio.CancelledError):
-                    await task
         if send_signal and self.returncode is None:
             self.proc.send_signal(SIGINT)
-            # allow the process a bit of time to respond to the signal before we go nuclear
-            await asyncio.sleep(0.5)
+
+        # abort existing readers on stderr/stdout first before we send communicate
+        if self.proc.stdout and self.proc.stdout._waiter is not None:
+            self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
+            self.proc.stdout._waiter = None
+        if self.proc.stderr and self.proc.stderr._waiter is not None:
+            self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
+            self.proc.stderr._waiter = None
 
         # make sure the process is really cleaned up.
         # especially with pipes this can cause deadlocks if not properly guarded
@@ -185,13 +191,6 @@ class AsyncProcess:
         while True:
             try:
                 async with asyncio.timeout(5):
-                    # abort existing readers on stderr/stdout first before we send communicate
-                    if self.proc.stdout and self.proc.stdout._waiter is not None:
-                        self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
-                        self.proc.stdout._waiter = None
-                    if self.proc.stderr and self.proc.stderr._waiter is not None:
-                        self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
-                        self.proc.stderr._waiter = None
                     # use communicate to flush all pipe buffers
                     await self.proc.communicate()
                     if self.returncode is not None:
@@ -213,9 +212,8 @@ class AsyncProcess:
 
     async def wait(self) -> int:
         """Wait for the process and return the returncode."""
-        if self.returncode is not None:
-            return self.returncode
-        self._returncode = await self.proc.wait()
+        if self._returncode is None:
+            self._returncode = await self.proc.wait()
         return self._returncode
 
     async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
index 3cc9acec907a8c466e2273efaaeb70c5fa76eb9e..a5ea4e8ee1b2a0423935f296f1af29501b00b4a8 100644 (file)
@@ -3,13 +3,11 @@
 from __future__ import annotations
 
 import asyncio
-import fcntl
 import logging
 import os
 import platform
 import socket
 import time
-from collections.abc import AsyncGenerator
 from contextlib import suppress
 from dataclasses import dataclass
 from random import randint, randrange
@@ -19,7 +17,7 @@ from zeroconf import IPVersion, ServiceStateChange
 from zeroconf.asyncio import AsyncServiceInfo
 
 from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE,
     CONF_ENTRY_CROSSFADE_DURATION,
@@ -197,17 +195,16 @@ class AirplayStream:
         # always generate a new active remote id to prevent race conditions
         # with the named pipe used to send audio
         self.active_remote_id: str = str(randint(1000, 8000))
-        self.start_ntp: int | None = None  # use as checksum
         self.prevent_playback: bool = False
         self.running = True
+        # audio_source_task will only exist for the main player in a sync group
+        self.audio_source_task: asyncio.Task | None = None
         self._log_reader_task: asyncio.Task | None = None
-        self._audio_reader_task: asyncio.Task | None = None
         self._cliraop_proc: AsyncProcess | None = None
         self._ffmpeg_proc: AsyncProcess | None = None
 
-    async def start(self, start_ntp: int) -> None:
+    async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
         """Initialize CLIRaop process for a player."""
-        self.start_ntp = start_ntp
         extra_args = []
         player_id = self.airplay_player.player_id
         mass_player = self.mass.players.get(player_id)
@@ -235,7 +232,7 @@ class AirplayStream:
             "-port",
             str(self.airplay_player.discovery_info.port),
             "-wait",
-            str(2500 - sync_adjust),
+            str(wait_start - sync_adjust),
             "-volume",
             str(mass_player.volume_level),
             *extra_args,
@@ -251,26 +248,14 @@ class AirplayStream:
         if platform.system() == "Darwin":
             os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
 
-        # launch ffmpeg, feeding (player specific) audio chunks on stdout
-        # one could argue that the intermediate ffmpeg towards cliraop is not needed
-        # when there are no player specific filters or extras but in this case
-        # ffmpeg serves as a small buffer towards the realtime cliraop streamer
-
-        # create pipes to interconnect ffmpeg with cliraop
-        def create_pipes() -> tuple[int, int]:
-            read, write = os.pipe()
-            if platform.system() == "Linux":
-                # extend the pipe buffer a bit for smoother playback
-                fcntl.fcntl(read, 1031, 1000000)
-                fcntl.fcntl(write, 1031, 1000000)
-            return (read, write)
-
-        read, write = await asyncio.to_thread(create_pipes)
-
+        # ffmpeg handles the player specific stream + filters and pipes
+        # audio to the cliraop process
+        read, write = await asyncio.to_thread(os.pipe)
         ffmpeg_args = get_ffmpeg_args(
             input_format=self.input_format,
             output_format=AIRPLAY_PCM_FORMAT,
             filter_params=get_player_filter_params(self.mass, player_id),
+            loglevel="fatal",
         )
         self._ffmpeg_proc = AsyncProcess(
             ffmpeg_args,
@@ -279,41 +264,35 @@ class AirplayStream:
             name="cliraop_ffmpeg",
         )
         await self._ffmpeg_proc.start()
-        os.close(write)
+        await asyncio.to_thread(os.close, write)
 
         self._cliraop_proc = AsyncProcess(
             cliraop_args, stdin=read, stdout=False, stderr=True, name="cliraop"
         )
         await self._cliraop_proc.start()
-        os.close(read)
+        await asyncio.to_thread(os.close, read)
         self._log_reader_task = asyncio.create_task(self._log_watcher())
 
-    async def stop(self, wait: bool = True):
+    async def stop(self):
         """Stop playback and cleanup."""
         self.running = False
+        if self.audio_source_task and not self.audio_source_task.done():
+            self.audio_source_task.cancel()
+        if not self._cliraop_proc.closed:
+            await self.send_cli_command("ACTION=STOP")
+        await self._cliraop_proc.wait()
 
-        async def _stop() -> None:
-            # ffmpeg MUST be stopped before cliraop due to the chained pipes
-            if not self._ffmpeg_proc.closed:
-                await self._ffmpeg_proc.close(True)
-            # allow the cliraop process to stop gracefully first
-            if not self._cliraop_proc.closed:
-                await self.send_cli_command("ACTION=STOP")
-                with suppress(TimeoutError):
-                    await asyncio.wait_for(self._cliraop_proc.wait(), 5)
-            # send regular close anyway (which also logs the returncode)
-            await self._cliraop_proc.close(True)
-
-        task = self.mass.create_task(_stop())
-        if wait:
-            await task
+        # ffmpeg can sometimes hang due to the connected pipes
+        # we handle closing it but it can be a bit slow so do that in the background
+        if not self._ffmpeg_proc.closed:
+            self.mass.create_task(self._ffmpeg_proc.close(True))
 
     async def write_chunk(self, chunk: bytes) -> None:
-        """Write a (pcm) audio chunk to ffmpeg."""
+        """Write a (pcm) audio chunk."""
         await self._ffmpeg_proc.write(chunk)
 
     async def write_eof(self) -> None:
-        """Write EOF to the ffmpeg stdin."""
+        """Write EOF."""
         await self._ffmpeg_proc.write_eof()
 
     async def send_cli_command(self, command: str) -> None:
@@ -321,7 +300,6 @@ class AirplayStream:
         if not self._cliraop_proc or self._cliraop_proc.closed:
             return
 
-        named_pipe = f"/tmp/fifo-{self.active_remote_id}"  # noqa: S108
         if not command.endswith("\n"):
             command += "\n"
 
@@ -329,6 +307,7 @@ class AirplayStream:
             with suppress(BrokenPipeError), open(named_pipe, "w") as f:
                 f.write(command)
 
+        named_pipe = f"/tmp/raop-{self.active_remote_id}"  # noqa: S108
         self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
         await self.mass.create_task(send_data)
 
@@ -383,9 +362,9 @@ class AirplayStream:
             if "lost packet out of backlog" in line:
                 lost_packets += 1
                 if lost_packets == 100:
-                    logger.error("High packet loss detected, stopping playback...")
-                    await self.stop(False)
-                elif lost_packets % 10 == 0:
+                    logger.error("High packet loss detected, restarting playback...")
+                    self.mass.create_task(self.mass.player_queues.resume(queue.queue_id))
+                else:
                     logger.warning("Packet loss detected!")
 
             logger.log(VERBOSE_LOG_LEVEL, line)
@@ -395,9 +374,8 @@ class AirplayStream:
         if airplay_player.active_stream == self:
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
-        # ensure we're cleaned up afterwards
-        if self._ffmpeg_proc.returncode is None or self._cliraop_proc.returncode is None:
-            await self.stop()
+        # ensure we're cleaned up afterwards (this also logs the returncode)
+        await self.stop()
 
     async def _send_metadata(self, queue: PlayerQueue) -> None:
         """Send metadata to player (and connected sync childs)."""
@@ -605,7 +583,7 @@ class AirplayProvider(PlayerProvider):
         async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
                 if airplay_player.active_stream and airplay_player.active_stream.running:
-                    tg.create_task(airplay_player.active_stream.stop(wait=False))
+                    tg.create_task(airplay_player.active_stream.stop())
         # select audio source
         if media.media_type == MediaType.ANNOUNCEMENT:
             # special case: stream announcement
@@ -640,48 +618,62 @@ class AirplayProvider(PlayerProvider):
                 input_format=AudioFormat(ContentType.try_parse(media.uri)),
                 output_format=AIRPLAY_PCM_FORMAT,
             )
-        self.mass.create_task(self._handle_stream_audio, player_id, audio_source, input_format)
 
-    async def _handle_stream_audio(
-        self, player_id: str, audio_source: AsyncGenerator[bytes, None], input_format: AudioFormat
-    ) -> None:
-        """Handle streaming of audio to one or more airplay players."""
         # Python is not suitable for realtime audio streaming so we do the actual streaming
         # of (RAOP) audio using a small executable written in C based on libraop to do the actual
         # timestamped playback, which reads pcm audio from stdin
         # and we can send some interactive commands using a named pipe.
 
-        # get current ntp before we start
-        _, stdout = await check_output(f"{self.cliraop_bin} -ntp")
-        start_ntp = int(stdout.strip())
-
-        # setup Raop process for player and its sync childs
-        for airplay_player in self._get_sync_clients(player_id):
+        # setup AirplayStream for player and its sync childs
+        sync_clients = self._get_sync_clients(player_id)
+        for airplay_player in sync_clients:
             airplay_player.active_stream = AirplayStream(
                 self, airplay_player, input_format=input_format
             )
-            self.mass.create_task(airplay_player.active_stream.start(start_ntp))
-
-        async for chunk in audio_source:
-            active_clients = 0
-            async with asyncio.TaskGroup() as tg:
-                for airplay_player in self._get_sync_clients(player_id):
-                    if not airplay_player.active_stream or not airplay_player.active_stream.running:
-                        # player stopped or switched to a new stream
-                        continue
-                    if airplay_player.active_stream.start_ntp != start_ntp:
-                        # checksum mismatch
-                        continue
-                    tg.create_task(airplay_player.active_stream.write_chunk(chunk))
-                    active_clients += 1
-
-            if active_clients == 0:
-                # no more clients
-                return
-        # entire stream consumed: send EOF (empty chunk)
+
+        # use a buffer here to consume small hiccups as the
+        # raop streaming is pretty much realtime and without a buffer to stdin
+        buffer: asyncio.Queue[bytes] = asyncio.Queue(10)
+
+        async def fill_buffer() -> None:
+            async for chunk in audio_source:
+                await buffer.put(chunk)
+            await buffer.put(b"EOF")
+
+        fill_buffer_task = asyncio.create_task(fill_buffer())
+
+        async def audio_streamer() -> None:
+            try:
+                while True:
+                    chunk = await buffer.get()
+                    if chunk == b"EOF":
+                        break
+                    async with asyncio.TaskGroup() as tg:
+                        for airplay_player in sync_clients:
+                            tg.create_task(airplay_player.active_stream.write_chunk(chunk))
+
+                # entire stream consumed: send EOF
+                for airplay_player in sync_clients:
+                    self.mass.create_task(airplay_player.active_stream.write_eof())
+            finally:
+                if not fill_buffer_task.done():
+                    fill_buffer_task.cancel()
+                    # make sure the stdin generator is also properly closed
+                    # by propagating a cancellederror within
+                    task = asyncio.create_task(audio_source.__anext__())
+                    task.cancel()
+                empty_queue(buffer)
+
+        # get current ntp and start cliraop
+        _, stdout = await check_output(f"{self.cliraop_bin} -ntp")
+        start_ntp = int(stdout.strip())
+        wait_start = 1000 + (500 * len(sync_clients))
         async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
-                tg.create_task(airplay_player.active_stream.write_eof())
+                tg.create_task(airplay_player.active_stream.start(start_ntp, wait_start))
+        self._players[player_id].active_stream.audio_source_task = asyncio.create_task(
+            audio_streamer()
+        )
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
@@ -971,14 +963,14 @@ class AirplayProvider(PlayerProvider):
         count = 0
         if not (airplay_player := self._players.get(player_id)):
             return
-        prev_ntp = airplay_player.active_stream.start_ntp
+        prev_active_remote_id = airplay_player.active_stream.active_remote_id
         while count < 40:
             count += 1
             if not (airplay_player := self._players.get(player_id)):
                 return
             if not (active_stream := airplay_player.active_stream):
                 return
-            if active_stream.start_ntp != prev_ntp:
+            if active_stream.active_remote_id != prev_active_remote_id:
                 # checksum
                 return
             if not active_stream.prevent_playback:
index 2e0a8e326566f53a3c36f345f2dd50135c24f218..5de82f58543c029cf3e5061def34ca53e4244485 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ
index ed7ddfbc1eb964bd81ab4702aa31bd8dc561a0cb..dada71fa0ddf05850fe570d0a112a9906cfb28c0 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ
index fbd633a0bf6f5f51d010561a1588819ddeb2cfe1..2bd3bfb4555ee12af3ef9290667e3873506adad1 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ