FFMpeg,
check_audio_support,
crossfade_pcm_parts,
+ get_chunksize,
get_ffmpeg_stream,
get_hls_stream,
get_icy_stream,
logger=logger,
) as ffmpeg_proc:
try:
- async for chunk in ffmpeg_proc.iter_any():
+ async for chunk in ffmpeg_proc.iter_any(get_chunksize(output_format)):
bytes_sent += len(chunk)
yield chunk
del chunk
finished = True
finally:
- await ffmpeg_proc.close()
- logger.debug(
- "stream %s (with code %s) for %s",
- "finished" if finished else "aborted",
- ffmpeg_proc.returncode,
- streamdetails.uri,
- )
+ if finished:
+ await ffmpeg_proc.wait()
+ else:
+ await ffmpeg_proc.close()
+
# try to determine how many seconds we've streamed
seconds_streamed = 0
if output_format.content_type.is_pcm():
duration_str = line.split("time=")[1].split(" ")[0]
seconds_streamed = try_parse_duration(duration_str)
+ logger.debug(
+ "stream %s (with code %s) for %s - seconds streamed: %s",
+ "finished" if finished else "aborted",
+ ffmpeg_proc.returncode,
+ streamdetails.uri,
+ seconds_streamed,
+ )
+
if seconds_streamed:
streamdetails.seconds_streamed = seconds_streamed
# store accurate duration
"""Yield chunks of n size from the process stdout."""
while True:
chunk = await self.readexactly(n)
- if chunk == b"":
- break
yield chunk
+ if len(chunk) == 0:
+ break
async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks as they come in from process stdout."""
while True:
chunk = await self.read(n)
- if chunk == b"":
- break
yield chunk
+ if len(chunk) == 0:
+ break
async def readexactly(self, n: int) -> bytes:
"""Read exactly n bytes from the process stdout (or less if eof)."""
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
- if self._close_called:
+ if self.closed:
self.logger.warning("write called while process already done")
return
self.proc.stdin.write(data)
async def write_eof(self) -> None:
"""Write end of file to to process stdin."""
- if self._close_called:
+ if self.closed:
return
try:
if self.proc.stdin.can_write_eof():
async def read_stderr(self) -> bytes:
"""Read line from stderr."""
+ if self.closed:
+ return b""
try:
return await self.proc.stderr.readline()
except ValueError as err:
self.proc.send_signal(SIGINT)
if self.proc.stdin and not self.proc.stdin.is_closing():
self.proc.stdin.close()
- await asyncio.sleep(0) # yield to loop
# abort existing readers on stderr/stdout first before we send communicate
if self.proc.stdout and self.proc.stdout._waiter is not None:
with suppress(asyncio.exceptions.InvalidStateError):
if self.proc.stderr and self.proc.stderr._waiter is not None:
with suppress(asyncio.exceptions.InvalidStateError):
self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
+ await asyncio.sleep(0) # yield to loop
# make sure the process is really cleaned up.
# especially with pipes this can cause deadlocks if not properly guarded
try:
# use communicate to flush all pipe buffers
await asyncio.wait_for(self.proc.communicate(), 5)
+ except RuntimeError as err:
+ if "read() called while another coroutine" in str(err):
+ # race condition
+ continue
+ raise
except TimeoutError:
self.logger.debug(
"Process %s with PID %s did not stop in time. Sending terminate...",
args += ["--start-position", str(int(seek_position))]
if self._ap_workaround:
args += ["--ap-port", "12345"]
- bytes_sent = 0
async with AsyncProcess(args, stdout=True, name="librespot") as librespot_proc:
async for chunk in librespot_proc.iter_any():
yield chunk
- bytes_sent += len(chunk)
async def _parse_artist(self, artist_obj):
"""Parse spotify artist object to generic layout."""