From d304047c6b976600c17064b9d5533599bfa3fe49 Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 2 Aug 2021 19:07:45 +0200 Subject: [PATCH] streamer fixes - use 64 bit float for internal PCM queue stream - fix radio playback through ffmpeg - fix SoX executable stuck after stream disconnect --- music_assistant/helpers/audio.py | 68 +++++++++++++++- music_assistant/helpers/process.py | 26 +++--- music_assistant/managers/tasks.py | 2 +- music_assistant/models/streamdetails.py | 24 +++++- music_assistant/web/stream.py | 104 +++++++++--------------- 5 files changed, 135 insertions(+), 89 deletions(-) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index d54b6e3e..d2432009 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -4,7 +4,7 @@ import asyncio import logging import struct from io import BytesIO -from typing import List, Tuple +from typing import List, Optional, Tuple from music_assistant.helpers.process import AsyncProcess from music_assistant.helpers.typing import MusicAssistant, QueueItem @@ -255,3 +255,69 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration= # return file.getvalue(), all_chunks_size + 8 return file.getvalue() + + +def get_sox_args( + streamdetails: StreamDetails, + output_format: Optional[ContentType] = None, + resample: Optional[int] = None, +): + """Collect all args to send to the sox (or ffmpeg) process.""" + stream_path = streamdetails.path + stream_type = StreamType(streamdetails.type) + content_type = streamdetails.content_type + if output_format is None: + output_format = streamdetails.content_type + + # use ffmpeg if content not supported by SoX (e.g. AAC radio streams) + if not streamdetails.content_type.sox_supported(): + # collect input args + input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-i", stream_path] + # collect output args + if output_format.is_pcm(): + output_args = [ + "-f", + output_format.value, + "-c:a", + output_format.name.lower(), + "-", + ] + else: + output_args = ["-f", output_format.value, "-"] + # collect filter args + filter_args = [] + if streamdetails.gain_correct: + filter_args += ["-filter:a", "volume=%sdB" % streamdetails.gain_correct] + if resample: + filter_args += ["-ar", str(resample)] + return input_args + filter_args + output_args + + # Prefer SoX for all other (=highest quality) + if stream_type == StreamType.EXECUTABLE: + # stream from executable + input_args = [ + stream_path, + "|", + "sox", + "-t", + content_type.sox_format(), + "-", + ] + else: + input_args = ["sox", "-t", content_type.sox_format(), stream_path] + # collect output args + if output_format.is_pcm(): + output_args = ["-t", output_format.sox_format(), "-c", "2", "-"] + elif output_format == ContentType.FLAC: + output_args = ["-t", "flac", "-C", "0", "-"] + else: + output_args = ["-t", output_format.sox_format(), "-"] + # collect filter args + filter_args = [] + if streamdetails.gain_correct: + filter_args += ["vol", str(streamdetails.gain_correct), "dB"] + if resample: + filter_args += ["rate", "-v", str(resample)] + # TODO: still not sure about the order of the filter arguments in the chain + # assumption is they need to be at the end of the chain + return input_args + output_args + filter_args diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index 236499e3..13ce82b2 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -29,22 +29,24 @@ class AsyncProcess: async def __aenter__(self) -> "AsyncProcess": """Enter context manager.""" if "|" in self._args: - self._args = " ".join(self._args) + args = " ".join(self._args) + else: + args = self._args - if isinstance(self._args, str): + if isinstance(args, str): self._proc = await asyncio.create_subprocess_shell( - self._args, + args, stdin=asyncio.subprocess.PIPE if self._enable_write else None, stdout=asyncio.subprocess.PIPE, - limit=4000000, + limit=DEFAULT_CHUNKSIZE, close_fds=True, ) else: self._proc = await asyncio.create_subprocess_exec( - *self._args, + *args, stdin=asyncio.subprocess.PIPE if self._enable_write else None, stdout=asyncio.subprocess.PIPE, - limit=4000000, + limit=DEFAULT_CHUNKSIZE, close_fds=True, ) return self @@ -53,7 +55,8 @@ class AsyncProcess: """Exit context manager.""" if self._proc.returncode is None: # prevent subprocess deadlocking, send terminate and read remaining bytes - await self.write_eof() + if self._enable_write: + self._proc.stdin.close() try: self._proc.terminate() await self._proc.stdout.read() @@ -96,15 +99,6 @@ class AsyncProcess: except AttributeError: raise asyncio.CancelledError() - async def write_eof(self) -> None: - """Write eof to process.""" - if not (self._enable_write and self._proc.stdin.can_write_eof()): - return - try: - self._proc.stdin.write_eof() - except BrokenPipeError: - pass - async def communicate(self, input_data: Optional[bytes] = None) -> bytes: """Write bytes to process and read back results.""" return await self._proc.communicate(input_data) diff --git a/music_assistant/managers/tasks.py b/music_assistant/managers/tasks.py index acbefb33..583c1be0 100644 --- a/music_assistant/managers/tasks.py +++ b/music_assistant/managers/tasks.py @@ -54,7 +54,7 @@ class TaskInfo: def __str__(self): """Return string representation, used for logging.""" - return f"{self.name} ({id})" + return f"{self.name} ({self.id})" def to_dict(self) -> Dict[str, Any]: """Return serializable dict.""" diff --git a/music_assistant/models/streamdetails.py b/music_assistant/models/streamdetails.py index e0cf0405..59720ca6 100644 --- a/music_assistant/models/streamdetails.py +++ b/music_assistant/models/streamdetails.py @@ -18,16 +18,32 @@ class StreamType(Enum): class ContentType(Enum): - """Enum with stream content types.""" + """Enum with audio content types supported by ffmpeg.""" OGG = "ogg" FLAC = "flac" MP3 = "mp3" AAC = "aac" MPEG = "mpeg" - S24 = "s24" - S32 = "s32" - S64 = "s64" + PCM_S16LE = "s16le" # PCM signed 16-bit little-endian + PCM_S24LE = "s24le" # PCM signed 24-bit little-endian + PCM_S32LE = "s32le" # PCM signed 32-bit little-endian + PCM_F32LE = "f32le" # PCM 32-bit floating-point little-endian + PCM_F64LE = "f64le" # PCM 64-bit floating-point little-endian + + def is_pcm(self): + """Return if contentype is PCM.""" + return self.name.startswith("PCM") + + def sox_supported(self): + """Return if ContentType is supported by SoX.""" + return self not in [ContentType.AAC, ContentType.MPEG] + + def sox_format(self): + """Convert the ContentType to SoX compatible format.""" + if not self.sox_supported(): + raise NotImplementedError + return self.value.replace("le", "") @dataclass diff --git a/music_assistant/web/stream.py b/music_assistant/web/stream.py index a3d617d0..b160b89f 100644 --- a/music_assistant/web/stream.py +++ b/music_assistant/web/stream.py @@ -21,6 +21,7 @@ from music_assistant.constants import ( from music_assistant.helpers.audio import ( analyze_audio, crossfade_pcm_parts, + get_sox_args, get_stream_details, strip_silence, ) @@ -29,7 +30,7 @@ from music_assistant.helpers.typing import MusicAssistant from music_assistant.helpers.util import create_task from music_assistant.helpers.web import require_local_subnet from music_assistant.models.player_queue import PlayerQueue -from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType +from music_assistant.models.streamdetails import ContentType, StreamDetails routes = RouteTableDef() @@ -45,7 +46,6 @@ async def stream_queue(request: Request): player_queue = mass.players.get_player_queue(player_id) if not player_queue: raise HTTPNotFound(reason="invalid player_id") - LOGGER.info("Start Queue Stream for player %s ", player_queue.player.name) # prepare request resp = StreamResponse( @@ -54,12 +54,13 @@ async def stream_queue(request: Request): await resp.prepare(request) player_conf = player_queue.player.config + pcm_format = "f64" sample_rate = min(player_conf.get(CONF_MAX_SAMPLE_RATE, 96000), 96000) args = [ "sox", "-t", - "s32", + pcm_format, "-c", "2", "-r", @@ -71,11 +72,16 @@ async def stream_queue(request: Request): ] async with AsyncProcess(args, enable_write=True) as sox_proc: + LOGGER.info( + "Start Queue Stream for player %s", + player_queue.player.name, + ) + # feed stdin with pcm samples async def fill_buffer(): """Feed audio data into sox stdin for processing.""" async for audio_chunk in get_queue_stream( - mass, player_queue, sample_rate, 32 + mass, player_queue, sample_rate, pcm_format ): await sox_proc.write(audio_chunk) del audio_chunk @@ -138,7 +144,7 @@ async def stream_single_queue_item(request: Request): player_queue.player.name, ) - async for _, audio_chunk in get_media_stream(mass, streamdetails): + async for _, audio_chunk in get_media_stream(mass, streamdetails, ContentType.FLAC): await resp.write(audio_chunk) del audio_chunk LOGGER.debug( @@ -181,68 +187,18 @@ async def get_media_stream( chunk_size: Optional[int] = None, ) -> AsyncGenerator[Tuple[bool, bytes], None]: """Get the audio stream for the given streamdetails.""" - input_format = streamdetails.content_type.value - stream_path = streamdetails.path - stream_type = StreamType(streamdetails.type) - if output_format is None: - output_format = ContentType.FLAC - - # collect all args for sox/ffmpeg - if output_format in [ - ContentType.S24, - ContentType.S32, - ContentType.S64, - ]: - output_args = ["-t", output_format.value, "-c", "2", "-"] - elif output_format == ContentType.FLAC: - output_args = ["-t", output_format.value] + ["-C", "0", "-"] - else: - output_args = ["-t", output_format.value, "-"] - # stream from URL or file - if stream_type in [StreamType.URL, StreamType.FILE]: - # input_args = ["sox", "-t", input_format, stream_path] - input_args = ["sox", stream_path] - # stream from executable - else: - input_args = [stream_path, "|", "sox", "-t", input_format, "-"] - - filter_args = [] - if streamdetails.gain_correct: - filter_args += ["vol", str(streamdetails.gain_correct), "dB"] - if resample: - filter_args += ["rate", "-v", str(resample)] - - if streamdetails.content_type in [ContentType.AAC, ContentType.MPEG]: - # use ffmpeg for processing radio streams - args = [ - "ffmpeg", - "-hide_banner", - "-loglevel", - "error", - "-i", - stream_path, - "-filter:a", - "volume=%sdB" % streamdetails.gain_correct, - "-f", - "flac", - "-", - ] - else: - # regular sox processing - args = input_args + output_args + filter_args - - # signal start of stream event mass.eventbus.signal(EVENT_STREAM_STARTED, streamdetails) - LOGGER.debug( - "start media stream for: %s/%s (%s)", - streamdetails.provider, - streamdetails.item_id, - streamdetails.type, - ) - + args = get_sox_args(streamdetails, output_format, resample) async with AsyncProcess(args) as sox_proc: + LOGGER.debug( + "start media stream for: %s/%s (%s)", + streamdetails.provider, + streamdetails.item_id, + streamdetails.type, + ) + # yield chunks from stdout # we keep 1 chunk behind to detect end of stream properly try: @@ -280,15 +236,29 @@ async def get_media_stream( async def get_queue_stream( - mass: MusicAssistant, player_queue: PlayerQueue, sample_rate=96000, bit_depth=32 + mass: MusicAssistant, + player_queue: PlayerQueue, + sample_rate=96000, + pcm_format: str = "f64", + channels: int = 2, ) -> AsyncGenerator[bytes, None]: """Stream the PlayerQueue's tracks as constant feed in PCM raw audio.""" last_fadeout_data = b"" queue_index = None # get crossfade details fade_length = player_queue.crossfade_duration - pcm_args = ["s32", "-c", "2", "-r", str(sample_rate)] - sample_size = int(sample_rate * (bit_depth / 8) * 2) # 1 second + if pcm_format in ["s64", "f64"]: + bit_depth = 64 + elif pcm_format in ["s32", "f32"]: + bit_depth = 32 + elif pcm_format == "s16": + bit_depth = 16 + elif pcm_format == "s24": + bit_depth = 24 + else: + raise NotImplementedError("Unsupported PCM format: %s" % pcm_format) + pcm_args = [pcm_format, "-c", "2", "-r", str(sample_rate)] + sample_size = int(sample_rate * (bit_depth / 8) * channels) # 1 second buffer_size = sample_size * fade_length if fade_length else sample_size * 10 # stream queue tracks one by one while True: @@ -322,7 +292,7 @@ async def get_queue_stream( async for is_last_chunk, chunk in get_media_stream( mass, streamdetails, - ContentType.S32, + ContentType.PCM_F64LE, resample=sample_rate, chunk_size=buffer_size, ): -- 2.34.1