# write eof when last packet is received
sox_proc.write_eof()
- self.mass.create_task(writer)
+ sox_proc.attach_task(writer())
# read bytes from final output
async for audio_chunk in sox_proc.iterate_chunks():
self.logger.info(
"Starting Queue audio stream for Queue %s (PCM format: %s - sample rate: %s)",
queue.player.name,
- pcm_fmt,
+ pcm_fmt.value,
sample_rate,
)
queue_index = await queue.queue_stream_next(queue_index)
queue_track = queue.get_item(queue_index)
if not queue_track:
- self.logger.debug("no (more) tracks in queue %s", queue.queue_id)
+ self.logger.debug(
+ "Abort Queue stream %s: no (more) tracks in queue", queue.queue_id
+ )
break
# get streamdetails
try:
)
except MediaNotFoundError as err:
self.logger.warning(
- "Skip track due to missing streamdetails", exc_info=err
+ "Skip track %s due to missing streamdetails",
+ queue_track.name,
+ exc_info=err,
)
continue
# check the PCM samplerate/bitrate
if not resample and streamdetails.bit_depth > bit_depth:
await queue.queue_stream_signal_next()
- self.logger.info("Abort queue stream due to bit depth mismatch")
+ self.logger.debug(
+ "Abort queue stream %s due to bit depth mismatch", queue.player.name
+ )
break
if (
not resample
and streamdetails.sample_rate > sample_rate
and streamdetails.sample_rate <= queue.max_sample_rate
):
- self.logger.info("Abort queue stream due to sample rate mismatch")
+ self.logger.debug(
+ "Abort queue stream %s due to sample rate mismatch",
+ queue.player.name,
+ )
await queue.queue_stream_signal_next()
break
use_crossfade = False
buffer_size = sample_size
- self.logger.debug(
+ self.logger.info(
"Start Streaming queue track: %s (%s) for queue %s",
- queue_track.item_id,
+ queue_track.uri,
queue_track.name,
queue.player.name,
)
# HANDLE FIRST PART OF TRACK
if not chunk and bytes_written == 0 and is_last_chunk:
# stream error: got empy first chunk
- self.logger.warning("Stream error on track %s", queue_track.item_id)
+ self.logger.warning("Stream error on %s", queue_track.uri)
# prevent player queue get stuck by just skipping to the next track
queue_track.duration = 0
continue
queue_track.duration = accurate_duration
self.logger.debug(
"Finished Streaming queue track: %s (%s) on queue %s",
- queue_track.item_id,
+ queue_track.uri,
queue_track.name,
queue.player.name,
)
# end of queue reached, pass last fadeout bits to final output
yield last_fadeout_data
# END OF QUEUE STREAM
- self.logger.info("Queue stream for Queue %s finished.", queue.queue_id)
+ self.logger.info("Queue stream for Queue %s finished.", queue.player.name)
import asyncio
import logging
-from typing import AsyncGenerator, List, Optional, Tuple, Union
+from typing import AsyncGenerator, Coroutine, List, Optional, Tuple, Union
from async_timeout import timeout as _timeout
self._proc = None
self._args = args
self._enable_write = enable_write
+ self._attached_task: asyncio.Task = None
self.closed = False
async def __aenter__(self) -> "AsyncProcess":
async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
"""Exit context manager."""
self.closed = True
+ if self._attached_task:
+ # cancel the attached reader/writer task
+ self._attached_task.cancel()
if self._proc.returncode is None:
# prevent subprocess deadlocking, send terminate and read remaining bytes
- if self._enable_write:
- self._proc.stdin.close()
try:
self._proc.terminate()
+ # close stdin and let it drain
+ if self._enable_write:
+ await self._proc.stdin.drain()
+ self._proc.stdin.close()
+ # read remaining bytes
await self._proc.stdout.read()
+ # we really want to make this thing die ;-)
self._proc.kill()
- except (ProcessLookupError, BrokenPipeError, RuntimeError):
+ except (
+ ProcessLookupError,
+ BrokenPipeError,
+ RuntimeError,
+ ConnectionResetError,
+ ):
pass
del self._proc
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
+ if self.closed:
+ return
try:
self._proc.stdin.write(data)
await self._proc.stdin.drain()
"""Write bytes to process and read back results."""
return await self._proc.communicate(input_data)
+ def attach_task(self, coro: Coroutine) -> None:
+ """Attach given coro func as reader/writer task to properly cancel it when needed."""
+ self._attached_task = asyncio.create_task(coro)
+
async def check_output(shell_cmd: str) -> Tuple[int, bytes]:
"""Run shell subprocess and return output."""