Fix: clean shutdown of ffmpeg process
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 15:54:44 +0000 (16:54 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 15:54:44 +0000 (16:54 +0100)
music_assistant/helpers/ffmpeg.py
music_assistant/helpers/process.py

index 2ae882ccddc4933e1572ef6774e0494226ad6ac1..d05d0a597f8317043c941fbb3b1b239792b5de40 100644 (file)
@@ -59,8 +59,8 @@ class FFMpeg(AsyncProcess):
         self.input_format = input_format
         self.collect_log_history = collect_log_history
         self.log_history: deque[str] = deque(maxlen=100)
-        self._stdin_task: asyncio.Task[None] | None = None
-        self._logger_task: asyncio.Task[None] | None = None
+        self._stdin_feeder_task: asyncio.Task[None] | None = None
+        self._stderr_reader_task: asyncio.Task[None] | None = None
         self._input_codec_parsed = False
         stdin: bool | int
         if audio_input == "-" or isinstance(audio_input, AsyncGenerator):
@@ -93,9 +93,9 @@ class FFMpeg(AsyncProcess):
                 clean_args.append(arg)
         args_str = " ".join(clean_args)
         self.logger.log(VERBOSE_LOG_LEVEL, "started with args: %s", args_str)
-        self._logger_task = asyncio.create_task(self._log_reader_task())
+        self._stderr_reader_task = asyncio.create_task(self._log_reader_task())
         if isinstance(self.audio_input, AsyncGenerator):
-            self._stdin_task = asyncio.create_task(self._feed_stdin())
+            self._stdin_feeder_task = asyncio.create_task(self._feed_stdin())
 
     async def communicate(
         self,
@@ -103,37 +103,21 @@ class FFMpeg(AsyncProcess):
         timeout: float | None = None,
     ) -> tuple[bytes, bytes]:
         """Override communicate to avoid blocking."""
-        if self._stdin_task:
-            if not self._stdin_task.done():
-                self._stdin_task.cancel()
+        if self._stdin_feeder_task:
+            if not self._stdin_feeder_task.done():
+                self._stdin_feeder_task.cancel()
             # Always await the task to consume any exception and prevent
             # "Task exception was never retrieved" errors.
             # Suppress CancelledError (from cancel) and any other exception
             # since exceptions have already been propagated through the generator chain.
             with suppress(asyncio.CancelledError, Exception):
-                await self._stdin_task
-        if self._logger_task and not self._logger_task.done():
-            self._logger_task.cancel()
-        return await super().communicate(input, timeout)
-
-    async def close(self, send_signal: bool = True) -> None:
-        """Close/terminate the process and wait for exit."""
-        if self.closed:
-            return
-        if self._stdin_task:
-            if not self._stdin_task.done():
-                self._stdin_task.cancel()
-            # Always await the task to consume any exception and prevent
-            # "Task exception was never retrieved" errors.
-            # Suppress CancelledError (from cancel) and any other exception
-            # since exceptions have already been propagated through the generator chain.
+                await self._stdin_feeder_task
+        if self._stderr_reader_task:
+            if not self._stderr_reader_task.done():
+                self._stderr_reader_task.cancel()
             with suppress(asyncio.CancelledError, Exception):
-                await self._stdin_task
-        await super().close(send_signal)
-        if self._logger_task and not self._logger_task.done():
-            self._logger_task.cancel()
-            with suppress(asyncio.CancelledError):
-                await self._logger_task
+                await self._stderr_reader_task
+        return await super().communicate(input, timeout)
 
     async def _log_reader_task(self) -> None:
         """Read ffmpeg log from stderr."""
@@ -152,7 +136,6 @@ class FFMpeg(AsyncProcess):
                 decode_errors += 1
             if decode_errors >= 50:
                 self.logger.error(line)
-                await super().close(True)
 
             # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
             if line.startswith("Stream #") and ": Audio: " in line:
index 499f7fc482c5ae512c27c1037877cdd633337b3a..394fe4adddbce2fd1bd1abd1dc82791efe048f18 100644 (file)
@@ -35,6 +35,9 @@ class AsyncProcess:
     without deadlocking.
     """
 
+    _stdin_feeder_task: asyncio.Task[None] | None = None  # used for ffmpeg
+    _stderr_reader_task: asyncio.Task[None] | None = None  # used for ffmpeg
+
     def __init__(
         self,
         args: list[str],
@@ -53,6 +56,9 @@ class AsyncProcess:
         self._stdin = None if stdin is False else stdin
         self._stdout = None if stdout is False else stdout
         self._stderr = asyncio.subprocess.DEVNULL if stderr is False else stderr
+        self._stderr_lock = asyncio.Lock()
+        self._stdout_lock = asyncio.Lock()
+        self._stdin_lock = asyncio.Lock()
         self._close_called = False
         self._returncode: int | None = None
 
@@ -123,10 +129,11 @@ class AsyncProcess:
             return b""
         assert self.proc is not None  # for type checking
         assert self.proc.stdout is not None  # for type checking
-        try:
-            return await self.proc.stdout.readexactly(n)
-        except asyncio.IncompleteReadError as err:
-            return err.partial
+        async with self._stdout_lock:
+            try:
+                return await self.proc.stdout.readexactly(n)
+            except asyncio.IncompleteReadError as err:
+                return err.partial
 
     async def read(self, n: int) -> bytes:
         """Read up to n bytes from the stdout stream.
@@ -139,36 +146,39 @@ class AsyncProcess:
             return b""
         assert self.proc is not None  # for type checking
         assert self.proc.stdout is not None  # for type checking
-        return await self.proc.stdout.read(n)
+        async with self._stdout_lock:
+            return await self.proc.stdout.read(n)
 
     async def write(self, data: bytes) -> None:
         """Write data to process stdin."""
-        if self.closed:
-            raise RuntimeError("write called while process already done")
+        if self._close_called:
+            return
         assert self.proc is not None  # for type checking
         assert self.proc.stdin is not None  # for type checking
-        self.proc.stdin.write(data)
-        with suppress(BrokenPipeError, ConnectionResetError):
-            await self.proc.stdin.drain()
+        async with self._stdin_lock:
+            self.proc.stdin.write(data)
+            with suppress(BrokenPipeError, ConnectionResetError):
+                await self.proc.stdin.drain()
 
     async def write_eof(self) -> None:
         """Write end of file to to process stdin."""
-        if self.closed:
+        if self._close_called:
             return
         assert self.proc is not None  # for type checking
         assert self.proc.stdin is not None  # for type checking
-        try:
-            if self.proc.stdin.can_write_eof():
-                self.proc.stdin.write_eof()
-        except (
-            AttributeError,
-            AssertionError,
-            BrokenPipeError,
-            RuntimeError,
-            ConnectionResetError,
-        ):
-            # already exited, race condition
-            pass
+        async with self._stdin_lock:
+            try:
+                if self.proc.stdin.can_write_eof():
+                    self.proc.stdin.write_eof()
+            except (
+                AttributeError,
+                AssertionError,
+                BrokenPipeError,
+                RuntimeError,
+                ConnectionResetError,
+            ):
+                # already exited, race condition
+                pass
 
     async def read_stderr(self) -> bytes:
         """Read line from stderr."""
@@ -176,18 +186,19 @@ class AsyncProcess:
             return b""
         assert self.proc is not None  # for type checking
         assert self.proc.stderr is not None  # for type checking
-        try:
-            return await self.proc.stderr.readline()
-        except ValueError as err:
-            # we're waiting for a line (separator found), but the line was too big
-            # this may happen with ffmpeg during a long (radio) stream where progress
-            # gets outputted to the stderr but no newline
-            # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
-            # NOTE: this consumes the line that was too big
-            if "chunk exceed the limit" in str(err):
+        async with self._stderr_lock:
+            try:
                 return await self.proc.stderr.readline()
-            # raise for all other (value) errors
-            raise
+            except ValueError as err:
+                # we're waiting for a line (separator found), but the line was too big
+                # this may happen with ffmpeg during a long (radio) stream where progress
+                # gets outputted to the stderr but no newline
+                # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
+                # NOTE: this consumes the line that was too big
+                if "chunk exceed the limit" in str(err):
+                    return await self.proc.stderr.readline()
+                # raise for all other (value) errors
+                raise
 
     async def iter_stderr(self) -> AsyncGenerator[str, None]:
         """Iterate lines from the stderr stream as string."""
@@ -210,17 +221,9 @@ class AsyncProcess:
         if self.closed:
             raise RuntimeError("communicate called while process already done")
         # abort existing readers on stderr/stdout first before we send communicate
-        waiter: asyncio.Future[None]
+        await self._stderr_lock.acquire()
+        await self._stdout_lock.acquire()
         assert self.proc is not None  # for type checking
-        # _waiter is attribute of StreamReader
-        if self.proc.stdout and (waiter := self.proc.stdout._waiter):  # type: ignore[attr-defined]
-            self.proc.stdout._waiter = None  # type: ignore[attr-defined]
-            if waiter and not waiter.done():
-                waiter.set_exception(asyncio.CancelledError())
-        if self.proc.stderr and (waiter := self.proc.stderr._waiter):  # type: ignore[attr-defined]
-            self.proc.stderr._waiter = None  # type: ignore[attr-defined]
-            if waiter and not waiter.done():
-                waiter.set_exception(asyncio.CancelledError())
         stdout, stderr = await asyncio.wait_for(self.proc.communicate(input), timeout)
         return (stdout, stderr)
 
@@ -231,21 +234,36 @@ class AsyncProcess:
             return
         if send_signal and self.returncode is None:
             self.proc.send_signal(SIGINT)
+
+        # cancel existing stdin feeder task if any
+        if self._stdin_feeder_task:
+            if not self._stdin_feeder_task.done():
+                self._stdin_feeder_task.cancel()
+            # Always await the task to consume any exception and prevent
+            # "Task exception was never retrieved" errors.
+            # Suppress CancelledError (from cancel) and any other exception
+            # since exceptions have already been propagated through the generator chain.
+            with suppress(asyncio.CancelledError, Exception):
+                await self._stdin_feeder_task
+
+        # close stdin to signal we're done sending data
+        await asyncio.wait_for(self._stdin_lock.acquire(), 10)
         if self.proc.stdin and not self.proc.stdin.is_closing():
             self.proc.stdin.close()
-        # abort existing readers on stderr/stdout first before we send communicate
-        # waiter: asyncio.Future[None]
-        # stdout_waiter = self.proc.stdout._waiter  # type: ignore[attr-defined]
-        # if self.proc.stdout and stdout_waiter:
-        #     self.proc.stdout._waiter = None  # type: ignore[attr-defined]
-        #     if stdout_waiter and not stdout_waiter.done():
-        #         stdout_waiter.set_exception(asyncio.CancelledError())
-        # stderr_waiter = self.proc.stderr._waiter  # type: ignore[attr-defined]
-        # if self.proc.stderr and stderr_waiter:
-        #     self.proc.stderr._waiter = None  # type: ignore[attr-defined]
-        #     if stderr_waiter and not stderr_waiter.done():
-        #         stderr_waiter.set_exception(asyncio.CancelledError())
-        await asyncio.sleep(0)  # yield to loop
+
+        # ensure we have no more readers active and stdout is drained
+        await asyncio.wait_for(self._stdout_lock.acquire(), 10)
+        if self.proc.stdout and not self.proc.stdout.at_eof():
+            with suppress(Exception):
+                await self.proc.stdout.read(-1)
+        # if we have a stderr task active, allow it to finish
+        if self._stderr_reader_task:
+            await asyncio.wait_for(self._stderr_reader_task, 10)
+        elif self.proc.stderr and not self.proc.stderr.at_eof():
+            await asyncio.wait_for(self._stderr_lock.acquire(), 10)
+            # drain stderr
+            with suppress(Exception):
+                await self.proc.stderr.read(-1)
 
         # make sure the process is really cleaned up.
         # especially with pipes this can cause deadlocks if not properly guarded
@@ -254,18 +272,14 @@ class AsyncProcess:
             try:
                 # use communicate to flush all pipe buffers
                 await asyncio.wait_for(self.proc.communicate(), 5)
-            except RuntimeError as err:
-                if "read() called while another coroutine" in str(err):
-                    # race condition
-                    continue
-                raise
             except TimeoutError:
                 self.logger.debug(
                     "Process %s with PID %s did not stop in time. Sending terminate...",
                     self.name,
                     self.proc.pid,
                 )
-                self.proc.terminate()
+                with suppress(ProcessLookupError):
+                    self.proc.terminate()
         self.logger.log(
             VERBOSE_LOG_LEVEL,
             "Process %s with PID %s stopped with returncode %s",