Fix race conditions in closing a (ffmpeg) process
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 3 Nov 2025 17:29:39 +0000 (18:29 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 3 Nov 2025 17:29:39 +0000 (18:29 +0100)
music_assistant/helpers/audio_buffer.py
music_assistant/helpers/process.py
music_assistant/providers/airplay/protocols/_protocol.py
music_assistant/providers/spotify_connect/__init__.py

index 8d96bc1d8a8a9c402bcee4699e8f34aa005d7071..17f4b8740077fd03f5aad18d536ac9078961527b 100644 (file)
@@ -307,6 +307,15 @@ class AudioBuffer:
         # if we reach here, we have broken out of the loop due to inactivity
         await self.clear(cancel_inactivity_task=False)
 
+    async def _notify_on_producer_error(self) -> None:
+        """Notify waiting consumers that producer has failed.
+
+        This is called from the producer task done callback and properly
+        acquires the lock before calling notify_all.
+        """
+        async with self._lock:
+            self._data_available.notify_all()
+
     def attach_producer_task(self, task: asyncio.Task[Any]) -> None:
         """Attach a background task that fills the buffer."""
         self._producer_task = task
@@ -324,8 +333,9 @@ class AudioBuffer:
                 # This prevents reuse of a buffer in error state
                 self._cancelled = True
                 # Wake up any waiting consumers so they can see the error
+                # We need to acquire the lock before calling notify_all
                 loop = asyncio.get_running_loop()
-                loop.call_soon_threadsafe(self._data_available.notify_all)
+                loop.create_task(self._notify_on_producer_error())
 
         task.add_done_callback(_on_producer_done)
 
index bbbb49cd78c836ab1ed35265f2261b53138484ec..46df59f8d68d9a2a53ed1be622d81b46711edfc5 100644 (file)
@@ -90,8 +90,8 @@ class AsyncProcess:
         exc_tb: TracebackType | None,
     ) -> bool | None:
         """Exit context manager."""
-        # send interrupt signal to process when we're cancelled
-        await self.close(send_signal=exc_type in (GeneratorExit, asyncio.CancelledError))
+        # make sure we close and cleanup the process
+        await self.close()
         self._returncode = self.returncode
         return None
 
@@ -228,13 +228,11 @@ class AsyncProcess:
         stdout, stderr = await asyncio.wait_for(self.proc.communicate(input), timeout)
         return (stdout, stderr)
 
-    async def close(self, send_signal: bool = False) -> None:
+    async def close(self) -> None:
         """Close/terminate the process and wait for exit."""
         self._close_called = True
         if not self.proc:
             return
-        if send_signal and self.returncode is None:
-            self.proc.send_signal(SIGINT)
 
         # cancel existing stdin feeder task if any
         if self._stdin_feeder_task:
@@ -251,6 +249,8 @@ class AsyncProcess:
         await asyncio.wait_for(self._stdin_lock.acquire(), 10)
         if self.proc.stdin and not self.proc.stdin.is_closing():
             self.proc.stdin.close()
+        elif not self.proc.stdin and self.proc.returncode is None:
+            self.proc.send_signal(SIGINT)
 
         # ensure we have no more readers active and stdout is drained
         await asyncio.wait_for(self._stdout_lock.acquire(), 10)
@@ -300,6 +300,10 @@ class AsyncProcess:
         """Wait for the process and return the returncode with a timeout."""
         return await asyncio.wait_for(self.wait(), timeout)
 
+    def attach_stderr_reader(self, task: asyncio.Task[None]) -> None:
+        """Attach a stderr reader task to this process."""
+        self._stderr_reader_task = task
+
 
 async def check_output(*args: str, env: dict[str, str] | None = None) -> tuple[int, bytes]:
     """Run subprocess and return returncode and output."""
index 93a57d003e8ae489baa402d708a83455805ad691..a9e37356fbecae4adfe025e8cca5b467fb137634 100644 (file)
@@ -89,7 +89,7 @@ class AirPlayProtocol(ABC):
 
         # Close the CLI process (wait for it to terminate)
         if self._cli_proc and not self._cli_proc.closed:
-            await self._cli_proc.close(True)
+            await self._cli_proc.close()
 
         self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
 
index 378278a1e6a32e170c4751b945778bc1b220a768..2c95b1e40a31a06220eae05e8def21f1e81851e8 100644 (file)
@@ -415,7 +415,7 @@ class SpotifyConnectProvider(PluginProvider):
                     continue
                 self.logger.debug(line)
         finally:
-            await librespot.close(True)
+            await librespot.close()
             self.logger.info("Spotify Connect background daemon stopped for %s", self.name)
             await check_output("rm", "-f", self.named_pipe)
             if not self._librespot_started.is_set():