From 2bef37fb9888ddf45b4b3a768c47d08994a60a5c Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sun, 2 Nov 2025 16:54:44 +0100 Subject: [PATCH] Fix: clean shutdown of ffmpeg process --- music_assistant/helpers/ffmpeg.py | 43 +++------ music_assistant/helpers/process.py | 140 ++++++++++++++++------------- 2 files changed, 90 insertions(+), 93 deletions(-) diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index 2ae882cc..d05d0a59 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -59,8 +59,8 @@ class FFMpeg(AsyncProcess): self.input_format = input_format self.collect_log_history = collect_log_history self.log_history: deque[str] = deque(maxlen=100) - self._stdin_task: asyncio.Task[None] | None = None - self._logger_task: asyncio.Task[None] | None = None + self._stdin_feeder_task: asyncio.Task[None] | None = None + self._stderr_reader_task: asyncio.Task[None] | None = None self._input_codec_parsed = False stdin: bool | int if audio_input == "-" or isinstance(audio_input, AsyncGenerator): @@ -93,9 +93,9 @@ class FFMpeg(AsyncProcess): clean_args.append(arg) args_str = " ".join(clean_args) self.logger.log(VERBOSE_LOG_LEVEL, "started with args: %s", args_str) - self._logger_task = asyncio.create_task(self._log_reader_task()) + self._stderr_reader_task = asyncio.create_task(self._log_reader_task()) if isinstance(self.audio_input, AsyncGenerator): - self._stdin_task = asyncio.create_task(self._feed_stdin()) + self._stdin_feeder_task = asyncio.create_task(self._feed_stdin()) async def communicate( self, @@ -103,37 +103,21 @@ class FFMpeg(AsyncProcess): timeout: float | None = None, ) -> tuple[bytes, bytes]: """Override communicate to avoid blocking.""" - if self._stdin_task: - if not self._stdin_task.done(): - self._stdin_task.cancel() + if self._stdin_feeder_task: + if not self._stdin_feeder_task.done(): + self._stdin_feeder_task.cancel() # Always await the task to consume any exception and prevent # "Task exception was never retrieved" errors. # Suppress CancelledError (from cancel) and any other exception # since exceptions have already been propagated through the generator chain. with suppress(asyncio.CancelledError, Exception): - await self._stdin_task - if self._logger_task and not self._logger_task.done(): - self._logger_task.cancel() - return await super().communicate(input, timeout) - - async def close(self, send_signal: bool = True) -> None: - """Close/terminate the process and wait for exit.""" - if self.closed: - return - if self._stdin_task: - if not self._stdin_task.done(): - self._stdin_task.cancel() - # Always await the task to consume any exception and prevent - # "Task exception was never retrieved" errors. - # Suppress CancelledError (from cancel) and any other exception - # since exceptions have already been propagated through the generator chain. + await self._stdin_feeder_task + if self._stderr_reader_task: + if not self._stderr_reader_task.done(): + self._stderr_reader_task.cancel() with suppress(asyncio.CancelledError, Exception): - await self._stdin_task - await super().close(send_signal) - if self._logger_task and not self._logger_task.done(): - self._logger_task.cancel() - with suppress(asyncio.CancelledError): - await self._logger_task + await self._stderr_reader_task + return await super().communicate(input, timeout) async def _log_reader_task(self) -> None: """Read ffmpeg log from stderr.""" @@ -152,7 +136,6 @@ class FFMpeg(AsyncProcess): decode_errors += 1 if decode_errors >= 50: self.logger.error(line) - await super().close(True) # if streamdetails contenttype is unknown, try parse it from the ffmpeg log if line.startswith("Stream #") and ": Audio: " in line: diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index 499f7fc4..394fe4ad 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -35,6 +35,9 @@ class AsyncProcess: without deadlocking. """ + _stdin_feeder_task: asyncio.Task[None] | None = None # used for ffmpeg + _stderr_reader_task: asyncio.Task[None] | None = None # used for ffmpeg + def __init__( self, args: list[str], @@ -53,6 +56,9 @@ class AsyncProcess: self._stdin = None if stdin is False else stdin self._stdout = None if stdout is False else stdout self._stderr = asyncio.subprocess.DEVNULL if stderr is False else stderr + self._stderr_lock = asyncio.Lock() + self._stdout_lock = asyncio.Lock() + self._stdin_lock = asyncio.Lock() self._close_called = False self._returncode: int | None = None @@ -123,10 +129,11 @@ class AsyncProcess: return b"" assert self.proc is not None # for type checking assert self.proc.stdout is not None # for type checking - try: - return await self.proc.stdout.readexactly(n) - except asyncio.IncompleteReadError as err: - return err.partial + async with self._stdout_lock: + try: + return await self.proc.stdout.readexactly(n) + except asyncio.IncompleteReadError as err: + return err.partial async def read(self, n: int) -> bytes: """Read up to n bytes from the stdout stream. @@ -139,36 +146,39 @@ class AsyncProcess: return b"" assert self.proc is not None # for type checking assert self.proc.stdout is not None # for type checking - return await self.proc.stdout.read(n) + async with self._stdout_lock: + return await self.proc.stdout.read(n) async def write(self, data: bytes) -> None: """Write data to process stdin.""" - if self.closed: - raise RuntimeError("write called while process already done") + if self._close_called: + return assert self.proc is not None # for type checking assert self.proc.stdin is not None # for type checking - self.proc.stdin.write(data) - with suppress(BrokenPipeError, ConnectionResetError): - await self.proc.stdin.drain() + async with self._stdin_lock: + self.proc.stdin.write(data) + with suppress(BrokenPipeError, ConnectionResetError): + await self.proc.stdin.drain() async def write_eof(self) -> None: """Write end of file to to process stdin.""" - if self.closed: + if self._close_called: return assert self.proc is not None # for type checking assert self.proc.stdin is not None # for type checking - try: - if self.proc.stdin.can_write_eof(): - self.proc.stdin.write_eof() - except ( - AttributeError, - AssertionError, - BrokenPipeError, - RuntimeError, - ConnectionResetError, - ): - # already exited, race condition - pass + async with self._stdin_lock: + try: + if self.proc.stdin.can_write_eof(): + self.proc.stdin.write_eof() + except ( + AttributeError, + AssertionError, + BrokenPipeError, + RuntimeError, + ConnectionResetError, + ): + # already exited, race condition + pass async def read_stderr(self) -> bytes: """Read line from stderr.""" @@ -176,18 +186,19 @@ class AsyncProcess: return b"" assert self.proc is not None # for type checking assert self.proc.stderr is not None # for type checking - try: - return await self.proc.stderr.readline() - except ValueError as err: - # we're waiting for a line (separator found), but the line was too big - # this may happen with ffmpeg during a long (radio) stream where progress - # gets outputted to the stderr but no newline - # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit - # NOTE: this consumes the line that was too big - if "chunk exceed the limit" in str(err): + async with self._stderr_lock: + try: return await self.proc.stderr.readline() - # raise for all other (value) errors - raise + except ValueError as err: + # we're waiting for a line (separator found), but the line was too big + # this may happen with ffmpeg during a long (radio) stream where progress + # gets outputted to the stderr but no newline + # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit + # NOTE: this consumes the line that was too big + if "chunk exceed the limit" in str(err): + return await self.proc.stderr.readline() + # raise for all other (value) errors + raise async def iter_stderr(self) -> AsyncGenerator[str, None]: """Iterate lines from the stderr stream as string.""" @@ -210,17 +221,9 @@ class AsyncProcess: if self.closed: raise RuntimeError("communicate called while process already done") # abort existing readers on stderr/stdout first before we send communicate - waiter: asyncio.Future[None] + await self._stderr_lock.acquire() + await self._stdout_lock.acquire() assert self.proc is not None # for type checking - # _waiter is attribute of StreamReader - if self.proc.stdout and (waiter := self.proc.stdout._waiter): # type: ignore[attr-defined] - self.proc.stdout._waiter = None # type: ignore[attr-defined] - if waiter and not waiter.done(): - waiter.set_exception(asyncio.CancelledError()) - if self.proc.stderr and (waiter := self.proc.stderr._waiter): # type: ignore[attr-defined] - self.proc.stderr._waiter = None # type: ignore[attr-defined] - if waiter and not waiter.done(): - waiter.set_exception(asyncio.CancelledError()) stdout, stderr = await asyncio.wait_for(self.proc.communicate(input), timeout) return (stdout, stderr) @@ -231,21 +234,36 @@ class AsyncProcess: return if send_signal and self.returncode is None: self.proc.send_signal(SIGINT) + + # cancel existing stdin feeder task if any + if self._stdin_feeder_task: + if not self._stdin_feeder_task.done(): + self._stdin_feeder_task.cancel() + # Always await the task to consume any exception and prevent + # "Task exception was never retrieved" errors. + # Suppress CancelledError (from cancel) and any other exception + # since exceptions have already been propagated through the generator chain. + with suppress(asyncio.CancelledError, Exception): + await self._stdin_feeder_task + + # close stdin to signal we're done sending data + await asyncio.wait_for(self._stdin_lock.acquire(), 10) if self.proc.stdin and not self.proc.stdin.is_closing(): self.proc.stdin.close() - # abort existing readers on stderr/stdout first before we send communicate - # waiter: asyncio.Future[None] - # stdout_waiter = self.proc.stdout._waiter # type: ignore[attr-defined] - # if self.proc.stdout and stdout_waiter: - # self.proc.stdout._waiter = None # type: ignore[attr-defined] - # if stdout_waiter and not stdout_waiter.done(): - # stdout_waiter.set_exception(asyncio.CancelledError()) - # stderr_waiter = self.proc.stderr._waiter # type: ignore[attr-defined] - # if self.proc.stderr and stderr_waiter: - # self.proc.stderr._waiter = None # type: ignore[attr-defined] - # if stderr_waiter and not stderr_waiter.done(): - # stderr_waiter.set_exception(asyncio.CancelledError()) - await asyncio.sleep(0) # yield to loop + + # ensure we have no more readers active and stdout is drained + await asyncio.wait_for(self._stdout_lock.acquire(), 10) + if self.proc.stdout and not self.proc.stdout.at_eof(): + with suppress(Exception): + await self.proc.stdout.read(-1) + # if we have a stderr task active, allow it to finish + if self._stderr_reader_task: + await asyncio.wait_for(self._stderr_reader_task, 10) + elif self.proc.stderr and not self.proc.stderr.at_eof(): + await asyncio.wait_for(self._stderr_lock.acquire(), 10) + # drain stderr + with suppress(Exception): + await self.proc.stderr.read(-1) # make sure the process is really cleaned up. # especially with pipes this can cause deadlocks if not properly guarded @@ -254,18 +272,14 @@ class AsyncProcess: try: # use communicate to flush all pipe buffers await asyncio.wait_for(self.proc.communicate(), 5) - except RuntimeError as err: - if "read() called while another coroutine" in str(err): - # race condition - continue - raise except TimeoutError: self.logger.debug( "Process %s with PID %s did not stop in time. Sending terminate...", self.name, self.proc.pid, ) - self.proc.terminate() + with suppress(ProcessLookupError): + self.proc.terminate() self.logger.log( VERBOSE_LOG_LEVEL, "Process %s with PID %s stopped with returncode %s", -- 2.34.1