from aiofiles.os import wrap
from aiohttp import web
-from music_assistant.common.helpers.util import (
- get_ip,
- select_free_port,
- try_parse_bool,
- try_parse_duration,
-)
+from music_assistant.common.helpers.util import get_ip, select_free_port, try_parse_bool
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueOption,
FFMpeg,
check_audio_support,
crossfade_pcm_parts,
- get_chunksize,
get_ffmpeg_stream,
get_hls_stream,
get_icy_stream,
self.manifest.name = "Streamserver"
self.manifest.description = (
"Music Assistant's core controller that is responsible for "
- "streaming audio to players on the local network as well as "
- "some player specific local control callbacks."
+ "streaming audio to players on the local network."
)
self.manifest.icon = "cast-audio"
self.announcements: dict[str, str] = {}
queue.display_name,
)
queue.index_in_buffer = self.mass.player_queues.index_by_id(queue_id, queue_item_id)
- async for chunk in self.get_media_stream(
- streamdetails=queue_item.streamdetails,
+ pcm_format = AudioFormat(
+ content_type=ContentType.from_bit_depth(output_format.bit_depth),
+ sample_rate=queue_item.streamdetails.audio_format.sample_rate,
+ bit_depth=queue_item.streamdetails.audio_format.bit_depth,
+ channels=2,
+ )
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.get_media_stream(
+ streamdetails=queue_item.streamdetails,
+ pcm_format=pcm_format,
+ ),
+ input_format=pcm_format,
output_format=output_format,
- extra_filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+ filter_params=get_player_filter_params(self.mass, queue_player.player_id),
+ extra_input_args=[
+ # use readrate to limit buffering ahead too much
+ "-readrate",
+ "1.2",
+ ],
):
try:
await resp.write(chunk)
output_format=output_format,
filter_params=get_player_filter_params(self.mass, queue_player.player_id),
chunk_size=icy_meta_interval if enable_icy else None,
+ extra_input_args=[
+ # use readrate to limit buffering ahead too much
+ "-readrate",
+ "1.2",
+ ],
):
try:
await resp.write(chunk)
# handle incoming audio chunks
async for chunk in self.get_media_stream(
queue_track.streamdetails,
- output_format=pcm_format,
+ pcm_format=pcm_format,
):
# buffer size needs to be big enough to include the crossfade part
# allow it to be a bit smaller when playback just starts
async def get_media_stream(
self,
streamdetails: StreamDetails,
- output_format: AudioFormat,
- extra_filter_params: list[str] | None = None,
+ pcm_format: AudioFormat,
) -> AsyncGenerator[tuple[bool, bytes], None]:
- """Get the audio stream for the given streamdetails."""
+ """Get the audio stream for the given streamdetails as raw pcm chunks."""
logger = self.logger.getChild("media_stream")
is_radio = streamdetails.media_type == MediaType.RADIO or not streamdetails.duration
if is_radio:
streamdetails.seek_position = 0
# collect all arguments for ffmpeg
- filter_params = extra_filter_params or []
+ filter_params = []
if streamdetails.target_loudness is not None:
# add loudnorm filters
filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-1.5:LRA=11"
async with FFMpeg(
audio_input=audio_source,
input_format=streamdetails.audio_format,
- output_format=output_format,
+ output_format=pcm_format,
filter_params=filter_params,
extra_input_args=[
*extra_input_args,
# we criple ffmpeg a bit on purpose with the filter_threads
# option so it doesn't consume all cpu when calculating loudnorm
"-filter_threads",
- "1",
+ "2",
],
collect_log_history=True,
logger=logger,
) as ffmpeg_proc:
try:
- async for chunk in ffmpeg_proc.iter_any(get_chunksize(output_format)):
+ async for chunk in ffmpeg_proc.iter_any(pcm_format.pcm_sample_size):
bytes_sent += len(chunk)
yield chunk
del chunk
await ffmpeg_proc.close()
# try to determine how many seconds we've streamed
- seconds_streamed = 0
- if output_format.content_type.is_pcm():
- seconds_streamed = (
- bytes_sent / output_format.pcm_sample_size if bytes_sent else 0
- )
- elif line := next((x for x in ffmpeg_proc.log_history if "time=" in x), None):
- duration_str = line.split("time=")[1].split(" ")[0]
- seconds_streamed = try_parse_duration(duration_str)
-
+ seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
logger.debug(
"stream %s (with code %s) for %s - seconds streamed: %s",
"finished" if finished else "aborted",
streamdetails.uri,
seconds_streamed,
)
-
if seconds_streamed:
streamdetails.seconds_streamed = seconds_streamed
# store accurate duration
streamdetails.item_id, streamdetails.provider, loudness_details
)
)
-
# report playback
- # TODO: Move this to the queue controller ?
if finished or seconds_streamed > 30:
self.mass.create_task(
self.mass.music.mark_item_played(
"""Yield chunks of n size from the process stdout."""
while True:
chunk = await self.readexactly(n)
- yield chunk
if len(chunk) == 0:
break
+ yield chunk
async def iter_any(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
"""Yield chunks as they come in from process stdout."""
while True:
chunk = await self.read(n)
- yield chunk
if len(chunk) == 0:
break
+ yield chunk
async def readexactly(self, n: int) -> bytes:
"""Read exactly n bytes from the process stdout (or less if eof)."""
+ if self._close_called:
+ return b""
try:
return await self.proc.stdout.readexactly(n)
except asyncio.IncompleteReadError as err:
and may return less or equal bytes than requested, but at least one byte.
If EOF was received before any byte is read, this function returns empty byte object.
"""
+ if self._close_called:
+ return b""
return await self.proc.stdout.read(n)
async def write(self, data: bytes) -> None:
async def read_stderr(self) -> bytes:
"""Read line from stderr."""
- if self.closed:
+ if self._close_called:
return b""
try:
return await self.proc.stderr.readline()
if self.proc.stdin and not self.proc.stdin.is_closing():
self.proc.stdin.close()
# abort existing readers on stderr/stdout first before we send communicate
- if self.proc.stdout and self.proc.stdout._waiter is not None:
- with suppress(asyncio.exceptions.InvalidStateError):
- self.proc.stdout._waiter.set_exception(asyncio.CancelledError())
- if self.proc.stderr and self.proc.stderr._waiter is not None:
- with suppress(asyncio.exceptions.InvalidStateError):
- self.proc.stderr._waiter.set_exception(asyncio.CancelledError())
+ waiter: asyncio.Future
+ if self.proc.stdout and (waiter := self.proc.stdout._waiter):
+ self.proc.stdout._waiter = None
+ if waiter and not waiter.done():
+ waiter.set_exception(asyncio.CancelledError())
+ if self.proc.stderr and (waiter := self.proc.stderr._waiter):
+ self.proc.stderr._waiter = None
+ if waiter and not waiter.done():
+ waiter.set_exception(asyncio.CancelledError())
await asyncio.sleep(0) # yield to loop
# make sure the process is really cleaned up.