Add small buffer in Airplay streaming logic (#1168)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 23 Mar 2024 23:17:52 +0000 (00:17 +0100)
committerGitHub <noreply@github.com>
Sat, 23 Mar 2024 23:17:52 +0000 (00:17 +0100)
music_assistant/server/controllers/streams.py
music_assistant/server/providers/airplay/__init__.py

index 9174f962252c155a996b012a0f01d1878ca3bbf5..1d701fb119176c8a7d3d6613686d8490d2d35cf4 100644 (file)
@@ -458,7 +458,6 @@ class StreamsController(CoreController):
                 return existing_job
             # cleanup existing job first
             if not existing_job.finished:
-                self.logger.warning("Detected existing (running) stream job for queue %s", queue_id)
                 existing_job.stop()
         self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob(
             self,
index b1ced2d82a1e4de65370174e9ca7f8873aa87430..7150ac6c41e89ac9eff71dc15c0f9bb514e5d738 100644 (file)
@@ -18,7 +18,7 @@ from zeroconf import IPVersion, ServiceStateChange
 from zeroconf.asyncio import AsyncServiceInfo
 
 from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import get_ip_pton, select_free_port
+from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE,
     CONF_ENTRY_CROSSFADE_DURATION,
@@ -199,6 +199,7 @@ class AirplayStream:
         self._cliraop_proc: AsyncProcess | None = None
         self._ffmpeg_proc: AsyncProcess | None = None
         self._stop_requested = False
+        self.buffer = asyncio.Queue(5)
 
     @property
     def running(self) -> bool:
@@ -267,14 +268,23 @@ class AirplayStream:
             output_format=AIRPLAY_PCM_FORMAT,
             filter_params=get_player_filter_params(self.mass, player_id),
         )
+
+        async def get_chunks() -> AsyncGenerator[bytes, None]:
+            while True:
+                chunk = await self.buffer.get()
+                if chunk == b"EOF":
+                    break
+                yield chunk
+
         # launch ffmpeg process with player specific settings
         # the stream_job_runner will start pushing pcm chunks to the stdin
-        # the ffmpeg process will send the output directly to the given path (e.g. tcp socket)
+        # the ffmpeg process will send the output directly to the stdin of cliraop
         self._ffmpeg_proc = AsyncProcess(
             ffmpeg_args,
             enable_stdin=True,
             enable_stdout=True,
             enable_stderr=False,
+            custom_stdin=get_chunks(),
             custom_stdout=write,
             name="cliraop_ffmpeg",
         )
@@ -285,25 +295,17 @@ class AirplayStream:
             enable_stdout=False,
             enable_stderr=True,
             custom_stdin=read,
+            name="cliraop",
         )
         await self._cliraop_proc.start()
         self._log_reader_task = asyncio.create_task(self._log_watcher())
 
-    async def write_chunk(self, chunk: bytes) -> None:
-        """Write a (pcm) audio chunk to the player."""
-        await self._ffmpeg_proc.write(chunk)
-
-    async def write_eof(self) -> None:
-        """Write EOF to the ffmpeg stdin."""
-        await self._ffmpeg_proc.write_eof()
-        await self._ffmpeg_proc.wait()
-        await self.stop()
-
     async def stop(self, wait: bool = True):
         """Stop playback and cleanup."""
         if self._cliraop_proc.closed and self._ffmpeg_proc.closed:
             return
         self._stop_requested = True
+        empty_queue(self.buffer)
 
         async def _stop() -> None:
             # ffmpeg MUST be stopped before cliraop due to the chained pipes
@@ -666,7 +668,7 @@ class AirplayProvider(PlayerProvider):
                     if airplay_player.active_stream.start_ntp != start_ntp:
                         # checksum mismatch
                         continue
-                    tg.create_task(airplay_player.active_stream.write_chunk(chunk))
+                    tg.create_task(airplay_player.active_stream.buffer.put(chunk))
                     active_clients += 1
             if active_clients == 0:
                 # no more clients
@@ -679,7 +681,7 @@ class AirplayProvider(PlayerProvider):
                     or airplay_player.active_stream.start_ntp != start_ntp
                 ):
                     continue
-                tg.create_task(airplay_player.active_stream.write_eof())
+                tg.create_task(airplay_player.active_stream.buffer.put(b"EOF"))
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.