LOGGER = logging.getLogger("AsyncProcess")
+DEFAULT_CHUNKSIZE = 1000000
+
class AsyncProcess:
"""Implementation of a (truly) non blocking subprocess."""
+ # workaround that is compatible with uvloop
+
def __init__(
self,
process_args: List,
shell=enable_shell,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE if enable_write else None,
+ # bufsize needs to be very high for smooth playback
+ bufsize=64000000,
)
self.loop = asyncio.get_running_loop()
self._cancelled = False
self._cancelled = True
if await self.loop.run_in_executor(None, self._proc.poll) is None:
# prevent subprocess deadlocking, send terminate and read remaining bytes
- await self.loop.run_in_executor(None, self._proc.kill)
- self.loop.run_in_executor(None, self.__read)
+ await self.loop.run_in_executor(None, self._proc.terminate)
+ await self.loop.run_in_executor(None, self.__read)
+ LOGGER.debug("process finished")
del self._proc
async def iterate_chunks(
- self, chunksize: int = 512000
+ self, chunksize: int = DEFAULT_CHUNKSIZE
) -> AsyncGenerator[bytes, None]:
"""Yield chunks from the process stdout. Generator."""
while True:
break
yield chunk
-    async def read(self, chunksize: int = -1) -> bytes:
+    async def read(self, chunksize: int = DEFAULT_CHUNKSIZE) -> bytes:
        """Read x bytes from the process stdout."""
        if self._cancelled:
            raise asyncio.CancelledError()
+        # blocking pipe read is pushed to the default executor so the event
+        # loop stays responsive (this is the uvloop-compatible workaround)
        return await self.loop.run_in_executor(None, self.__read, chunksize)
- def __read(self, chunksize: int = -1):
+ def __read(self, chunksize: int = DEFAULT_CHUNKSIZE):
"""Try read chunk from process."""
try:
return self._proc.stdout.read(chunksize)
None, self._proc.communicate, input_data
)
return stdout
+
+
+class AsyncProcessBroken:
+    """Implementation of a (truly) non blocking subprocess."""
+
+    # this version is not compatible with uvloop
+    # NOTE(review): kept alongside the executor-based AsyncProcess above,
+    # which is the uvloop-compatible workaround — confirm before removing.
+
+    def __init__(self, process_args: List, enable_write: bool = False):
+        """Initialize.
+
+        :param process_args: argv list handed to create_subprocess_exec.
+        :param enable_write: when True, open a stdin pipe so write()/write_eof()
+            can be used.
+        """
+        self._proc = None  # asyncio.subprocess.Process, created in __aenter__
+        self._process_args = process_args
+        self._enable_write = enable_write
+        self._cancelled = False  # set in __aexit__; read/write raise afterwards
+
+    async def __aenter__(self) -> "AsyncProcessBroken":
+        """Enter context manager: spawn the subprocess."""
+        self._proc = await asyncio.create_subprocess_exec(
+            *self._process_args,
+            stdin=asyncio.subprocess.PIPE if self._enable_write else None,
+            stdout=asyncio.subprocess.PIPE,
+            # large StreamReader limit — presumably needed for big audio
+            # chunks / smooth playback (matches bufsize used above); confirm
+            limit=64000000
+        )
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback) -> None:
+        """Exit context manager: terminate the process and drain stdout.
+
+        Returns None (falsy), so any exception from the with-body propagates.
+        """
+        self._cancelled = True
+        LOGGER.debug("subprocess exit requested")
+        if self._proc.returncode is None:
+            # prevent subprocess deadlocking, send terminate and read remaining bytes
+            if self._enable_write and self._proc.stdin.can_write_eof():
+                self._proc.stdin.write_eof()
+            self._proc.terminate()
+            await self._proc.stdout.read()
+        del self._proc
+        LOGGER.debug("subprocess exited")
+
+    async def iterate_chunks(
+        self, chunk_size: int = DEFAULT_CHUNKSIZE
+    ) -> AsyncGenerator[bytes, None]:
+        """Yield chunks from the process stdout. Generator."""
+        while True:
+            chunk = await self.read(chunk_size)
+            yield chunk
+            # read() returns a short (possibly empty) chunk only at EOF
+            if len(chunk) < chunk_size:
+                break
+
+    async def read(self, chunk_size: int = DEFAULT_CHUNKSIZE) -> bytes:
+        """Read x bytes from the process stdout."""
+        if self._cancelled:
+            raise asyncio.CancelledError()
+        try:
+            return await self._proc.stdout.readexactly(chunk_size)
+        except asyncio.IncompleteReadError as err:
+            # stream ended mid-chunk: return whatever was read (may be b"")
+            return err.partial
+
+    async def write(self, data: bytes) -> None:
+        """Write data to process stdin."""
+        if self._cancelled:
+            raise asyncio.CancelledError()
+        self._proc.stdin.write(data)
+        # drain() applies backpressure so the stdin buffer cannot grow unbounded
+        await self._proc.stdin.drain()
+
+    async def write_eof(self) -> None:
+        """Write eof to process."""
+        if self._cancelled:
+            raise asyncio.CancelledError()
+        if self._proc.stdin.can_write_eof():
+            self._proc.stdin.write_eof()
+
+    async def communicate(self, input_data: Optional[bytes] = None) -> Tuple[bytes, bytes]:
+        """Write bytes to process and read back results.
+
+        Returns the (stdout, stderr) pair produced by Process.communicate;
+        callers unpack two values.
+        """
+        if self._cancelled:
+            raise asyncio.CancelledError()
+        return await self._proc.communicate(input_data)
output_format: SoxOutputFormat = SoxOutputFormat.FLAC,
resample: Optional[int] = None,
gain_db_adjust: Optional[float] = None,
- chunk_size: int = 512000,
+ chunk_size: int = 1000000,
) -> AsyncGenerator[Tuple[bool, bytes], None]:
"""Get the sox manipulated audio data for the given streamdetails."""
# collect all args for sox
await sox_proc.write_eof()
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
+ await asyncio.sleep(1)
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
prev_chunk = b""
"""Stream the PlayerQueue's tracks as constant feed in flac format."""
player_conf = self.mass.config.get_player_config(player_id)
sample_rate = player_conf.get(CONF_MAX_SAMPLE_RATE, 96000)
- chunk_size = sample_rate * 2 * 10
args = [
"sox",
fill_buffer_task = self.mass.loop.create_task(fill_buffer())
# start yielding audio chunks
- async for chunk in sox_proc.iterate_chunks(chunk_size):
+ async for chunk in sox_proc.iterate_chunks(8000000):
yield chunk
await asyncio.wait([fill_buffer_task])
# start streaming
LOGGER.debug("Start streaming %s (%s)", queue_item_id, queue_item.name)
async for _, audio_chunk in self.async_get_sox_stream(
- streamdetails, gain_db_adjust=gain_correct
+ streamdetails, gain_db_adjust=gain_correct, chunk_size=8000000
):
yield audio_chunk
LOGGER.debug("Finished streaming %s (%s)", queue_item_id, queue_item.name)
stream_path = streamdetails.path
stream_type = StreamType(streamdetails.type)
audio_data = b""
- chunk_size = 512000
track_loudness = await self.mass.database.async_get_track_loudness(
streamdetails.item_id, streamdetails.provider
)
streamdetails.item_id,
streamdetails.type,
)
-
+ # stream from URL
if stream_type == StreamType.URL:
async with self.mass.http_session.get(stream_path) as response:
- async for chunk, _ in response.content.iter_chunks():
+ async for chunk in response.content.iter_chunks():
yield chunk
if needs_analyze and len(audio_data) < 100000000:
audio_data += chunk
+ # stream from file
elif stream_type == StreamType.FILE:
async with AIOFile(stream_path) as afp:
- async for chunk in Reader(afp, chunk_size=chunk_size):
- if not chunk:
- break
+ async for chunk in Reader(afp):
yield chunk
if needs_analyze and len(audio_data) < 100000000:
audio_data += chunk
+ # stream from executable's stdout
elif stream_type == StreamType.EXECUTABLE:
args = shlex.split(stream_path)
async with AsyncProcess(args) as process:
- async for chunk in process.iterate_chunks(chunk_size):
- if not chunk:
- break
+ async for chunk in process.iterate_chunks():
yield chunk
if needs_analyze and len(audio_data) < 100000000:
audio_data += chunk
)
# send analyze job to background worker
+ # TODO: feed audio chunks to analyzer while streaming
+ # so we don't have to load this large chunk in memory
if needs_analyze and audio_data:
self.mass.add_job(self.__analyze_audio, streamdetails, audio_data)
args = ["sox", "-m", "-v", "1.0", "-t"] + pcm_args + [fadeoutfile.name, "-v", "1.0"]
args += ["-t"] + pcm_args + [fadeinfile.name, "-t"] + pcm_args + ["-"]
async with AsyncProcess(args, enable_write=False) as sox_proc:
- crossfade_part = await sox_proc.communicate()
+ crossfade_part, _ = await sox_proc.communicate()
fadeinfile.close()
fadeoutfile.close()
del fadeinfile
if reverse:
args.append("reverse")
async with AsyncProcess(args, enable_write=True) as sox_proc:
- stripped_data = await sox_proc.communicate(audio_data)
+ stripped_data, _ = await sox_proc.communicate(audio_data)
return stripped_data