# if we reach here, we have broken out of the loop due to inactivity
await self.clear(cancel_inactivity_task=False)
+ async def _notify_on_producer_error(self) -> None:
+ """Notify waiting consumers that producer has failed.
+
+ This is called from the producer task done callback and properly
+ acquires the lock before calling notify_all.
+ """
+ async with self._lock:
+ self._data_available.notify_all()
+
def attach_producer_task(self, task: asyncio.Task[Any]) -> None:
"""Attach a background task that fills the buffer."""
self._producer_task = task
# This prevents reuse of a buffer in error state
self._cancelled = True
# Wake up any waiting consumers so they can see the error
+ # We need to acquire the lock before calling notify_all
loop = asyncio.get_running_loop()
- loop.call_soon_threadsafe(self._data_available.notify_all)
+ loop.create_task(self._notify_on_producer_error())
task.add_done_callback(_on_producer_done)
exc_tb: TracebackType | None,
) -> bool | None:
"""Exit context manager."""
- # send interrupt signal to process when we're cancelled
- await self.close(send_signal=exc_type in (GeneratorExit, asyncio.CancelledError))
+ # make sure we close and cleanup the process
+ await self.close()
self._returncode = self.returncode
return None
stdout, stderr = await asyncio.wait_for(self.proc.communicate(input), timeout)
return (stdout, stderr)
- async def close(self, send_signal: bool = False) -> None:
+ async def close(self) -> None:
"""Close/terminate the process and wait for exit."""
self._close_called = True
if not self.proc:
return
- if send_signal and self.returncode is None:
- self.proc.send_signal(SIGINT)
# cancel existing stdin feeder task if any
if self._stdin_feeder_task:
await asyncio.wait_for(self._stdin_lock.acquire(), 10)
if self.proc.stdin and not self.proc.stdin.is_closing():
self.proc.stdin.close()
+ elif not self.proc.stdin and self.proc.returncode is None:
+ self.proc.send_signal(SIGINT)
# ensure we have no more readers active and stdout is drained
await asyncio.wait_for(self._stdout_lock.acquire(), 10)
"""Wait for the process and return the returncode with a timeout."""
return await asyncio.wait_for(self.wait(), timeout)
+ def attach_stderr_reader(self, task: asyncio.Task[None]) -> None:
+ """Attach a stderr reader task to this process."""
+ self._stderr_reader_task = task
+
async def check_output(*args: str, env: dict[str, str] | None = None) -> tuple[int, bytes]:
"""Run subprocess and return returncode and output."""