CONF_CROSSFADE_DURATION,
CONF_OUTPUT_CHANNELS,
CONF_PUBLISH_IP,
- ROOT_LOGGER_NAME,
SILENCE_FILE,
UGP_PREFIX,
VERBOSE_LOG_LEVEL,
)
+from music_assistant.server.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.server.helpers.audio import (
check_audio_support,
crossfade_pcm_parts,
"with libsoxr support" if libsoxr_support else "",
)
# copy log level to audio module
- logging.getLogger(f"{ROOT_LOGGER_NAME}.audio").setLevel(self.logger.level)
+ AUDIO_LOGGER.setLevel(self.logger.level)
# start the webserver
self.publish_port = config.get_value(CONF_BIND_PORT)
self.publish_ip = config.get_value(CONF_PUBLISH_IP)
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
if is_radio or streamdetails.seek_position:
strip_silence_begin = False
- # chunk size = 1 second of pcm audio
- pcm_sample_size = pcm_format.pcm_sample_size
- chunk_size = pcm_sample_size # chunk size = sample size (= 1 second)
- expected_chunks = int(((streamdetails.duration or 0) * pcm_sample_size) / chunk_size)
+ # pcm_sample_size = chunk size = 1 second of pcm audio
+ chunk_size = pcm_format.pcm_sample_size
+ expected_chunks = int(
+ ((streamdetails.duration or 0) * pcm_format.pcm_sample_size) / chunk_size
+ )
if expected_chunks < 10:
strip_silence_end = False
await state_data["finished"].wait()
finished = ffmpeg_proc.returncode == 0 and state_data["finished"].is_set()
bytes_sent = state_data["bytes_sent"]
- seconds_streamed = bytes_sent / pcm_sample_size if bytes_sent else 0
+ seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
streamdetails.seconds_streamed = seconds_streamed
state_str = "finished" if finished else "aborted"
logger.debug(
# collect this chunk for next round
prev_chunk = chunk
+
# if we did not receive any data, something went (terribly) wrong
# raise here to prevent an (endless) loop elsewhere
if state_data["bytes_sent"] == 0:
- del prev_chunk
raise AudioError(f"stream error on {streamdetails.uri}")
# all chunks received, strip silence of last part if needed and yield remaining bytes
async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks of n size from the process stdout."""
- while self.returncode is None:
+ while not self._close_called:
chunk = await self.readexactly(n)
if chunk == b"":
break
async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks as they come in from process stdout."""
- while self.returncode is None:
+ while not self._close_called:
chunk = await self.read(n)
if chunk == b"":
break
async def readexactly(self, n: int) -> bytes:
"""Read exactly n bytes from the process stdout (or less if eof)."""
- if not self.proc.stdout or self.proc.stdout.at_eof():
- return b""
try:
return await self.proc.stdout.readexactly(n)
except asyncio.IncompleteReadError as err:
and may return less or equal bytes than requested, but at least one byte.
If EOF was received before any byte is read, this function returns empty byte object.
"""
- if not self.proc.stdout or self.proc.stdout.at_eof():
- return b""
return await self.proc.stdout.read(n)
async def write(self, data: bytes) -> None:
"""Write data to process stdin."""
- if self.returncode is not None or self.proc.stdin.is_closing():
- raise asyncio.CancelledError("write called while process already done")
+ if self._close_called:
+ raise RuntimeError("write called while process already done")
self.proc.stdin.write(data)
with suppress(BrokenPipeError, ConnectionResetError):
await self.proc.stdin.drain()
async def write_eof(self) -> None:
"""Write end of file to to process stdin."""
- if self.returncode is not None or self.proc.stdin.is_closing():
+ if self._close_called:
return
try:
if self.proc.stdin.can_write_eof():
# make sure the process is really cleaned up.
# especially with pipes this can cause deadlocks if not properly guarded
# we need to ensure stdout and stderr are flushed and stdin closed
- while self.returncode is None:
+ while True:
try:
- async with asyncio.timeout(30):
+ async with asyncio.timeout(5):
# abort existing readers on stderr/stdout first before we send communicate
if self.proc.stdout and self.proc.stdout._waiter is not None:
self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
self.proc.stderr._waiter = None
# use communicate to flush all pipe buffers
await self.proc.communicate()
+ if self.returncode is not None:
+ break
except TimeoutError:
LOGGER.debug(
"Process %s with PID %s did not stop in time. Sending terminate...",
async def write_eof(self) -> None:
"""Write EOF to the ffmpeg stdin."""
- if not self.running:
- return
await self._buffer.put(b"")
async def send_cli_command(self, command: str) -> None:
input_format = AIRPLAY_PCM_FORMAT
audio_source = self.mass.streams.get_announcement_stream(
queue_item.streamdetails.data["url"],
- pcm_format=AIRPLAY_PCM_FORMAT,
+ output_format=AIRPLAY_PCM_FORMAT,
use_pre_announce=queue_item.streamdetails.data["use_pre_announce"],
)
else: