From: Marcel van der Veldt Date: Sat, 23 Mar 2024 23:17:52 +0000 (+0100) Subject: Add small buffer in Airplay streaming logic (#1168) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=cc2c03c69dfc7b665724db2ba621d18ef590d142;p=music-assistant-server.git Add small buffer in Airplay streaming logic (#1168) --- diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index 9174f962..1d701fb1 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -458,7 +458,6 @@ class StreamsController(CoreController): return existing_job # cleanup existing job first if not existing_job.finished: - self.logger.warning("Detected existing (running) stream job for queue %s", queue_id) existing_job.stop() self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob( self, diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index b1ced2d8..7150ac6c 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -18,7 +18,7 @@ from zeroconf import IPVersion, ServiceStateChange from zeroconf.asyncio import AsyncServiceInfo from music_assistant.common.helpers.datetime import utc -from music_assistant.common.helpers.util import get_ip_pton, select_free_port +from music_assistant.common.helpers.util import empty_queue, get_ip_pton, select_free_port from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE, CONF_ENTRY_CROSSFADE_DURATION, @@ -199,6 +199,7 @@ class AirplayStream: self._cliraop_proc: AsyncProcess | None = None self._ffmpeg_proc: AsyncProcess | None = None self._stop_requested = False + self.buffer = asyncio.Queue(5) @property def running(self) -> bool: @@ -267,14 +268,23 @@ class AirplayStream: output_format=AIRPLAY_PCM_FORMAT, filter_params=get_player_filter_params(self.mass, player_id), ) + + async def get_chunks() -> AsyncGenerator[bytes, None]: + while True: + chunk = await self.buffer.get() + if chunk == b"EOF": + break + yield chunk + # launch ffmpeg process with player specific settings # the stream_job_runner will start pushing pcm chunks to the stdin - # the ffmpeg process will send the output directly to the given path (e.g. tcp socket) + # the ffmpeg process will send the output directly to the stdin of cliraop self._ffmpeg_proc = AsyncProcess( ffmpeg_args, enable_stdin=True, enable_stdout=True, enable_stderr=False, + custom_stdin=get_chunks(), custom_stdout=write, name="cliraop_ffmpeg", ) @@ -285,25 +295,17 @@ class AirplayStream: enable_stdout=False, enable_stderr=True, custom_stdin=read, + name="cliraop", ) await self._cliraop_proc.start() self._log_reader_task = asyncio.create_task(self._log_watcher()) - async def write_chunk(self, chunk: bytes) -> None: - """Write a (pcm) audio chunk to the player.""" - await self._ffmpeg_proc.write(chunk) - - async def write_eof(self) -> None: - """Write EOF to the ffmpeg stdin.""" - await self._ffmpeg_proc.write_eof() - await self._ffmpeg_proc.wait() - await self.stop() - async def stop(self, wait: bool = True): """Stop playback and cleanup.""" if self._cliraop_proc.closed and self._ffmpeg_proc.closed: return self._stop_requested = True + empty_queue(self.buffer) async def _stop() -> None: # ffmpeg MUST be stopped before cliraop due to the chained pipes @@ -666,7 +668,7 @@ class AirplayProvider(PlayerProvider): if airplay_player.active_stream.start_ntp != start_ntp: # checksum mismatch continue - tg.create_task(airplay_player.active_stream.write_chunk(chunk)) + tg.create_task(airplay_player.active_stream.buffer.put(chunk)) active_clients += 1 if active_clients == 0: # no more clients @@ -679,7 +681,7 @@ class AirplayProvider(PlayerProvider): or airplay_player.active_stream.start_ntp != start_ntp ): continue - tg.create_task(airplay_player.active_stream.write_eof()) + tg.create_task(airplay_player.active_stream.buffer.put(b"EOF")) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player.