From f1054291a7f5133a4106c656b7f179a84954781d Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 23 Jan 2025 22:05:24 +0100 Subject: [PATCH] Fix various issues in the Airplay provider - Added some more locking - Wait for cliraop to exit when sending stop - Exit audiostream loop when all clients dissapeared - Fix provider reload --- music_assistant/providers/airplay/player.py | 27 +++-- music_assistant/providers/airplay/provider.py | 4 +- music_assistant/providers/airplay/raop.py | 108 +++++++++++------- 3 files changed, 82 insertions(+), 57 deletions(-) diff --git a/music_assistant/providers/airplay/player.py b/music_assistant/providers/airplay/player.py index 8eed6323..ee09e1ef 100644 --- a/music_assistant/providers/airplay/player.py +++ b/music_assistant/providers/airplay/player.py @@ -2,6 +2,7 @@ from __future__ import annotations +import asyncio from typing import TYPE_CHECKING from music_assistant_models.enums import PlayerState @@ -28,23 +29,27 @@ class AirPlayPlayer: self.logger = prov.logger.getChild(player_id) self.raop_stream: RaopStream | None = None self.last_command_sent = 0.0 + self._lock = asyncio.Lock() async def cmd_stop(self, update_state: bool = True) -> None: """Send STOP command to player.""" - if self.raop_stream: - # forward stop to the entire stream session - await self.raop_stream.session.stop() - if update_state and (mass_player := self.mass.players.get(self.player_id)): - mass_player.state = PlayerState.IDLE - self.mass.players.update(mass_player.player_id) + async with self._lock: + if self.raop_stream: + # forward stop to the entire stream session + await self.raop_stream.session.stop() + if update_state and (mass_player := self.mass.players.get(self.player_id)): + mass_player.state = PlayerState.IDLE + self.mass.players.update(mass_player.player_id) async def cmd_play(self) -> None: """Send PLAY (unpause) command to player.""" - if self.raop_stream and self.raop_stream.running: - await self.raop_stream.send_cli_command("ACTION=PLAY") + async with self._lock: + if self.raop_stream and self.raop_stream.running: + await self.raop_stream.send_cli_command("ACTION=PLAY") async def cmd_pause(self) -> None: """Send PAUSE command to player.""" - if not self.raop_stream or not self.raop_stream.running: - return - await self.raop_stream.send_cli_command("ACTION=PAUSE") + async with self._lock: + if not self.raop_stream or not self.raop_stream.running: + return + await self.raop_stream.send_cli_command("ACTION=PAUSE") diff --git a/music_assistant/providers/airplay/provider.py b/music_assistant/providers/airplay/provider.py index 70a035fd..16067b26 100644 --- a/music_assistant/providers/airplay/provider.py +++ b/music_assistant/providers/airplay/provider.py @@ -221,8 +221,8 @@ class AirplayProvider(PlayerProvider): async def unload(self, is_removed: bool = False) -> None: """Handle unload/close of the provider.""" # power off all players (will disconnect and close cliraop) - for player_id in self._players: - await self.cmd_power(player_id, False) + for player in self._players.values(): + await player.cmd_stop() # shutdown DACP server if self._dacp_server: self._dacp_server.close() diff --git a/music_assistant/providers/airplay/raop.py b/music_assistant/providers/airplay/raop.py index 93185373..585fdf40 100644 --- a/music_assistant/providers/airplay/raop.py +++ b/music_assistant/providers/airplay/raop.py @@ -13,12 +13,13 @@ from random import randint from typing import TYPE_CHECKING from music_assistant_models.enums import PlayerState +from music_assistant_models.errors import PlayerCommandFailed from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL from music_assistant.helpers.audio import get_player_filter_params from music_assistant.helpers.ffmpeg import FFMpeg from music_assistant.helpers.process import AsyncProcess, check_output -from music_assistant.helpers.util import close_async_generator +from music_assistant.helpers.util import TaskManager, close_async_generator from .const import ( AIRPLAY_PCM_FORMAT, @@ -55,29 +56,27 @@ class RaopStreamSession: self._sync_clients = sync_clients self._audio_source = audio_source self._audio_source_task: asyncio.Task[None] | None = None - self._stopped: bool = False self._lock = asyncio.Lock() async def start(self) -> None: """Initialize RaopStreamSession.""" # initialize raop stream for all players - for airplay_player in self._sync_clients: - if airplay_player.raop_stream and airplay_player.raop_stream.running: - raise RuntimeError("Player already has an active stream") - airplay_player.raop_stream = RaopStream(self, airplay_player) async def audio_streamer() -> None: """Stream audio to all players.""" generator_exhausted = False try: async for chunk in self._audio_source: - if not self._sync_clients: - return async with self._lock: + sync_clients = [ + x for x in self._sync_clients if x.raop_stream and x.raop_stream.running + ] + if not sync_clients: + return await asyncio.gather( *[ x.raop_stream.write_chunk(chunk) - for x in self._sync_clients + for x in sync_clients if x.raop_stream ], return_exceptions=True, @@ -86,7 +85,11 @@ class RaopStreamSession: generator_exhausted = True async with self._lock: await asyncio.gather( - *[x.raop_stream.write_eof() for x in self._sync_clients if x.raop_stream], + *[ + x.raop_stream.write_eof() + for x in self._sync_clients + if x.raop_stream and x.raop_stream.running + ], return_exceptions=True, ) except Exception as err: @@ -96,6 +99,7 @@ class RaopStreamSession: str(err) or err.__class__.__name__, exc_info=err if logger.isEnabledFor(logging.DEBUG) else None, ) + raise finally: if not generator_exhausted: await close_async_generator(self._audio_source) @@ -104,24 +108,24 @@ class RaopStreamSession: assert self.prov.cliraop_bin _, stdout = await check_output(self.prov.cliraop_bin, "-ntp") start_ntp = int(stdout.strip()) - wait_start = 1500 + (250 * len(self._sync_clients)) - async with self._lock: - await asyncio.gather( - *[ - x.raop_stream.start(start_ntp, wait_start) - for x in self._sync_clients - if x.raop_stream - ], - return_exceptions=True, - ) + wait_start = 1750 + (250 * len(self._sync_clients)) + + async def _start_client(raop_player: AirPlayPlayer) -> None: + # stop existing stream if running + if raop_player.raop_stream and raop_player.raop_stream.running: + await raop_player.raop_stream.stop() + + raop_player.raop_stream = RaopStream(self, raop_player) + await raop_player.raop_stream.start(start_ntp, wait_start) + + async with TaskManager(self.mass) as tm: + for _raop_player in self._sync_clients: + tm.create_task(_start_client(_raop_player)) self._audio_source_task = asyncio.create_task(audio_streamer()) async def stop(self) -> None: """Stop playback and cleanup.""" - if self._stopped: - return - self._stopped = True - if self._audio_source_task: + if self._audio_source_task and not self._audio_source_task.done(): self._audio_source_task.cancel() with suppress(asyncio.CancelledError): await self._audio_source_task @@ -138,7 +142,7 @@ class RaopStreamSession: assert airplay_player.raop_stream.session == self async with self._lock: self._sync_clients.remove(airplay_player) - await airplay_player.raop_stream.stop() + await airplay_player.raop_stream.stop() airplay_player.raop_stream = None async def add_client(self, airplay_player: AirPlayPlayer) -> None: @@ -173,7 +177,7 @@ class RaopStream: # with the named pipe used to send audio self.active_remote_id: str = str(randint(1000, 8000)) self.prevent_playback: bool = False - self._log_reader_task: asyncio.Task[None] | asyncio.Future[None] | None = None + self._stderr_reader_task: asyncio.Task[None] | None = None self._cliraop_proc: AsyncProcess | None = None self._ffmpeg_proc: AsyncProcess | None = None self._started = asyncio.Event() @@ -182,7 +186,12 @@ class RaopStream: @property def running(self) -> bool: """Return boolean if this stream is running.""" - return not self._stopped and self._started.is_set() + return ( + not self._stopped + and self._started.is_set() + and self._cliraop_proc is not None + and not self._cliraop_proc.closed + ) async def start(self, start_ntp: int, wait_start: int = 1000) -> None: """Initialize CLIRaop process for a player.""" @@ -218,10 +227,8 @@ class RaopStream: read_ahead = await self.mass.config.get_player_config_value( player_id, CONF_READ_AHEAD_BUFFER ) - # create os pipes to pipe ffmpeg to cliraop read, write = await asyncio.to_thread(os.pipe) - # ffmpeg handles the player specific stream + filters and pipes # audio to the cliraop process self._ffmpeg_proc = FFMpeg( @@ -235,6 +242,11 @@ class RaopStream: await asyncio.to_thread(os.close, write) # cliraop is the binary that handles the actual raop streaming to the player + # this is a slightly modified bversion of philippe44's libraop + # https://github.com/music-assistant/libraop + # we use this intermediate binary to do the actual streaming because attempts to do + # so using pure python (e.g. pyatv) were not successful due to the realtime nature + # TODO: Either enhance libraop with airplay 2 support or find a better alternative cliraop_args = [ self.prov.cliraop_bin, "-ntpstart", @@ -262,33 +274,40 @@ class RaopStream: os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" await self._cliraop_proc.start() await asyncio.to_thread(os.close, read) - self._started.set() - self._log_reader_task = self.mass.create_task(self._log_watcher()) + # read first 10 lines of stderr to get the initial status + for _ in range(10): + line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore") + self.airplay_player.logger.debug(line) + if "connected to " in line: + self._started.set() + break + if "Cannot connect to AirPlay device" in line: + raise PlayerCommandFailed("Cannot connect to AirPlay device") + # start reading the stderr of the cliraop process from another task + self._stderr_reader_task = self.mass.create_task(self._stderr_reader()) async def stop(self) -> None: """Stop playback and cleanup.""" - if self._stopped: - return - if not self._cliraop_proc: + if self._stopped or not self._cliraop_proc: return if self._cliraop_proc.proc and not self._cliraop_proc.closed: await self.send_cli_command("ACTION=STOP") - self._stopped = True # set after send_cli command! + self._stopped = True + with suppress(asyncio.TimeoutError): + await self._cliraop_proc.wait_with_timeout(5) + if self._stderr_reader_task and not self._stderr_reader_task.done(): + self._stderr_reader_task.cancel() if self._cliraop_proc.proc and not self._cliraop_proc.closed: await self._cliraop_proc.close(True) if self._ffmpeg_proc and not self._ffmpeg_proc.closed: await self._ffmpeg_proc.close(True) - if self._log_reader_task: - self._log_reader_task.cancel() - with suppress(asyncio.CancelledError): - await self._log_reader_task self._cliraop_proc = None self._ffmpeg_proc = None async def write_chunk(self, chunk: bytes) -> None: """Write a (pcm) audio chunk.""" if self._stopped: - return + raise RuntimeError("Stream is already stopped") await self._started.wait() assert self._ffmpeg_proc await self._ffmpeg_proc.write(chunk) @@ -296,7 +315,7 @@ class RaopStream: async def write_eof(self) -> None: """Write EOF.""" if self._stopped: - return + raise RuntimeError("Stream is already stopped") await self._started.wait() assert self._ffmpeg_proc await self._ffmpeg_proc.write_eof() @@ -304,7 +323,7 @@ class RaopStream: async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" if self._stopped: - return + raise RuntimeError("Stream is already stopped") await self._started.wait() if not command.endswith("\n"): @@ -319,7 +338,7 @@ class RaopStream: self.airplay_player.last_command_sent = time.time() await asyncio.to_thread(send_data) - async def _log_watcher(self) -> None: + async def _stderr_reader(self) -> None: # noqa: PLR0915 """Monitor stderr for the running CLIRaop process.""" airplay_player = self.airplay_player mass_player = self.mass.players.get(airplay_player.player_id) @@ -389,7 +408,8 @@ class RaopStream: mass_player.state = PlayerState.IDLE self.mass.players.update(airplay_player.player_id) # ensure we're cleaned up afterwards (this also logs the returncode) - await self.stop() + if not self._stopped: + await self.stop() async def _send_metadata(self, queue: PlayerQueue) -> None: """Send metadata to player (and connected sync childs).""" -- 2.34.1