From: Marcel van der Veldt Date: Mon, 8 Apr 2024 18:25:14 +0000 (+0200) Subject: prevent race conditions on stop X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=518d967e5fa5cb2a7aeb54129b96872c10217113;p=music-assistant-server.git prevent race conditions on stop --- diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 4adab756..0ce6177d 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -196,12 +196,13 @@ class AirplayStream: # with the named pipe used to send audio self.active_remote_id: str = str(randint(1000, 8000)) self.prevent_playback: bool = False - self.running = True # audio_source_task will only exist for the main player in a sync group self.audio_source_task: asyncio.Task | None = None self._log_reader_task: asyncio.Task | None = None self._cliraop_proc: AsyncProcess | None = None self._ffmpeg_proc: AsyncProcess | None = None + self._started = asyncio.Event() + self._stopped = False async def start(self, start_ntp: int, wait_start: int = 1000) -> None: """Initialize CLIRaop process for a player.""" @@ -271,15 +272,18 @@ class AirplayStream: ) await self._cliraop_proc.start() await asyncio.to_thread(os.close, read) + self._started.set() self._log_reader_task = asyncio.create_task(self._log_watcher()) async def stop(self): """Stop playback and cleanup.""" - self.running = False - if self.audio_source_task and not self.audio_source_task.done(): - self.audio_source_task.cancel() + if self._stopped: + return if not self._cliraop_proc.closed: await self.send_cli_command("ACTION=STOP") + self._stopped = True # set after send_cli command! + if self.audio_source_task and not self.audio_source_task.done(): + self.audio_source_task.cancel() try: await asyncio.wait_for(self._cliraop_proc.wait(), 5) except TimeoutError: @@ -296,16 +300,23 @@ class AirplayStream: async def write_chunk(self, chunk: bytes) -> None: """Write a (pcm) audio chunk.""" + if self._stopped: + return + await self._started.wait() await self._ffmpeg_proc.write(chunk) async def write_eof(self) -> None: """Write EOF.""" + if self._stopped: + return + await self._started.wait() await self._ffmpeg_proc.write_eof() async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" - if not self._cliraop_proc or self._cliraop_proc.closed: + if self._stopped: return + await self._started.wait() if not command.endswith("\n"): command += "\n" @@ -589,7 +600,7 @@ class AirplayProvider(PlayerProvider): # always stop existing stream first async with asyncio.TaskGroup() as tg: for airplay_player in self._get_sync_clients(player_id): - if airplay_player.active_stream and airplay_player.active_stream.running: + if airplay_player.active_stream and airplay_player.active_stream: tg.create_task(airplay_player.active_stream.stop()) # select audio source if media.media_type == MediaType.ANNOUNCEMENT: