Some small improvements to the Airplay provider (#2032)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 12 Mar 2025 21:37:41 +0000 (22:37 +0100)
committerGitHub <noreply@github.com>
Wed, 12 Mar 2025 21:37:41 +0000 (22:37 +0100)
music_assistant/providers/airplay/provider.py
music_assistant/providers/airplay/raop.py

index 0d405953ea59b80a64a76fef823755322d7a7e76..1c7395aee787c0131603df81a3f257618126cb48 100644 (file)
@@ -309,9 +309,6 @@ class AirplayProvider(PlayerProvider):
         # this accounts for syncgroups and linked players (e.g. sonos)
         player.active_source = media.queue_id
         player.current_media = media
-        # always stop existing stream first
-        if airplay_player.raop_stream and airplay_player.raop_stream.running:
-            await airplay_player.cmd_stop()
 
         # select audio source
         if media.media_type == MediaType.ANNOUNCEMENT:
@@ -370,6 +367,12 @@ class AirplayProvider(PlayerProvider):
                 input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
                 output_format=AIRPLAY_PCM_FORMAT,
             )
+
+        # if an existing stream session is running, replace it with the new stream
+        if airplay_player.raop_stream and airplay_player.raop_stream.running:
+            await airplay_player.raop_stream.session.replace_stream(audio_source)
+            return
+
         # setup RaopStreamSession for player (and its sync childs if any)
         sync_clients = self._get_sync_clients(player_id)
         raop_stream_session = RaopStreamSession(self, sync_clients, input_format, audio_source)
@@ -423,14 +426,17 @@ class AirplayProvider(PlayerProvider):
         parent_player.group_childs.append(parent_player.player_id)
         parent_player.group_childs.append(child_player.player_id)
         child_player.synced_to = parent_player.player_id
+
         # check if we should (re)start or join a stream session
         active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
         if active_queue.state == PlayerState.PLAYING:
             # playback needs to be restarted to form a new multi client stream session
+            # TODO: allow late joining to existing stream
+            await self.mass.player_queues.stop(active_queue.queue_id)
             # this could potentially be called by multiple players at the exact same time
             # so we debounce the resync a bit here with a timer
             self.mass.call_later(
-                1,
+                0.5,
                 self.mass.player_queues.resume,
                 active_queue.queue_id,
                 fade_in=False,
index a0183eb609a712eaa8defe8eafef9be47f33799f..62cae6f42afe165bb690965c5fcab73db2d88b12 100644 (file)
@@ -16,7 +16,7 @@ from music_assistant_models.enums import PlayerState
 from music_assistant_models.errors import PlayerCommandFailed
 
 from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.helpers.audio import get_chunksize, get_player_filter_params
 from music_assistant.helpers.ffmpeg import FFMpeg
 from music_assistant.helpers.process import AsyncProcess, check_output
 from music_assistant.helpers.util import TaskManager, close_async_generator
@@ -62,48 +62,6 @@ class RaopStreamSession:
         """Initialize RaopStreamSession."""
         # initialize raop stream for all players
 
-        async def audio_streamer() -> None:
-            """Stream audio to all players."""
-            generator_exhausted = False
-            try:
-                async for chunk in self._audio_source:
-                    async with self._lock:
-                        sync_clients = [
-                            x for x in self._sync_clients if x.raop_stream and x.raop_stream.running
-                        ]
-                        if not sync_clients:
-                            return
-                        await asyncio.gather(
-                            *[
-                                x.raop_stream.write_chunk(chunk)
-                                for x in sync_clients
-                                if x.raop_stream
-                            ],
-                            return_exceptions=True,
-                        )
-                # entire stream consumed: send EOF
-                generator_exhausted = True
-                async with self._lock:
-                    await asyncio.gather(
-                        *[
-                            x.raop_stream.write_eof()
-                            for x in self._sync_clients
-                            if x.raop_stream and x.raop_stream.running
-                        ],
-                        return_exceptions=True,
-                    )
-            except Exception as err:
-                logger = self.prov.logger
-                logger.error(
-                    "Stream error: %s",
-                    str(err) or err.__class__.__name__,
-                    exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
-                )
-                raise
-            finally:
-                if not generator_exhausted:
-                    await close_async_generator(self._audio_source)
-
         # get current ntp and start RaopStream per player
         assert self.prov.cliraop_bin
         _, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
@@ -121,7 +79,7 @@ class RaopStreamSession:
         async with TaskManager(self.mass) as tm:
             for _raop_player in self._sync_clients:
                 tm.create_task(_start_client(_raop_player))
-        self._audio_source_task = asyncio.create_task(audio_streamer())
+        self._audio_source_task = asyncio.create_task(self._audio_streamer())
 
     async def stop(self) -> None:
         """Stop playback and cleanup."""
@@ -140,7 +98,8 @@ class RaopStreamSession:
             return
         assert airplay_player.raop_stream
         assert airplay_player.raop_stream.session == self
-        self._sync_clients.remove(airplay_player)
+        async with self._lock:
+            self._sync_clients.remove(airplay_player)
         await airplay_player.raop_stream.stop()
         airplay_player.raop_stream = None
 
@@ -150,6 +109,67 @@ class RaopStreamSession:
         # e.g. by counting the number of frames sent etc.
         raise NotImplementedError("Adding clients to a session is not yet supported")
 
+    async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
+        """Replace the audio source of the stream."""
+        # cancel the per-player ffmpeg reader tasks
+        for _raop_player in self._sync_clients:
+            assert _raop_player.raop_stream  # for type checker
+            assert _raop_player.raop_stream.ffmpeg_reader_task  # for type checker
+            _raop_player.raop_stream.ffmpeg_reader_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await _raop_player.raop_stream.ffmpeg_reader_task
+        # cancel the current audio source task
+        assert self._audio_source_task  # for type checker
+        self._audio_source_task.cancel()
+        with suppress(asyncio.CancelledError):
+            await self._audio_source_task
+        # set new audio source and restart the stream
+        self._audio_source = audio_source
+        self._audio_source_task = asyncio.create_task(self._audio_streamer())
+        for _raop_player in self._sync_clients:
+            assert _raop_player.raop_stream  # for type checker
+            _raop_player.raop_stream.ffmpeg_reader_task = self.mass.create_task(
+                _raop_player.raop_stream.ffmpeg_reader()
+            )
+
+    async def _audio_streamer(self) -> None:
+        """Stream audio to all players."""
+        generator_exhausted = False
+        try:
+            async for chunk in self._audio_source:
+                async with self._lock:
+                    sync_clients = [
+                        x for x in self._sync_clients if x.raop_stream and x.raop_stream.running
+                    ]
+                    if not sync_clients:
+                        return
+                    await asyncio.gather(
+                        *[x.raop_stream.write_chunk(chunk) for x in sync_clients if x.raop_stream],
+                        return_exceptions=True,
+                    )
+            # entire stream consumed: send EOF
+            generator_exhausted = True
+            async with self._lock:
+                await asyncio.gather(
+                    *[
+                        x.raop_stream.write_eof()
+                        for x in self._sync_clients
+                        if x.raop_stream and x.raop_stream.running
+                    ],
+                    return_exceptions=True,
+                )
+        except Exception as err:
+            logger = self.prov.logger
+            logger.error(
+                "Stream error: %s",
+                str(err) or err.__class__.__name__,
+                exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
+            )
+            raise
+        finally:
+            if not generator_exhausted:
+                await close_async_generator(self._audio_source)
+
 
 class RaopStream:
     """
@@ -179,8 +199,11 @@ class RaopStream:
         self._stderr_reader_task: asyncio.Task[None] | None = None
         self._cliraop_proc: AsyncProcess | None = None
         self._ffmpeg_proc: AsyncProcess | None = None
+        self.ffmpeg_reader_task: asyncio.Task[None] | None = None
         self._started = asyncio.Event()
         self._stopped = False
+        self._total_bytes_sent = 0
+        self._stream_bytes_sent = 0
 
     @property
     def running(self) -> bool:
@@ -226,21 +249,9 @@ class RaopStream:
         read_ahead = await self.mass.config.get_player_config_value(
             player_id, CONF_READ_AHEAD_BUFFER
         )
-        # create os pipes to pipe ffmpeg to cliraop
-        read, write = await asyncio.to_thread(os.pipe)
         # ffmpeg handles the player specific stream + filters and pipes
         # audio to the cliraop process
-        self._ffmpeg_proc = FFMpeg(
-            audio_input="-",
-            input_format=self.session.input_format,
-            output_format=AIRPLAY_PCM_FORMAT,
-            filter_params=get_player_filter_params(
-                self.mass, player_id, self.session.input_format, AIRPLAY_PCM_FORMAT
-            ),
-            audio_output=write,
-        )
-        await self._ffmpeg_proc.start()
-        await asyncio.to_thread(os.close, write)
+        self.ffmpeg_reader_task = self.mass.create_task(self.ffmpeg_reader())
 
         # cliraop is the binary that handles the actual raop streaming to the player
         # this is a slightly modified bversion of philippe44's libraop
@@ -270,11 +281,10 @@ class RaopStream:
             self.airplay_player.address,
             "-",
         ]
-        self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop")
+        self._cliraop_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
         if platform.system() == "Darwin":
             os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
         await self._cliraop_proc.start()
-        await asyncio.to_thread(os.close, read)
         # read first 10 lines of stderr to get the initial status
         for _ in range(10):
             line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore")
@@ -283,6 +293,7 @@ class RaopStream:
                 self._started.set()
                 break
             if "Cannot connect to AirPlay device" in line:
+                self.ffmpeg_reader_task.cancel()
                 raise PlayerCommandFailed("Cannot connect to AirPlay device")
         # start reading the stderr of the cliraop process from another task
         self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
@@ -298,12 +309,10 @@ class RaopStream:
             await self._cliraop_proc.wait_with_timeout(2)
         if self._stderr_reader_task and not self._stderr_reader_task.done():
             self._stderr_reader_task.cancel()
+        if self.ffmpeg_reader_task and not self.ffmpeg_reader_task.done():
+            self.ffmpeg_reader_task.cancel()
         if self._cliraop_proc.proc and not self._cliraop_proc.closed:
             await self._cliraop_proc.close(True)
-        if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
-            await self._ffmpeg_proc.close(True)
-        self._cliraop_proc = None
-        self._ffmpeg_proc = None
 
     async def write_chunk(self, chunk: bytes) -> None:
         """Write a (pcm) audio chunk."""
@@ -339,7 +348,46 @@ class RaopStream:
         self.airplay_player.last_command_sent = time.time()
         await asyncio.to_thread(send_data)
 
-    async def _stderr_reader(self) -> None:  # noqa: PLR0915
+    async def ffmpeg_reader(self) -> None:
+        """Read audio from the audio source and pipe it to the CLIRaop process."""
+        self._ffmpeg_proc = FFMpeg(
+            audio_input="-",
+            input_format=self.session.input_format,
+            output_format=AIRPLAY_PCM_FORMAT,
+            filter_params=get_player_filter_params(
+                self.mass,
+                self.airplay_player.player_id,
+                self.session.input_format,
+                AIRPLAY_PCM_FORMAT,
+            ),
+        )
+        self._stream_bytes_sent = 0
+        mass_player = self.mass.players.get(self.airplay_player.player_id)
+        assert mass_player  # for type checker
+        try:
+            await self._ffmpeg_proc.start()
+            chunksize = get_chunksize(AIRPLAY_PCM_FORMAT)
+            # wait for cliraop to be ready
+            await asyncio.wait_for(self._started.wait(), 20)
+            async for chunk in self._ffmpeg_proc.iter_chunked(chunksize):
+                if self._stopped:
+                    break
+                if not self._cliraop_proc or self._cliraop_proc.closed:
+                    break
+                await self._cliraop_proc.write(chunk)
+                self._stream_bytes_sent += len(chunk)
+                self._total_bytes_sent += len(chunk)
+                del chunk
+                # we base elapsed time on the amount of bytes sent
+                # so we can account for reusing the same session for multiple streams
+                mass_player.elapsed_time = self._stream_bytes_sent / chunksize
+                mass_player.elapsed_time_last_updated = time.time()
+            if self._cliraop_proc and not self._cliraop_proc.closed:
+                await self._cliraop_proc.write_eof()
+        finally:
+            await self._ffmpeg_proc.close()
+
+    async def _stderr_reader(self) -> None:
         """Monitor stderr for the running CLIRaop process."""
         airplay_player = self.airplay_player
         mass_player = self.mass.players.get(airplay_player.player_id)
@@ -355,14 +403,14 @@ class RaopStream:
         async for line in self._cliraop_proc.iter_stderr():
             if "elapsed milliseconds:" in line:
                 # this is received more or less every second while playing
-                millis = int(line.split("elapsed milliseconds: ")[1])
-                mass_player.elapsed_time = millis / 1000
-                mass_player.elapsed_time_last_updated = time.time()
+                millis = int(line.split("elapsed milliseconds: ")[1])
+                # mass_player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
+                mass_player.elapsed_time_last_updated = time.time()
                 # send metadata to player(s) if needed
                 # NOTE: this must all be done in separate tasks to not disturb audio
                 now = time.time()
                 if (
-                    mass_player.elapsed_time > 2
+                    (mass_player.elapsed_time or 0) > 2
                     and queue
                     and queue.current_item
                     and queue.current_item.streamdetails