self, process_args: List, enable_write: bool = False, enable_shell=False
):
"""Initialize."""
+ self._id = "".join(process_args)
self._proc = subprocess.Popen(
process_args,
shell=enable_shell,
"""Exit context manager."""
self._cancelled = True
if self._proc.poll() is None:
- # prevent subprocess deadlocking, send terminate and read remaining bytes
- def close_proc():
- self._proc.terminate()
- self._proc.stdin.close()
- self._proc.stdout.read(-1)
+ # process needs to be cleaned up..
+ await self.loop.run_in_executor(None, self.__close)
- await self.loop.run_in_executor(None, close_proc)
return exc_type not in (GeneratorExit, asyncio.CancelledError)
async def iterate_chunks(
self, chunksize: int = DEFAULT_CHUNKSIZE
) -> AsyncGenerator[bytes, None]:
"""Yield chunks from the process stdout. Generator."""
- while True:
+ while not self._cancelled:
chunk = await self.read(chunksize)
yield chunk
if len(chunk) < chunksize:
raise asyncio.CancelledError()
return await self.loop.run_in_executor(None, self.__read, chunksize)
- def __read(self, chunksize: int = DEFAULT_CHUNKSIZE):
- """Try read chunk from process."""
- try:
- return self._proc.stdout.read(chunksize)
- except (BrokenPipeError, ValueError, AttributeError):
- # Process already exited
- return b""
-
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
if self._cancelled:
raise asyncio.CancelledError()
-
- def __write():
- try:
- self._proc.stdin.write(data)
- except (BrokenPipeError, ValueError, AttributeError):
- # Process already exited
- pass
-
- await self.loop.run_in_executor(None, __write)
+ await self.loop.run_in_executor(None, self.__write, data)
async def write_eof(self) -> None:
"""Write eof to process."""
if self._cancelled:
raise asyncio.CancelledError()
-
- def __write_eof():
- try:
- self._proc.stdin.close()
- except (BrokenPipeError, ValueError, AttributeError):
- # Process already exited
- pass
-
- await self.loop.run_in_executor(None, __write_eof)
+ await self.loop.run_in_executor(None, self.__write_eof)
async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
"""Write bytes to process and read back results."""
raise asyncio.CancelledError()
return await self.loop.run_in_executor(None, self._proc.communicate, input_data)
+ def __read(self, chunksize: int = DEFAULT_CHUNKSIZE):
+ """Try read chunk from process."""
+ try:
+ chunk = self._proc.stdout.read(chunksize)
+ self._proc.stdout.flush()
+ return chunk
+ except (BrokenPipeError, ValueError, AttributeError):
+ # Process already exited
+ return b""
+
+ def __write(self, data: bytes):
+ """Write data to process stdin."""
+ try:
+ self._proc.stdin.write(data)
+ except (BrokenPipeError, ValueError, AttributeError):
+ # Process already exited
+ pass
+
+ def __write_eof(self):
+ """Write eof to process stdin."""
+ try:
+ self._proc.stdin.close()
+ except (BrokenPipeError, ValueError, AttributeError):
+ # Process already exited
+ pass
+
+ def __close(self):
+ """Prevent subprocess deadlocking, make sure it closes."""
+ LOGGER.debug("Cleaning up process %s...", self._id)
+ try:
+ self._proc.stdout.close()
+ except BrokenPipeError:
+ pass
+ if self._proc.stdin:
+ try:
+ self._proc.stdin.close()
+ except BrokenPipeError:
+ pass
+ self._proc.kill()
+ self._proc.wait()
+ LOGGER.debug("Process %s closed.", self._id)
+
class AsyncProcessBroken:
"""Implementation of a (truly) non blocking subprocess."""