Fix various issues in the Airplay provider
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 23 Jan 2025 21:05:24 +0000 (22:05 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Thu, 23 Jan 2025 21:05:24 +0000 (22:05 +0100)
- Added some more locking
- Wait for cliraop to exit when sending stop
- Exit audiostream loop when all clients dissapeared
- Fix provider reload

music_assistant/providers/airplay/player.py
music_assistant/providers/airplay/provider.py
music_assistant/providers/airplay/raop.py

index 8eed6323e9a4dc888007f005487edcaba69cfbb4..ee09e1ef704f595bb7e9962473618ad230981cfd 100644 (file)
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+import asyncio
 from typing import TYPE_CHECKING
 
 from music_assistant_models.enums import PlayerState
@@ -28,23 +29,27 @@ class AirPlayPlayer:
         self.logger = prov.logger.getChild(player_id)
         self.raop_stream: RaopStream | None = None
         self.last_command_sent = 0.0
+        self._lock = asyncio.Lock()
 
     async def cmd_stop(self, update_state: bool = True) -> None:
         """Send STOP command to player."""
-        if self.raop_stream:
-            # forward stop to the entire stream session
-            await self.raop_stream.session.stop()
-        if update_state and (mass_player := self.mass.players.get(self.player_id)):
-            mass_player.state = PlayerState.IDLE
-            self.mass.players.update(mass_player.player_id)
+        async with self._lock:
+            if self.raop_stream:
+                # forward stop to the entire stream session
+                await self.raop_stream.session.stop()
+            if update_state and (mass_player := self.mass.players.get(self.player_id)):
+                mass_player.state = PlayerState.IDLE
+                self.mass.players.update(mass_player.player_id)
 
     async def cmd_play(self) -> None:
         """Send PLAY (unpause) command to player."""
-        if self.raop_stream and self.raop_stream.running:
-            await self.raop_stream.send_cli_command("ACTION=PLAY")
+        async with self._lock:
+            if self.raop_stream and self.raop_stream.running:
+                await self.raop_stream.send_cli_command("ACTION=PLAY")
 
     async def cmd_pause(self) -> None:
         """Send PAUSE command to player."""
-        if not self.raop_stream or not self.raop_stream.running:
-            return
-        await self.raop_stream.send_cli_command("ACTION=PAUSE")
+        async with self._lock:
+            if not self.raop_stream or not self.raop_stream.running:
+                return
+            await self.raop_stream.send_cli_command("ACTION=PAUSE")
index 70a035fd026412ebe6c3b1e034b62f3886a72dff..16067b26c0e046ddde0b52b979c70a00f4de1f6a 100644 (file)
@@ -221,8 +221,8 @@ class AirplayProvider(PlayerProvider):
     async def unload(self, is_removed: bool = False) -> None:
         """Handle unload/close of the provider."""
         # power off all players (will disconnect and close cliraop)
-        for player_id in self._players:
-            await self.cmd_power(player_id, False)
+        for player in self._players.values():
+            await player.cmd_stop()
         # shutdown DACP server
         if self._dacp_server:
             self._dacp_server.close()
index 93185373431e83850d732b464b0cfd2b57f657ee..585fdf40facabffaa7a20a3341567024a360e39c 100644 (file)
@@ -13,12 +13,13 @@ from random import randint
 from typing import TYPE_CHECKING
 
 from music_assistant_models.enums import PlayerState
+from music_assistant_models.errors import PlayerCommandFailed
 
 from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
 from music_assistant.helpers.audio import get_player_filter_params
 from music_assistant.helpers.ffmpeg import FFMpeg
 from music_assistant.helpers.process import AsyncProcess, check_output
-from music_assistant.helpers.util import close_async_generator
+from music_assistant.helpers.util import TaskManager, close_async_generator
 
 from .const import (
     AIRPLAY_PCM_FORMAT,
@@ -55,29 +56,27 @@ class RaopStreamSession:
         self._sync_clients = sync_clients
         self._audio_source = audio_source
         self._audio_source_task: asyncio.Task[None] | None = None
-        self._stopped: bool = False
         self._lock = asyncio.Lock()
 
     async def start(self) -> None:
         """Initialize RaopStreamSession."""
         # initialize raop stream for all players
-        for airplay_player in self._sync_clients:
-            if airplay_player.raop_stream and airplay_player.raop_stream.running:
-                raise RuntimeError("Player already has an active stream")
-            airplay_player.raop_stream = RaopStream(self, airplay_player)
 
         async def audio_streamer() -> None:
             """Stream audio to all players."""
             generator_exhausted = False
             try:
                 async for chunk in self._audio_source:
-                    if not self._sync_clients:
-                        return
                     async with self._lock:
+                        sync_clients = [
+                            x for x in self._sync_clients if x.raop_stream and x.raop_stream.running
+                        ]
+                        if not sync_clients:
+                            return
                         await asyncio.gather(
                             *[
                                 x.raop_stream.write_chunk(chunk)
-                                for x in self._sync_clients
+                                for x in sync_clients
                                 if x.raop_stream
                             ],
                             return_exceptions=True,
@@ -86,7 +85,11 @@ class RaopStreamSession:
                 generator_exhausted = True
                 async with self._lock:
                     await asyncio.gather(
-                        *[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream],
+                        *[
+                            x.raop_stream.write_eof()
+                            for x in self._sync_clients
+                            if x.raop_stream and x.raop_stream.running
+                        ],
                         return_exceptions=True,
                     )
             except Exception as err:
@@ -96,6 +99,7 @@ class RaopStreamSession:
                     str(err) or err.__class__.__name__,
                     exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
                 )
+                raise
             finally:
                 if not generator_exhausted:
                     await close_async_generator(self._audio_source)
@@ -104,24 +108,24 @@ class RaopStreamSession:
         assert self.prov.cliraop_bin
         _, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
         start_ntp = int(stdout.strip())
-        wait_start = 1500 + (250 * len(self._sync_clients))
-        async with self._lock:
-            await asyncio.gather(
-                *[
-                    x.raop_stream.start(start_ntp, wait_start)
-                    for x in self._sync_clients
-                    if x.raop_stream
-                ],
-                return_exceptions=True,
-            )
+        wait_start = 1750 + (250 * len(self._sync_clients))
+
+        async def _start_client(raop_player: AirPlayPlayer) -> None:
+            # stop existing stream if running
+            if raop_player.raop_stream and raop_player.raop_stream.running:
+                await raop_player.raop_stream.stop()
+
+            raop_player.raop_stream = RaopStream(self, raop_player)
+            await raop_player.raop_stream.start(start_ntp, wait_start)
+
+        async with TaskManager(self.mass) as tm:
+            for _raop_player in self._sync_clients:
+                tm.create_task(_start_client(_raop_player))
         self._audio_source_task = asyncio.create_task(audio_streamer())
 
     async def stop(self) -> None:
         """Stop playback and cleanup."""
-        if self._stopped:
-            return
-        self._stopped = True
-        if self._audio_source_task:
+        if self._audio_source_task and not self._audio_source_task.done():
             self._audio_source_task.cancel()
             with suppress(asyncio.CancelledError):
                 await self._audio_source_task
@@ -138,7 +142,7 @@ class RaopStreamSession:
         assert airplay_player.raop_stream.session == self
         async with self._lock:
             self._sync_clients.remove(airplay_player)
-        await airplay_player.raop_stream.stop()
+            await airplay_player.raop_stream.stop()
         airplay_player.raop_stream = None
 
     async def add_client(self, airplay_player: AirPlayPlayer) -> None:
@@ -173,7 +177,7 @@ class RaopStream:
         # with the named pipe used to send audio
         self.active_remote_id: str = str(randint(1000, 8000))
         self.prevent_playback: bool = False
-        self._log_reader_task: asyncio.Task[None] | asyncio.Future[None] | None = None
+        self._stderr_reader_task: asyncio.Task[None] | None = None
         self._cliraop_proc: AsyncProcess | None = None
         self._ffmpeg_proc: AsyncProcess | None = None
         self._started = asyncio.Event()
@@ -182,7 +186,12 @@ class RaopStream:
     @property
     def running(self) -> bool:
         """Return boolean if this stream is running."""
-        return not self._stopped and self._started.is_set()
+        return (
+            not self._stopped
+            and self._started.is_set()
+            and self._cliraop_proc is not None
+            and not self._cliraop_proc.closed
+        )
 
     async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
         """Initialize CLIRaop process for a player."""
@@ -218,10 +227,8 @@ class RaopStream:
         read_ahead = await self.mass.config.get_player_config_value(
             player_id, CONF_READ_AHEAD_BUFFER
         )
-
         # create os pipes to pipe ffmpeg to cliraop
         read, write = await asyncio.to_thread(os.pipe)
-
         # ffmpeg handles the player specific stream + filters and pipes
         # audio to the cliraop process
         self._ffmpeg_proc = FFMpeg(
@@ -235,6 +242,11 @@ class RaopStream:
         await asyncio.to_thread(os.close, write)
 
         # cliraop is the binary that handles the actual raop streaming to the player
+        # this is a slightly modified bversion of philippe44's libraop
+        # https://github.com/music-assistant/libraop
+        # we use this intermediate binary to do the actual streaming because attempts to do
+        # so using pure python (e.g. pyatv) were not successful due to the realtime nature
+        # TODO: Either enhance libraop with airplay 2 support or find a better alternative
         cliraop_args = [
             self.prov.cliraop_bin,
             "-ntpstart",
@@ -262,33 +274,40 @@ class RaopStream:
             os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
         await self._cliraop_proc.start()
         await asyncio.to_thread(os.close, read)
-        self._started.set()
-        self._log_reader_task = self.mass.create_task(self._log_watcher())
+        # read first 10 lines of stderr to get the initial status
+        for _ in range(10):
+            line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore")
+            self.airplay_player.logger.debug(line)
+            if "connected to " in line:
+                self._started.set()
+                break
+            if "Cannot connect to AirPlay device" in line:
+                raise PlayerCommandFailed("Cannot connect to AirPlay device")
+        # start reading the stderr of the cliraop process from another task
+        self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
 
     async def stop(self) -> None:
         """Stop playback and cleanup."""
-        if self._stopped:
-            return
-        if not self._cliraop_proc:
+        if self._stopped or not self._cliraop_proc:
             return
         if self._cliraop_proc.proc and not self._cliraop_proc.closed:
             await self.send_cli_command("ACTION=STOP")
-        self._stopped = True  # set after send_cli command!
+        self._stopped = True
+        with suppress(asyncio.TimeoutError):
+            await self._cliraop_proc.wait_with_timeout(5)
+        if self._stderr_reader_task and not self._stderr_reader_task.done():
+            self._stderr_reader_task.cancel()
         if self._cliraop_proc.proc and not self._cliraop_proc.closed:
             await self._cliraop_proc.close(True)
         if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
             await self._ffmpeg_proc.close(True)
-        if self._log_reader_task:
-            self._log_reader_task.cancel()
-            with suppress(asyncio.CancelledError):
-                await self._log_reader_task
         self._cliraop_proc = None
         self._ffmpeg_proc = None
 
     async def write_chunk(self, chunk: bytes) -> None:
         """Write a (pcm) audio chunk."""
         if self._stopped:
-            return
+            raise RuntimeError("Stream is already stopped")
         await self._started.wait()
         assert self._ffmpeg_proc
         await self._ffmpeg_proc.write(chunk)
@@ -296,7 +315,7 @@ class RaopStream:
     async def write_eof(self) -> None:
         """Write EOF."""
         if self._stopped:
-            return
+            raise RuntimeError("Stream is already stopped")
         await self._started.wait()
         assert self._ffmpeg_proc
         await self._ffmpeg_proc.write_eof()
@@ -304,7 +323,7 @@ class RaopStream:
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
         if self._stopped:
-            return
+            raise RuntimeError("Stream is already stopped")
         await self._started.wait()
 
         if not command.endswith("\n"):
@@ -319,7 +338,7 @@ class RaopStream:
         self.airplay_player.last_command_sent = time.time()
         await asyncio.to_thread(send_data)
 
-    async def _log_watcher(self) -> None:
+    async def _stderr_reader(self) -> None:  # noqa: PLR0915
         """Monitor stderr for the running CLIRaop process."""
         airplay_player = self.airplay_player
         mass_player = self.mass.players.get(airplay_player.player_id)
@@ -389,7 +408,8 @@ class RaopStream:
             mass_player.state = PlayerState.IDLE
             self.mass.players.update(airplay_player.player_id)
         # ensure we're cleaned up afterwards (this also logs the returncode)
-        await self.stop()
+        if not self._stopped:
+            await self.stop()
 
     async def _send_metadata(self, queue: PlayerQueue) -> None:
         """Send metadata to player (and connected sync childs)."""