self._chunksize = chunksize
self._enable_write = enable_write
self._enable_shell = enable_shell
+ self.loop = asyncio.get_running_loop()
+ self.__queue_in = asyncio.Queue(4)
+ self.__queue_out = asyncio.Queue(8)
+ self.__proc_task = None
self._exit = False
- self._proc = None
self._id = int(time.time()) # some identifier for logging
async def __aenter__(self) -> "AsyncProcess":
"""Enter context manager, start running the process in executor."""
LOGGER.debug("[%s] Entered context manager", self._id)
- self._proc = subprocess.Popen(
- self._process_args,
- **{
- "shell": self._enable_shell,
- "stdout": subprocess.PIPE,
- "stdin": subprocess.PIPE if self._enable_write else None,
- },
- )
+ self.__proc_task = self.loop.run_in_executor(None, self.__run_proc)
return self
async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
)
else:
LOGGER.debug("[%s] Context manager exit", self._id)
- # signal that we must exit
- self._exit = True
-
- def close_proc():
- if self._proc and self._proc.poll() is None:
- # there is no real clean way to do this with all the blocking pipes
- self._proc.kill()
- await asyncio.get_running_loop().run_in_executor(None, close_proc)
+ self._exit = True
+ # prevent a deadlock by clearing the queues
+ while self.__queue_in.qsize():
+ await self.__queue_in.get()
+ self.__queue_in.task_done()
+ self.__queue_in.put_nowait(b"")
+ while self.__queue_out.qsize():
+ await self.__queue_out.get()
+ self.__queue_out.task_done()
+ await self.__proc_task
LOGGER.debug("[%s] Cleanup finished", self._id)
return True
async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
"""Yield chunks from the output Queue. Generator."""
LOGGER.debug("[%s] start reading from generator", self._id)
- while not self._exit:
+ while True:
chunk = await self.read()
yield chunk
- if len(chunk) < self._chunksize:
+ if not chunk or len(chunk) < self._chunksize:
break
LOGGER.debug("[%s] finished reading from generator", self._id)
async def read(self) -> bytes:
"""Read single chunk from the output Queue."""
-
- def try_read():
- try:
- data = self._proc.stdout.read(self._chunksize)
- return data
- except BrokenPipeError:
- return b""
- except Exception as exc: # pylint: disable=broad-except
- LOGGER.exception(exc)
- return b""
-
- return await asyncio.get_running_loop().run_in_executor(None, try_read)
+ if self._exit:
+ raise RuntimeError("Already exited")
+ data = await self.__queue_out.get()
+ self.__queue_out.task_done()
+ return data
async def write(self, data: bytes) -> None:
"""Write data to process."""
-
- def try_write(_data):
- try:
- self._proc.stdin.write(_data)
- except BrokenPipeError:
- pass
- except Exception as exc: # pylint: disable=broad-except
- LOGGER.exception(exc)
-
- await asyncio.get_running_loop().run_in_executor(None, try_write, data)
+ if self._exit:
+ raise RuntimeError("Already exited")
+ await self.__queue_in.put(data)
async def write_eof(self) -> None:
"""Write eof to process."""
-
- def try_write():
- try:
- self._proc.stdin.close()
- except BrokenPipeError:
- pass
- except Exception as exc: # pylint: disable=broad-except
- LOGGER.exception(exc)
-
- await asyncio.get_running_loop().run_in_executor(None, try_write)
-
- async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
- """Write bytes to process and read back results."""
- if not self._enable_write and input_data:
- raise RuntimeError("Write is disabled")
- if input_data:
- await self.write(input_data)
- output = b""
- async for chunk in self.iterate_chunks():
- output += chunk
- return output
-
-
-# first attempt with queues, too complicated
-# left here as reference
-class AsyncProcessWithQueues(object):
- """Implementation of a (truly) non blocking subprocess."""
-
- def __init__(
- self,
- process_args: List,
- chunksize=512000,
- enable_write: bool = False,
- enable_shell=False,
- ):
- """Initialize."""
- self._process_args = process_args
- self._chunksize = chunksize
- self._enable_write = enable_write
- self._enable_shell = enable_shell
- # we have large chunks, limit the queue size a bit.
- import janus
-
- self.__queue_in = janus.Queue(8)
- self.__queue_out = janus.Queue(4)
- self.__proc_task = None
- self._exit = threading.Event()
- self._id = int(time.time()) # some identifier for logging
-
- async def __aenter__(self) -> "AsyncProcess":
- """Enter context manager, start running the process in executor."""
- LOGGER.debug("[%s] Entered context manager", self._id)
- self.__proc_task = asyncio.get_running_loop().run_in_executor(None, self._run)
- return self
-
- async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
- """Exit context manager."""
- if exc_type:
- LOGGER.debug(
- "[%s] Context manager exit with exception %s (%s)",
- self._id,
- exc_type,
- str(exc_value),
- )
- else:
- LOGGER.debug("[%s] Context manager exit", self._id)
- # signal that we must exit
- self._exit.set()
- # if self._proc and self._proc.poll() is None:
- # asyncio.get_running_loop().run_in_executor(None, self._proc.communicate)
- print("1")
- self.__queue_out.close()
- self.__queue_in.close()
- print("2")
- await self.__queue_out.wait_closed()
- await self.__queue_in.wait_closed()
- print("3")
- # await executor job
- self.__proc_task.cancel()
- # await self.__proc_task
- print("4")
-
- LOGGER.debug("[%s] Cleanup finished", self._id)
- return True
-
- async def iterate_chunks(self) -> AsyncGenerator[bytes, None]:
- """Yield chunks from the output Queue. Generator."""
- LOGGER.debug("[%s] start reading from generator", self._id)
- while not self._exit.is_set():
- chunk = await self.__queue_out.async_q.get()
- self.__queue_out.async_q.task_done()
- if not chunk:
- break
- yield chunk
- LOGGER.debug("[%s] finished reading from generator", self._id)
-
- async def read(self) -> bytes:
- """Read single chunk from the output Queue."""
- chunk = await self.__queue_out.async_q.get()
- self.__queue_out.async_q.task_done()
- return chunk
-
- async def write(self, data: Optional[bytes] = None) -> None:
- """Write data to process."""
- if not self._exit.is_set():
- await self.__queue_in.async_q.put(data)
-
- async def write_eof(self) -> None:
- """Write eof to process stdin."""
- await self.write(b"")
+ await self.__queue_in.put(b"")
async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
"""Write bytes to process and read back results."""
output += chunk
return output
- def _run(self):
- """Run actual process in executor thread."""
- LOGGER.info(
- "[%s] Starting process with args: %s", self._id, str(self._process_args)
- )
- proc = subprocess.Popen(
- self._process_args,
- **{
- "shell": self._enable_shell,
- "stdout": subprocess.PIPE,
- "stdin": subprocess.PIPE if self._enable_write else None,
- },
- )
-
- # start fill buffer task in (yet another) background thread
- def fill_buffer():
- LOGGER.debug("[%s] start fill buffer", self._id)
- try:
- while not self._exit.is_set() and not self.__queue_in.closed:
- chunk = self.__queue_in.sync_q.get()
- if not chunk:
- break
- proc.stdin.write(chunk)
- except Exception as exc: # pylint: disable=broad-except
- LOGGER.debug("[%s], fill buffer aborted (%s)", self._id, str(exc))
- else:
- LOGGER.debug("[%s] fill buffer finished", self._id)
-
- if self._enable_write:
- fill_buffer_thread = threading.Thread(
- target=fill_buffer, name=f"AsyncProcess_{self._id}"
- )
- fill_buffer_thread.start()
-
- # consume bytes from stdout
+ def __run_proc(self):
+ """Run process in executor."""
try:
- while not self._exit.is_set() and not self.__queue_out.closed:
- chunk = proc.stdout.read(self._chunksize)
- self.__queue_out.sync_q.put(chunk)
- if len(chunk) < self._chunksize:
- LOGGER.debug("[%s] last chunk received on stdout", self._id)
- break
+ LOGGER.info(
+ "[%s] Starting process with args: %s", self._id, str(self._process_args)
+ )
+ proc = subprocess.Popen(
+ self._process_args,
+ shell=self._enable_shell,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE if self._enable_write else None,
+ )
if self._enable_write:
- fill_buffer_thread.join()
- # write empty chunk to out queue to indicate end of stream just in case
- self.__queue_out.sync_q.put(b"")
+ threading.Thread(
+ target=self.__write_stdin,
+ args=(proc.stdin,),
+ name=f"AsyncProcess_{self._id}_write_stdin",
+ daemon=True,
+ ).start()
+ threading.Thread(
+ target=self.__read_stdout,
+ args=(proc.stdout,),
+ name=f"AsyncProcess_{self._id}_read_stdout",
+ daemon=True,
+ ).start()
+ proc.wait()
+
+ except Exception as exc: # pylint: disable=broad-except
+ LOGGER.exception(exc)
finally:
- LOGGER.info("[%s] wait for process exit", self._id)
- # pickup remaining bytes if process is stull running
+ LOGGER.error("[%s] process exiting", self._id)
if proc.poll() is None:
+ proc.terminate()
proc.communicate()
+ LOGGER.debug("[%s] process finished", self._id)
+
+ def __write_stdin(self, _stdin):
+ """Put chunks from queue to stdin."""
+ # Runs in a worker thread (started from __run_proc when writing is
+ # enabled); bridges the asyncio input queue to the blocking stdin pipe.
+ LOGGER.debug("[%s] start write_stdin", self._id)
+ try:
+ while True:
+ # block this thread until a chunk arrives on the event-loop queue
+ chunk = asyncio.run_coroutine_threadsafe(
+ self.__queue_in.get(), self.loop
+ ).result()
+ self.__queue_in.task_done()
+ if not chunk:
+ # empty chunk is the EOF sentinel (see write_eof) -> close stdin
+ _stdin.close()
+ break
+ _stdin.write(chunk)
+ except Exception as exc: # pylint: disable=broad-except
+ # broad catch: the pipe may break if the process exits early
+ LOGGER.debug("[%s] write_stdin aborted (%s)", self._id, str(exc))
+ else:
+ LOGGER.debug("[%s] write_stdin finished", self._id)
+
+ def __read_stdout(self, _stdout):
+ """Put chunks from stdout to queue."""
+ # Runs in a worker thread; forwards blocking reads from the process
+ # stdout pipe onto the asyncio output queue.
+ LOGGER.debug("[%s] start read_stdout", self._id)
+ try:
+ while True:
+ chunk = _stdout.read(self._chunksize)
+ # .result() blocks here, so the bounded queue applies backpressure
+ asyncio.run_coroutine_threadsafe(
+ self.__queue_out.put(chunk), self.loop
+ ).result()
+ if not chunk or len(chunk) < self._chunksize:
+ # a short or empty read signals EOF on stdout
+ LOGGER.debug("[%s] last chunk received on stdout", self._id)
+ break
+ # write empty chunk just in case
+ # NOTE(review): this future is not awaited (no .result()) — confirm the
+ # sentinel put is intentionally fire-and-forget; if the queue is full it
+ # may be dropped at shutdown
+ asyncio.run_coroutine_threadsafe(self.__queue_out.put(b""), self.loop)
+ except Exception as exc: # pylint: disable=broad-except
+ LOGGER.debug("[%s] read_stdout aborted (%s)", self._id, str(exc))
+ else:
+ LOGGER.debug("[%s] read_stdout finished", self._id)
)
async with AsyncProcess(args, chunk_size, enable_write=True) as sox_proc:
+ cancelled = False
+
async def fill_buffer():
"""Forward audio chunks to sox stdin."""
LOGGER.debug(
)
# feed audio data into sox stdin for processing
async for chunk in self.async_get_media_stream(streamdetails):
+ if self.mass.exit or cancelled:
+ break
await sox_proc.write(chunk)
await sox_proc.write_eof()
LOGGER.debug(
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
- prev_chunk = b""
- async for chunk in sox_proc.iterate_chunks():
- if len(chunk) < chunk_size:
- # last chunk
- yield (True, prev_chunk + chunk)
- break
- if prev_chunk:
- yield (False, prev_chunk)
- prev_chunk = chunk
+ try:
+ prev_chunk = b""
+ async for chunk in sox_proc.iterate_chunks():
+ if len(chunk) < chunk_size:
+ # last chunk
+ yield (True, prev_chunk + chunk)
+ break
+ if prev_chunk:
+ yield (False, prev_chunk)
+ prev_chunk = chunk
- await asyncio.wait([fill_buffer_task])
+ await asyncio.wait([fill_buffer_task])
- LOGGER.debug(
- "[async_get_sox_stream] [%s/%s] finished",
- streamdetails.provider,
- streamdetails.item_id,
- )
+ except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except
+ cancelled = True
+ fill_buffer_task.cancel()
+ LOGGER.debug(
+ "[async_get_sox_stream] [%s/%s] cancelled",
+ streamdetails.provider,
+ streamdetails.item_id,
+ )
+ raise exc
+ else:
+ LOGGER.debug(
+ "[async_get_sox_stream] [%s/%s] finished",
+ streamdetails.provider,
+ streamdetails.item_id,
+ )
async def async_queue_stream_flac(self, player_id) -> AsyncGenerator[bytes, None]:
"""Stream the PlayerQueue's tracks as constant feed in flac format."""
)
# feed stdin with pcm samples
+ cancelled = False
+
async def fill_buffer():
"""Feed audio data into sox stdin for processing."""
LOGGER.debug(
"[async_queue_stream_flac] [%s] fill buffer started", player_id
)
async for chunk in self.async_queue_stream_pcm(player_id, 96000, 32):
- if self.mass.exit:
- return
+ if self.mass.exit or cancelled:
+ break
await sox_proc.write(chunk)
# write eof when no more data
await sox_proc.write_eof()
)
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
- # start yielding audio chunks
- async for chunk in sox_proc.iterate_chunks():
- yield chunk
- await asyncio.wait([fill_buffer_task])
- LOGGER.debug(
- "[async_queue_stream_flac] [%s] finished",
- player_id,
- )
+ try:
+ # start yielding audio chunks
+ async for chunk in sox_proc.iterate_chunks():
+ yield chunk
+ await asyncio.wait([fill_buffer_task])
+ except (GeneratorExit, Exception) as exc: # pylint: disable=broad-except
+ cancelled = True
+ fill_buffer_task.cancel()
+ LOGGER.debug(
+ "[async_queue_stream_flac] [%s] cancelled",
+ player_id,
+ )
+ raise exc
+ else:
+ LOGGER.debug(
+ "[async_queue_stream_flac] [%s] finished",
+ player_id,
+ )
async def async_queue_stream_pcm(
self, player_id, sample_rate=96000, bit_depth=32