self.input_format = input_format
self.collect_log_history = collect_log_history
self.log_history: deque[str] = deque(maxlen=100)
- self._stdin_task: asyncio.Task[None] | None = None
- self._logger_task: asyncio.Task[None] | None = None
+ self._stdin_feeder_task: asyncio.Task[None] | None = None
+ self._stderr_reader_task: asyncio.Task[None] | None = None
self._input_codec_parsed = False
stdin: bool | int
if audio_input == "-" or isinstance(audio_input, AsyncGenerator):
clean_args.append(arg)
args_str = " ".join(clean_args)
self.logger.log(VERBOSE_LOG_LEVEL, "started with args: %s", args_str)
- self._logger_task = asyncio.create_task(self._log_reader_task())
+ self._stderr_reader_task = asyncio.create_task(self._log_reader_task())
if isinstance(self.audio_input, AsyncGenerator):
- self._stdin_task = asyncio.create_task(self._feed_stdin())
+ self._stdin_feeder_task = asyncio.create_task(self._feed_stdin())
async def communicate(
self,
timeout: float | None = None,
) -> tuple[bytes, bytes]:
"""Override communicate to avoid blocking."""
- if self._stdin_task:
- if not self._stdin_task.done():
- self._stdin_task.cancel()
+ if self._stdin_feeder_task:
+ if not self._stdin_feeder_task.done():
+ self._stdin_feeder_task.cancel()
# Always await the task to consume any exception and prevent
# "Task exception was never retrieved" errors.
# Suppress CancelledError (from cancel) and any other exception
# since exceptions have already been propagated through the generator chain.
with suppress(asyncio.CancelledError, Exception):
- await self._stdin_task
- if self._logger_task and not self._logger_task.done():
- self._logger_task.cancel()
- return await super().communicate(input, timeout)
-
- async def close(self, send_signal: bool = True) -> None:
- """Close/terminate the process and wait for exit."""
- if self.closed:
- return
- if self._stdin_task:
- if not self._stdin_task.done():
- self._stdin_task.cancel()
- # Always await the task to consume any exception and prevent
- # "Task exception was never retrieved" errors.
- # Suppress CancelledError (from cancel) and any other exception
- # since exceptions have already been propagated through the generator chain.
+ await self._stdin_feeder_task
+ if self._stderr_reader_task:
+ if not self._stderr_reader_task.done():
+ self._stderr_reader_task.cancel()
with suppress(asyncio.CancelledError, Exception):
- await self._stdin_task
- await super().close(send_signal)
- if self._logger_task and not self._logger_task.done():
- self._logger_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._logger_task
+ await self._stderr_reader_task
+ return await super().communicate(input, timeout)
async def _log_reader_task(self) -> None:
"""Read ffmpeg log from stderr."""
decode_errors += 1
if decode_errors >= 50:
self.logger.error(line)
- await super().close(True)
# if streamdetails contenttype is unknown, try parse it from the ffmpeg log
if line.startswith("Stream #") and ": Audio: " in line:
without deadlocking.
"""
+ _stdin_feeder_task: asyncio.Task[None] | None = None # used for ffmpeg
+ _stderr_reader_task: asyncio.Task[None] | None = None # used for ffmpeg
+
def __init__(
self,
args: list[str],
self._stdin = None if stdin is False else stdin
self._stdout = None if stdout is False else stdout
self._stderr = asyncio.subprocess.DEVNULL if stderr is False else stderr
+ self._stderr_lock = asyncio.Lock()
+ self._stdout_lock = asyncio.Lock()
+ self._stdin_lock = asyncio.Lock()
self._close_called = False
self._returncode: int | None = None
return b""
assert self.proc is not None # for type checking
assert self.proc.stdout is not None # for type checking
- try:
- return await self.proc.stdout.readexactly(n)
- except asyncio.IncompleteReadError as err:
- return err.partial
+ async with self._stdout_lock:
+ try:
+ return await self.proc.stdout.readexactly(n)
+ except asyncio.IncompleteReadError as err:
+ return err.partial
async def read(self, n: int) -> bytes:
"""Read up to n bytes from the stdout stream.
return b""
assert self.proc is not None # for type checking
assert self.proc.stdout is not None # for type checking
- return await self.proc.stdout.read(n)
+ async with self._stdout_lock:
+ return await self.proc.stdout.read(n)
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
- if self.closed:
- raise RuntimeError("write called while process already done")
+ if self._close_called:
+ return
assert self.proc is not None # for type checking
assert self.proc.stdin is not None # for type checking
- self.proc.stdin.write(data)
- with suppress(BrokenPipeError, ConnectionResetError):
- await self.proc.stdin.drain()
+ async with self._stdin_lock:
+ self.proc.stdin.write(data)
+ with suppress(BrokenPipeError, ConnectionResetError):
+ await self.proc.stdin.drain()
async def write_eof(self) -> None:
"""Write end of file to to process stdin."""
- if self.closed:
+ if self._close_called:
return
assert self.proc is not None # for type checking
assert self.proc.stdin is not None # for type checking
- try:
- if self.proc.stdin.can_write_eof():
- self.proc.stdin.write_eof()
- except (
- AttributeError,
- AssertionError,
- BrokenPipeError,
- RuntimeError,
- ConnectionResetError,
- ):
- # already exited, race condition
- pass
+ async with self._stdin_lock:
+ try:
+ if self.proc.stdin.can_write_eof():
+ self.proc.stdin.write_eof()
+ except (
+ AttributeError,
+ AssertionError,
+ BrokenPipeError,
+ RuntimeError,
+ ConnectionResetError,
+ ):
+ # already exited, race condition
+ pass
async def read_stderr(self) -> bytes:
"""Read line from stderr."""
return b""
assert self.proc is not None # for type checking
assert self.proc.stderr is not None # for type checking
- try:
- return await self.proc.stderr.readline()
- except ValueError as err:
- # we're waiting for a line (separator found), but the line was too big
- # this may happen with ffmpeg during a long (radio) stream where progress
- # gets outputted to the stderr but no newline
- # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
- # NOTE: this consumes the line that was too big
- if "chunk exceed the limit" in str(err):
+ async with self._stderr_lock:
+ try:
return await self.proc.stderr.readline()
- # raise for all other (value) errors
- raise
+ except ValueError as err:
+ # we're waiting for a line (separator found), but the line was too big
+ # this may happen with ffmpeg during a long (radio) stream where progress
+ # gets outputted to the stderr but no newline
+ # https://stackoverflow.com/questions/55457370/how-to-avoid-valueerror-separator-is-not-found-and-chunk-exceed-the-limit
+ # NOTE: this consumes the line that was too big
+ if "chunk exceed the limit" in str(err):
+ return await self.proc.stderr.readline()
+ # raise for all other (value) errors
+ raise
async def iter_stderr(self) -> AsyncGenerator[str, None]:
"""Iterate lines from the stderr stream as string."""
if self.closed:
raise RuntimeError("communicate called while process already done")
# abort existing readers on stderr/stdout first before we send communicate
- waiter: asyncio.Future[None]
+ await self._stderr_lock.acquire()
+ await self._stdout_lock.acquire()
assert self.proc is not None # for type checking
- # _waiter is attribute of StreamReader
- if self.proc.stdout and (waiter := self.proc.stdout._waiter): # type: ignore[attr-defined]
- self.proc.stdout._waiter = None # type: ignore[attr-defined]
- if waiter and not waiter.done():
- waiter.set_exception(asyncio.CancelledError())
- if self.proc.stderr and (waiter := self.proc.stderr._waiter): # type: ignore[attr-defined]
- self.proc.stderr._waiter = None # type: ignore[attr-defined]
- if waiter and not waiter.done():
- waiter.set_exception(asyncio.CancelledError())
stdout, stderr = await asyncio.wait_for(self.proc.communicate(input), timeout)
return (stdout, stderr)
return
if send_signal and self.returncode is None:
self.proc.send_signal(SIGINT)
+
+ # cancel existing stdin feeder task if any
+ if self._stdin_feeder_task:
+ if not self._stdin_feeder_task.done():
+ self._stdin_feeder_task.cancel()
+ # Always await the task to consume any exception and prevent
+ # "Task exception was never retrieved" errors.
+ # Suppress CancelledError (from cancel) and any other exception
+ # since exceptions have already been propagated through the generator chain.
+ with suppress(asyncio.CancelledError, Exception):
+ await self._stdin_feeder_task
+
+ # close stdin to signal we're done sending data
+ await asyncio.wait_for(self._stdin_lock.acquire(), 10)
if self.proc.stdin and not self.proc.stdin.is_closing():
self.proc.stdin.close()
- # abort existing readers on stderr/stdout first before we send communicate
- # waiter: asyncio.Future[None]
- # stdout_waiter = self.proc.stdout._waiter # type: ignore[attr-defined]
- # if self.proc.stdout and stdout_waiter:
- # self.proc.stdout._waiter = None # type: ignore[attr-defined]
- # if stdout_waiter and not stdout_waiter.done():
- # stdout_waiter.set_exception(asyncio.CancelledError())
- # stderr_waiter = self.proc.stderr._waiter # type: ignore[attr-defined]
- # if self.proc.stderr and stderr_waiter:
- # self.proc.stderr._waiter = None # type: ignore[attr-defined]
- # if stderr_waiter and not stderr_waiter.done():
- # stderr_waiter.set_exception(asyncio.CancelledError())
- await asyncio.sleep(0) # yield to loop
+
+ # ensure we have no more readers active and stdout is drained
+ await asyncio.wait_for(self._stdout_lock.acquire(), 10)
+ if self.proc.stdout and not self.proc.stdout.at_eof():
+ with suppress(Exception):
+ await self.proc.stdout.read(-1)
+ # if we have a stderr task active, allow it to finish
+ if self._stderr_reader_task:
+ await asyncio.wait_for(self._stderr_reader_task, 10)
+ elif self.proc.stderr and not self.proc.stderr.at_eof():
+ await asyncio.wait_for(self._stderr_lock.acquire(), 10)
+ # drain stderr
+ with suppress(Exception):
+ await self.proc.stderr.read(-1)
# make sure the process is really cleaned up.
# especially with pipes this can cause deadlocks if not properly guarded
try:
# use communicate to flush all pipe buffers
await asyncio.wait_for(self.proc.communicate(), 5)
- except RuntimeError as err:
- if "read() called while another coroutine" in str(err):
- # race condition
- continue
- raise
except TimeoutError:
self.logger.debug(
"Process %s with PID %s did not stop in time. Sending terminate...",
self.name,
self.proc.pid,
)
- self.proc.terminate()
+ with suppress(ProcessLookupError):
+ self.proc.terminate()
self.logger.log(
VERBOSE_LOG_LEVEL,
"Process %s with PID %s stopped with returncode %s",