From 01b9e295462115e31442a05dcae6fd575b22094f Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 3 Nov 2025 18:29:39 +0100 Subject: [PATCH] Fix race conditions in closing a (ffmpeg) process --- music_assistant/helpers/audio_buffer.py | 12 +++++++++++- music_assistant/helpers/process.py | 14 +++++++++----- .../providers/airplay/protocols/_protocol.py | 2 +- .../providers/spotify_connect/__init__.py | 2 +- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/music_assistant/helpers/audio_buffer.py b/music_assistant/helpers/audio_buffer.py index 8d96bc1d..17f4b874 100644 --- a/music_assistant/helpers/audio_buffer.py +++ b/music_assistant/helpers/audio_buffer.py @@ -307,6 +307,15 @@ class AudioBuffer: # if we reach here, we have broken out of the loop due to inactivity await self.clear(cancel_inactivity_task=False) + async def _notify_on_producer_error(self) -> None: + """Notify waiting consumers that producer has failed. + + This is called from the producer task done callback and properly + acquires the lock before calling notify_all. + """ + async with self._lock: + self._data_available.notify_all() + def attach_producer_task(self, task: asyncio.Task[Any]) -> None: """Attach a background task that fills the buffer.""" self._producer_task = task @@ -324,8 +333,9 @@ class AudioBuffer: # This prevents reuse of a buffer in error state self._cancelled = True # Wake up any waiting consumers so they can see the error + # We need to acquire the lock before calling notify_all loop = asyncio.get_running_loop() - loop.call_soon_threadsafe(self._data_available.notify_all) + loop.create_task(self._notify_on_producer_error()) task.add_done_callback(_on_producer_done) diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index bbbb49cd..46df59f8 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -90,8 +90,8 @@ class AsyncProcess: exc_tb: TracebackType | None, ) -> bool | None: """Exit context manager.""" - # send interrupt signal to process when we're cancelled - await self.close(send_signal=exc_type in (GeneratorExit, asyncio.CancelledError)) + # make sure we close and cleanup the process + await self.close() self._returncode = self.returncode return None @@ -228,13 +228,11 @@ class AsyncProcess: stdout, stderr = await asyncio.wait_for(self.proc.communicate(input), timeout) return (stdout, stderr) - async def close(self, send_signal: bool = False) -> None: + async def close(self) -> None: """Close/terminate the process and wait for exit.""" self._close_called = True if not self.proc: return - if send_signal and self.returncode is None: - self.proc.send_signal(SIGINT) # cancel existing stdin feeder task if any if self._stdin_feeder_task: @@ -251,6 +249,8 @@ class AsyncProcess: await asyncio.wait_for(self._stdin_lock.acquire(), 10) if self.proc.stdin and not self.proc.stdin.is_closing(): self.proc.stdin.close() + elif not self.proc.stdin and self.proc.returncode is None: + self.proc.send_signal(SIGINT) # ensure we have no more readers active and stdout is drained await asyncio.wait_for(self._stdout_lock.acquire(), 10) @@ -300,6 +300,10 @@ class AsyncProcess: """Wait for the process and return the returncode with a timeout.""" return await asyncio.wait_for(self.wait(), timeout) + def attach_stderr_reader(self, task: asyncio.Task[None]) -> None: + """Attach a stderr reader task to this process.""" + self._stderr_reader_task = task + async def check_output(*args: str, env: dict[str, str] | None = None) -> tuple[int, bytes]: """Run subprocess and return returncode and output.""" diff --git a/music_assistant/providers/airplay/protocols/_protocol.py b/music_assistant/providers/airplay/protocols/_protocol.py index 93a57d00..a9e37356 100644 --- a/music_assistant/providers/airplay/protocols/_protocol.py +++ b/music_assistant/providers/airplay/protocols/_protocol.py @@ -89,7 +89,7 @@ class AirPlayProtocol(ABC): # Close the CLI process (wait for it to terminate) if self._cli_proc and not self._cli_proc.closed: - await self._cli_proc.close(True) + await self._cli_proc.close() self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0) diff --git a/music_assistant/providers/spotify_connect/__init__.py b/music_assistant/providers/spotify_connect/__init__.py index 378278a1..2c95b1e4 100644 --- a/music_assistant/providers/spotify_connect/__init__.py +++ b/music_assistant/providers/spotify_connect/__init__.py @@ -415,7 +415,7 @@ class SpotifyConnectProvider(PluginProvider): continue self.logger.debug(line) finally: - await librespot.close(True) + await librespot.close() self.logger.info("Spotify Connect background daemon stopped for %s", self.name) await check_output("rm", "-f", self.named_pipe) if not self._librespot_started.is_set(): -- 2.34.1