async def __aenter__(self) -> "AsyncProcess":
"""Enter context manager, start running the process in executor."""
- LOGGER.debug("[%s] Entered context manager", self._id)
self.__proc_task = self.loop.run_in_executor(None, self.__run_proc)
return self
exc_type,
str(exc_value),
)
- else:
- LOGGER.debug("[%s] Context manager exit", self._id)
self._exit = True
# prevent a deadlock by clearing the queues
await self.__queue_out.get()
self.__queue_out.task_done()
await self.__proc_task
- LOGGER.debug("[%s] Cleanup finished", self._id)
+ LOGGER.debug("[%s] Context manager closed", self._id)
return True
async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
"""Yield chunks from the output Queue. Generator."""
- LOGGER.debug("[%s] start reading from generator", self._id)
while True:
chunk = await self.read()
yield chunk
if not chunk or len(chunk) < self._chunksize:
break
- LOGGER.debug("[%s] finished reading from generator", self._id)
async def read(self) -> bytes:
"""Read single chunk from the output Queue."""
def __run_proc(self):
"""Run process in executor."""
try:
- LOGGER.info(
+ LOGGER.debug(
"[%s] Starting process with args: %s", self._id, str(self._process_args)
)
proc = subprocess.Popen(
proc.wait()
except Exception as exc: # pylint: disable=broad-except
+ LOGGER.warning("[%s] process exiting abormally: %s", self._id, str(exc))
LOGGER.exception(exc)
finally:
- LOGGER.error("[%s] process exiting", self._id)
if proc.poll() is None:
proc.terminate()
proc.communicate()
def __write_stdin(self, _stdin):
"""Put chunks from queue to stdin."""
- LOGGER.debug("[%s] start write_stdin", self._id)
try:
while True:
chunk = asyncio.run_coroutine_threadsafe(
break
_stdin.write(chunk)
except Exception as exc: # pylint: disable=broad-except
- LOGGER.debug("[%s] write_stdin aborted (%s)", self._id, str(exc))
- else:
- LOGGER.debug("[%s] write_stdin finished", self._id)
+ LOGGER.debug(
+ "[%s] write to stdin aborted with exception: %s", self._id, str(exc)
+ )
def __read_stdout(self, _stdout):
"""Put chunks from stdout to queue."""
- LOGGER.debug("[%s] start read_stdout", self._id)
try:
while True:
chunk = _stdout.read(self._chunksize)
self.__queue_out.put(chunk), self.loop
).result()
if not chunk or len(chunk) < self._chunksize:
- LOGGER.debug("[%s] last chunk received on stdout", self._id)
break
# write empty chunk just in case
asyncio.run_coroutine_threadsafe(self.__queue_out.put(b""), self.loop)
except Exception as exc: # pylint: disable=broad-except
- LOGGER.debug("[%s] read_stdout aborted (%s)", self._id, str(exc))
- else:
- LOGGER.debug("[%s] read_stdout finished", self._id)
+ LOGGER.debug(
+ "[%s] read from stdout aborted with exception: %s", self._id, str(exc)
+ )