prevent race conditions on stop
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 8 Apr 2024 18:25:14 +0000 (20:25 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 8 Apr 2024 18:25:14 +0000 (20:25 +0200)
music_assistant/server/providers/airplay/__init__.py

index 4adab756aa131fcf625e2a1cc4dfa9cfdb24c8e1..0ce6177d2699ba8aeb45c9c077875a1a12f5b5d8 100644 (file)
@@ -196,12 +196,13 @@ class AirplayStream:
         # with the named pipe used to send audio
         self.active_remote_id: str = str(randint(1000, 8000))
         self.prevent_playback: bool = False
-        self.running = True
         # audio_source_task will only exist for the main player in a sync group
         self.audio_source_task: asyncio.Task | None = None
         self._log_reader_task: asyncio.Task | None = None
         self._cliraop_proc: AsyncProcess | None = None
         self._ffmpeg_proc: AsyncProcess | None = None
+        self._started = asyncio.Event()
+        self._stopped = False
 
     async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
         """Initialize CLIRaop process for a player."""
@@ -271,15 +272,18 @@ class AirplayStream:
         )
         await self._cliraop_proc.start()
         await asyncio.to_thread(os.close, read)
+        self._started.set()
         self._log_reader_task = asyncio.create_task(self._log_watcher())
 
     async def stop(self):
         """Stop playback and cleanup."""
-        self.running = False
-        if self.audio_source_task and not self.audio_source_task.done():
-            self.audio_source_task.cancel()
+        if self._stopped:
+            return
         if not self._cliraop_proc.closed:
             await self.send_cli_command("ACTION=STOP")
+        self._stopped = True  # set after send_cli command!
+        if self.audio_source_task and not self.audio_source_task.done():
+            self.audio_source_task.cancel()
         try:
             await asyncio.wait_for(self._cliraop_proc.wait(), 5)
         except TimeoutError:
@@ -296,16 +300,23 @@ class AirplayStream:
 
     async def write_chunk(self, chunk: bytes) -> None:
         """Write a (pcm) audio chunk."""
+        if self._stopped:
+            return
+        await self._started.wait()
         await self._ffmpeg_proc.write(chunk)
 
     async def write_eof(self) -> None:
         """Write EOF."""
+        if self._stopped:
+            return
+        await self._started.wait()
         await self._ffmpeg_proc.write_eof()
 
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
-        if not self._cliraop_proc or self._cliraop_proc.closed:
+        if self._stopped:
             return
+        await self._started.wait()
 
         if not command.endswith("\n"):
             command += "\n"
@@ -589,7 +600,7 @@ class AirplayProvider(PlayerProvider):
         # always stop existing stream first
         async with asyncio.TaskGroup() as tg:
             for airplay_player in self._get_sync_clients(player_id):
-                if airplay_player.active_stream and airplay_player.active_stream.running:
+                if airplay_player.active_stream and airplay_player.active_stream:
                     tg.create_task(airplay_player.active_stream.stop())
         # select audio source
         if media.media_type == MediaType.ANNOUNCEMENT: