import logging
import struct
from io import BytesIO
-from typing import List, Tuple
+from typing import List, Optional, Tuple
from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.typing import MusicAssistant, QueueItem
# return file.getvalue(), all_chunks_size + 8
return file.getvalue()
+
+
+def get_sox_args(
+ streamdetails: StreamDetails,
+ output_format: Optional[ContentType] = None,
+ resample: Optional[int] = None,
+):
+ """Collect all args to send to the sox (or ffmpeg) process."""
+ stream_path = streamdetails.path
+ stream_type = StreamType(streamdetails.type)
+ content_type = streamdetails.content_type
+ if output_format is None:
+ output_format = streamdetails.content_type
+
+ # use ffmpeg if content not supported by SoX (e.g. AAC radio streams)
+ if not streamdetails.content_type.sox_supported():
+ # collect input args
+ input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-i", stream_path]
+ # collect output args
+ if output_format.is_pcm():
+ output_args = [
+ "-f",
+ output_format.value,
+ "-c:a",
+ output_format.name.lower(),
+ "-",
+ ]
+ else:
+ output_args = ["-f", output_format.value, "-"]
+ # collect filter args
+ filter_args = []
+ if streamdetails.gain_correct:
+ filter_args += ["-filter:a", "volume=%sdB" % streamdetails.gain_correct]
+ if resample:
+ filter_args += ["-ar", str(resample)]
+ return input_args + filter_args + output_args
+
+ # Prefer SoX for all other (=highest quality)
+ if stream_type == StreamType.EXECUTABLE:
+ # stream from executable
+ input_args = [
+ stream_path,
+ "|",
+ "sox",
+ "-t",
+ content_type.sox_format(),
+ "-",
+ ]
+ else:
+ input_args = ["sox", "-t", content_type.sox_format(), stream_path]
+ # collect output args
+ if output_format.is_pcm():
+ output_args = ["-t", output_format.sox_format(), "-c", "2", "-"]
+ elif output_format == ContentType.FLAC:
+ output_args = ["-t", "flac", "-C", "0", "-"]
+ else:
+ output_args = ["-t", output_format.sox_format(), "-"]
+ # collect filter args
+ filter_args = []
+ if streamdetails.gain_correct:
+ filter_args += ["vol", str(streamdetails.gain_correct), "dB"]
+ if resample:
+ filter_args += ["rate", "-v", str(resample)]
+ # TODO: still not sure about the order of the filter arguments in the chain
+ # assumption is they need to be at the end of the chain
+ return input_args + output_args + filter_args
async def __aenter__(self) -> "AsyncProcess":
"""Enter context manager."""
if "|" in self._args:
- self._args = " ".join(self._args)
+ args = " ".join(self._args)
+ else:
+ args = self._args
- if isinstance(self._args, str):
+ if isinstance(args, str):
self._proc = await asyncio.create_subprocess_shell(
- self._args,
+ args,
stdin=asyncio.subprocess.PIPE if self._enable_write else None,
stdout=asyncio.subprocess.PIPE,
- limit=4000000,
+ limit=DEFAULT_CHUNKSIZE,
close_fds=True,
)
else:
self._proc = await asyncio.create_subprocess_exec(
- *self._args,
+ *args,
stdin=asyncio.subprocess.PIPE if self._enable_write else None,
stdout=asyncio.subprocess.PIPE,
- limit=4000000,
+ limit=DEFAULT_CHUNKSIZE,
close_fds=True,
)
return self
"""Exit context manager."""
if self._proc.returncode is None:
# prevent subprocess deadlocking, send terminate and read remaining bytes
- await self.write_eof()
+ if self._enable_write:
+ self._proc.stdin.close()
try:
self._proc.terminate()
await self._proc.stdout.read()
except AttributeError:
raise asyncio.CancelledError()
- async def write_eof(self) -> None:
- """Write eof to process."""
- if not (self._enable_write and self._proc.stdin.can_write_eof()):
- return
- try:
- self._proc.stdin.write_eof()
- except BrokenPipeError:
- pass
-
async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
"""Write bytes to process and read back results."""
return await self._proc.communicate(input_data)
from music_assistant.helpers.audio import (
analyze_audio,
crossfade_pcm_parts,
+ get_sox_args,
get_stream_details,
strip_silence,
)
from music_assistant.helpers.util import create_task
from music_assistant.helpers.web import require_local_subnet
from music_assistant.models.player_queue import PlayerQueue
-from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
+from music_assistant.models.streamdetails import ContentType, StreamDetails
routes = RouteTableDef()
player_queue = mass.players.get_player_queue(player_id)
if not player_queue:
raise HTTPNotFound(reason="invalid player_id")
- LOGGER.info("Start Queue Stream for player %s ", player_queue.player.name)
# prepare request
resp = StreamResponse(
await resp.prepare(request)
player_conf = player_queue.player.config
+ pcm_format = "f64"
sample_rate = min(player_conf.get(CONF_MAX_SAMPLE_RATE, 96000), 96000)
args = [
"sox",
"-t",
- "s32",
+ pcm_format,
"-c",
"2",
"-r",
]
async with AsyncProcess(args, enable_write=True) as sox_proc:
+ LOGGER.info(
+ "Start Queue Stream for player %s",
+ player_queue.player.name,
+ )
+
# feed stdin with pcm samples
async def fill_buffer():
"""Feed audio data into sox stdin for processing."""
async for audio_chunk in get_queue_stream(
- mass, player_queue, sample_rate, 32
+ mass, player_queue, sample_rate, pcm_format
):
await sox_proc.write(audio_chunk)
del audio_chunk
player_queue.player.name,
)
- async for _, audio_chunk in get_media_stream(mass, streamdetails):
+ async for _, audio_chunk in get_media_stream(mass, streamdetails, ContentType.FLAC):
await resp.write(audio_chunk)
del audio_chunk
LOGGER.debug(
chunk_size: Optional[int] = None,
) -> AsyncGenerator[Tuple[bool, bytes], None]:
"""Get the audio stream for the given streamdetails."""
- input_format = streamdetails.content_type.value
- stream_path = streamdetails.path
- stream_type = StreamType(streamdetails.type)
- if output_format is None:
- output_format = ContentType.FLAC
-
- # collect all args for sox/ffmpeg
- if output_format in [
- ContentType.S24,
- ContentType.S32,
- ContentType.S64,
- ]:
- output_args = ["-t", output_format.value, "-c", "2", "-"]
- elif output_format == ContentType.FLAC:
- output_args = ["-t", output_format.value] + ["-C", "0", "-"]
- else:
- output_args = ["-t", output_format.value, "-"]
- # stream from URL or file
- if stream_type in [StreamType.URL, StreamType.FILE]:
- # input_args = ["sox", "-t", input_format, stream_path]
- input_args = ["sox", stream_path]
- # stream from executable
- else:
- input_args = [stream_path, "|", "sox", "-t", input_format, "-"]
-
- filter_args = []
- if streamdetails.gain_correct:
- filter_args += ["vol", str(streamdetails.gain_correct), "dB"]
- if resample:
- filter_args += ["rate", "-v", str(resample)]
-
- if streamdetails.content_type in [ContentType.AAC, ContentType.MPEG]:
- # use ffmpeg for processing radio streams
- args = [
- "ffmpeg",
- "-hide_banner",
- "-loglevel",
- "error",
- "-i",
- stream_path,
- "-filter:a",
- "volume=%sdB" % streamdetails.gain_correct,
- "-f",
- "flac",
- "-",
- ]
- else:
- # regular sox processing
- args = input_args + output_args + filter_args
-
- # signal start of stream event
mass.eventbus.signal(EVENT_STREAM_STARTED, streamdetails)
- LOGGER.debug(
- "start media stream for: %s/%s (%s)",
- streamdetails.provider,
- streamdetails.item_id,
- streamdetails.type,
- )
-
+ args = get_sox_args(streamdetails, output_format, resample)
async with AsyncProcess(args) as sox_proc:
+ LOGGER.debug(
+ "start media stream for: %s/%s (%s)",
+ streamdetails.provider,
+ streamdetails.item_id,
+ streamdetails.type,
+ )
+
# yield chunks from stdout
# we keep 1 chunk behind to detect end of stream properly
try:
async def get_queue_stream(
- mass: MusicAssistant, player_queue: PlayerQueue, sample_rate=96000, bit_depth=32
+ mass: MusicAssistant,
+ player_queue: PlayerQueue,
+ sample_rate=96000,
+ pcm_format: str = "f64",
+ channels: int = 2,
) -> AsyncGenerator[bytes, None]:
"""Stream the PlayerQueue's tracks as constant feed in PCM raw audio."""
last_fadeout_data = b""
queue_index = None
# get crossfade details
fade_length = player_queue.crossfade_duration
- pcm_args = ["s32", "-c", "2", "-r", str(sample_rate)]
- sample_size = int(sample_rate * (bit_depth / 8) * 2) # 1 second
+ if pcm_format in ["s64", "f64"]:
+ bit_depth = 64
+ elif pcm_format in ["s32", "f32"]:
+ bit_depth = 32
+ elif pcm_format == "s16":
+ bit_depth = 16
+ elif pcm_format == "s24":
+ bit_depth = 24
+ else:
+ raise NotImplementedError("Unsupported PCM format: %s" % pcm_format)
+ pcm_args = [pcm_format, "-c", "2", "-r", str(sample_rate)]
+ sample_size = int(sample_rate * (bit_depth / 8) * channels) # 1 second
buffer_size = sample_size * fade_length if fade_length else sample_size * 10
# stream queue tracks one by one
while True:
async for is_last_chunk, chunk in get_media_stream(
mass,
streamdetails,
- ContentType.S32,
+ ContentType.PCM_F64LE,
resample=sample_rate,
chunk_size=buffer_size,
):