streamer fixes
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 2 Aug 2021 17:07:45 +0000 (19:07 +0200)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 2 Aug 2021 17:07:45 +0000 (19:07 +0200)
- use 64 bit float for internal PCM queue stream
- fix radio playback through ffmpeg
- fix SoX executable stuck after stream disconnect

music_assistant/helpers/audio.py
music_assistant/helpers/process.py
music_assistant/managers/tasks.py
music_assistant/models/streamdetails.py
music_assistant/web/stream.py

index d54b6e3edfaf698bb9247c1ea57d47f406031a6f..d2432009c7b62454d412d91c11c7f5705c5e4254 100644 (file)
@@ -4,7 +4,7 @@ import asyncio
 import logging
 import struct
 from io import BytesIO
-from typing import List, Tuple
+from typing import List, Optional, Tuple
 
 from music_assistant.helpers.process import AsyncProcess
 from music_assistant.helpers.typing import MusicAssistant, QueueItem
@@ -255,3 +255,69 @@ def create_wave_header(samplerate=44100, channels=2, bitspersample=16, duration=
 
     # return file.getvalue(), all_chunks_size + 8
     return file.getvalue()
+
+
+def get_sox_args(
+    streamdetails: StreamDetails,
+    output_format: Optional[ContentType] = None,
+    resample: Optional[int] = None,
+):
+    """Collect all args to send to the sox (or ffmpeg) process."""
+    stream_path = streamdetails.path
+    stream_type = StreamType(streamdetails.type)
+    content_type = streamdetails.content_type
+    if output_format is None:
+        output_format = streamdetails.content_type
+
+    # use ffmpeg if content not supported by SoX (e.g. AAC radio streams)
+    if not streamdetails.content_type.sox_supported():
+        # collect input args
+        input_args = ["ffmpeg", "-hide_banner", "-loglevel", "error", "-i", stream_path]
+        # collect output args
+        if output_format.is_pcm():
+            output_args = [
+                "-f",
+                output_format.value,
+                "-c:a",
+                output_format.name.lower(),
+                "-",
+            ]
+        else:
+            output_args = ["-f", output_format.value, "-"]
+        # collect filter args
+        filter_args = []
+        if streamdetails.gain_correct:
+            filter_args += ["-filter:a", "volume=%sdB" % streamdetails.gain_correct]
+        if resample:
+            filter_args += ["-ar", str(resample)]
+        return input_args + filter_args + output_args
+
+    # Prefer SoX for all other (=highest quality)
+    if stream_type == StreamType.EXECUTABLE:
+        # stream from executable
+        input_args = [
+            stream_path,
+            "|",
+            "sox",
+            "-t",
+            content_type.sox_format(),
+            "-",
+        ]
+    else:
+        input_args = ["sox", "-t", content_type.sox_format(), stream_path]
+    # collect output args
+    if output_format.is_pcm():
+        output_args = ["-t", output_format.sox_format(), "-c", "2", "-"]
+    elif output_format == ContentType.FLAC:
+        output_args = ["-t", "flac", "-C", "0", "-"]
+    else:
+        output_args = ["-t", output_format.sox_format(), "-"]
+    # collect filter args
+    filter_args = []
+    if streamdetails.gain_correct:
+        filter_args += ["vol", str(streamdetails.gain_correct), "dB"]
+    if resample:
+        filter_args += ["rate", "-v", str(resample)]
+    # TODO: still not sure about the order of the filter arguments in the chain
+    # assumption is they need to be at the end of the chain
+    return input_args + output_args + filter_args
index 236499e3919953ce8d8e0c9a33020e5692fcabc3..13ce82b23ca1c4ebea20694089ed102ee437b88a 100644 (file)
@@ -29,22 +29,24 @@ class AsyncProcess:
     async def __aenter__(self) -> "AsyncProcess":
         """Enter context manager."""
         if "|" in self._args:
-            self._args = " ".join(self._args)
+            args = " ".join(self._args)
+        else:
+            args = self._args
 
-        if isinstance(self._args, str):
+        if isinstance(args, str):
             self._proc = await asyncio.create_subprocess_shell(
-                self._args,
+                args,
                 stdin=asyncio.subprocess.PIPE if self._enable_write else None,
                 stdout=asyncio.subprocess.PIPE,
-                limit=4000000,
+                limit=DEFAULT_CHUNKSIZE,
                 close_fds=True,
             )
         else:
             self._proc = await asyncio.create_subprocess_exec(
-                *self._args,
+                *args,
                 stdin=asyncio.subprocess.PIPE if self._enable_write else None,
                 stdout=asyncio.subprocess.PIPE,
-                limit=4000000,
+                limit=DEFAULT_CHUNKSIZE,
                 close_fds=True,
             )
         return self
@@ -53,7 +55,8 @@ class AsyncProcess:
         """Exit context manager."""
         if self._proc.returncode is None:
             # prevent subprocess deadlocking, send terminate and read remaining bytes
-            await self.write_eof()
+            if self._enable_write:
+                self._proc.stdin.close()
             try:
                 self._proc.terminate()
                 await self._proc.stdout.read()
@@ -96,15 +99,6 @@ class AsyncProcess:
         except AttributeError:
             raise asyncio.CancelledError()
 
-    async def write_eof(self) -> None:
-        """Write eof to process."""
-        if not (self._enable_write and self._proc.stdin.can_write_eof()):
-            return
-        try:
-            self._proc.stdin.write_eof()
-        except BrokenPipeError:
-            pass
-
     async def communicate(self, input_data: Optional[bytes] = None) -> bytes:
         """Write bytes to process and read back results."""
         return await self._proc.communicate(input_data)
index acbefb3312cb9616ae07034e7e638bc144915a36..583c1be05170cb1b5ee5eb4e39914ae705cb152f 100644 (file)
@@ -54,7 +54,7 @@ class TaskInfo:
 
     def __str__(self):
         """Return string representation, used for logging."""
-        return f"{self.name} ({id})"
+        return f"{self.name} ({self.id})"
 
     def to_dict(self) -> Dict[str, Any]:
         """Return serializable dict."""
index e0cf04055fd8d9066797fcc689c44478fa60de23..59720ca6d1b345cedec03b340d5bad3c2dae170d 100644 (file)
@@ -18,16 +18,32 @@ class StreamType(Enum):
 
 
 class ContentType(Enum):
-    """Enum with stream content types."""
+    """Enum with audio content types supported by ffmpeg."""
 
     OGG = "ogg"
     FLAC = "flac"
     MP3 = "mp3"
     AAC = "aac"
     MPEG = "mpeg"
-    S24 = "s24"
-    S32 = "s32"
-    S64 = "s64"
+    PCM_S16LE = "s16le"  # PCM signed 16-bit little-endian
+    PCM_S24LE = "s24le"  # PCM signed 24-bit little-endian
+    PCM_S32LE = "s32le"  # PCM signed 32-bit little-endian
+    PCM_F32LE = "f32le"  # PCM 32-bit floating-point little-endian
+    PCM_F64LE = "f64le"  # PCM 64-bit floating-point little-endian
+
+    def is_pcm(self):
+        """Return if contentype is PCM."""
+        return self.name.startswith("PCM")
+
+    def sox_supported(self):
+        """Return if ContentType is supported by SoX."""
+        return self not in [ContentType.AAC, ContentType.MPEG]
+
+    def sox_format(self):
+        """Convert the ContentType to SoX compatible format."""
+        if not self.sox_supported():
+            raise NotImplementedError
+        return self.value.replace("le", "")
 
 
 @dataclass
index a3d617d0c6c8bf3c87548f009b7b4428d62a2dde..b160b89f915dfe9c47fcc568cc9308ab8d445f72 100644 (file)
@@ -21,6 +21,7 @@ from music_assistant.constants import (
 from music_assistant.helpers.audio import (
     analyze_audio,
     crossfade_pcm_parts,
+    get_sox_args,
     get_stream_details,
     strip_silence,
 )
@@ -29,7 +30,7 @@ from music_assistant.helpers.typing import MusicAssistant
 from music_assistant.helpers.util import create_task
 from music_assistant.helpers.web import require_local_subnet
 from music_assistant.models.player_queue import PlayerQueue
-from music_assistant.models.streamdetails import ContentType, StreamDetails, StreamType
+from music_assistant.models.streamdetails import ContentType, StreamDetails
 
 routes = RouteTableDef()
 
@@ -45,7 +46,6 @@ async def stream_queue(request: Request):
     player_queue = mass.players.get_player_queue(player_id)
     if not player_queue:
         raise HTTPNotFound(reason="invalid player_id")
-    LOGGER.info("Start Queue Stream for player %s ", player_queue.player.name)
 
     # prepare request
     resp = StreamResponse(
@@ -54,12 +54,13 @@ async def stream_queue(request: Request):
     await resp.prepare(request)
 
     player_conf = player_queue.player.config
+    pcm_format = "f64"
     sample_rate = min(player_conf.get(CONF_MAX_SAMPLE_RATE, 96000), 96000)
 
     args = [
         "sox",
         "-t",
-        "s32",
+        pcm_format,
         "-c",
         "2",
         "-r",
@@ -71,11 +72,16 @@ async def stream_queue(request: Request):
     ]
     async with AsyncProcess(args, enable_write=True) as sox_proc:
 
+        LOGGER.info(
+            "Start Queue Stream for player %s",
+            player_queue.player.name,
+        )
+
         # feed stdin with pcm samples
         async def fill_buffer():
             """Feed audio data into sox stdin for processing."""
             async for audio_chunk in get_queue_stream(
-                mass, player_queue, sample_rate, 32
+                mass, player_queue, sample_rate, pcm_format
             ):
                 await sox_proc.write(audio_chunk)
                 del audio_chunk
@@ -138,7 +144,7 @@ async def stream_single_queue_item(request: Request):
         player_queue.player.name,
     )
 
-    async for _, audio_chunk in get_media_stream(mass, streamdetails):
+    async for _, audio_chunk in get_media_stream(mass, streamdetails, ContentType.FLAC):
         await resp.write(audio_chunk)
         del audio_chunk
     LOGGER.debug(
@@ -181,68 +187,18 @@ async def get_media_stream(
     chunk_size: Optional[int] = None,
 ) -> AsyncGenerator[Tuple[bool, bytes], None]:
     """Get the audio stream for the given streamdetails."""
-    input_format = streamdetails.content_type.value
-    stream_path = streamdetails.path
-    stream_type = StreamType(streamdetails.type)
-    if output_format is None:
-        output_format = ContentType.FLAC
-
-    # collect all args for sox/ffmpeg
-    if output_format in [
-        ContentType.S24,
-        ContentType.S32,
-        ContentType.S64,
-    ]:
-        output_args = ["-t", output_format.value, "-c", "2", "-"]
-    elif output_format == ContentType.FLAC:
-        output_args = ["-t", output_format.value] + ["-C", "0", "-"]
-    else:
-        output_args = ["-t", output_format.value, "-"]
 
-    # stream from URL or file
-    if stream_type in [StreamType.URL, StreamType.FILE]:
-        # input_args = ["sox", "-t", input_format, stream_path]
-        input_args = ["sox", stream_path]
-    # stream from executable
-    else:
-        input_args = [stream_path, "|", "sox", "-t", input_format, "-"]
-
-    filter_args = []
-    if streamdetails.gain_correct:
-        filter_args += ["vol", str(streamdetails.gain_correct), "dB"]
-    if resample:
-        filter_args += ["rate", "-v", str(resample)]
-
-    if streamdetails.content_type in [ContentType.AAC, ContentType.MPEG]:
-        # use ffmpeg for processing radio streams
-        args = [
-            "ffmpeg",
-            "-hide_banner",
-            "-loglevel",
-            "error",
-            "-i",
-            stream_path,
-            "-filter:a",
-            "volume=%sdB" % streamdetails.gain_correct,
-            "-f",
-            "flac",
-            "-",
-        ]
-    else:
-        # regular sox processing
-        args = input_args + output_args + filter_args
-
-    # signal start of stream event
     mass.eventbus.signal(EVENT_STREAM_STARTED, streamdetails)
-    LOGGER.debug(
-        "start media stream for: %s/%s (%s)",
-        streamdetails.provider,
-        streamdetails.item_id,
-        streamdetails.type,
-    )
-
+    args = get_sox_args(streamdetails, output_format, resample)
     async with AsyncProcess(args) as sox_proc:
 
+        LOGGER.debug(
+            "start media stream for: %s/%s (%s)",
+            streamdetails.provider,
+            streamdetails.item_id,
+            streamdetails.type,
+        )
+
         # yield chunks from stdout
         # we keep 1 chunk behind to detect end of stream properly
         try:
@@ -280,15 +236,29 @@ async def get_media_stream(
 
 
 async def get_queue_stream(
-    mass: MusicAssistant, player_queue: PlayerQueue, sample_rate=96000, bit_depth=32
+    mass: MusicAssistant,
+    player_queue: PlayerQueue,
+    sample_rate=96000,
+    pcm_format: str = "f64",
+    channels: int = 2,
 ) -> AsyncGenerator[bytes, None]:
     """Stream the PlayerQueue's tracks as constant feed in PCM raw audio."""
     last_fadeout_data = b""
     queue_index = None
     # get crossfade details
     fade_length = player_queue.crossfade_duration
-    pcm_args = ["s32", "-c", "2", "-r", str(sample_rate)]
-    sample_size = int(sample_rate * (bit_depth / 8) * 2)  # 1 second
+    if pcm_format in ["s64", "f64"]:
+        bit_depth = 64
+    elif pcm_format in ["s32", "f32"]:
+        bit_depth = 32
+    elif pcm_format == "s16":
+        bit_depth = 16
+    elif pcm_format == "s24":
+        bit_depth = 24
+    else:
+        raise NotImplementedError("Unsupported PCM format: %s" % pcm_format)
+    pcm_args = [pcm_format, "-c", "2", "-r", str(sample_rate)]
+    sample_size = int(sample_rate * (bit_depth / 8) * channels)  # 1 second
     buffer_size = sample_size * fade_length if fade_length else sample_size * 10
     # stream queue tracks one by one
     while True:
@@ -322,7 +292,7 @@ async def get_queue_stream(
         async for is_last_chunk, chunk in get_media_stream(
             mass,
             streamdetails,
-            ContentType.S32,
+            ContentType.PCM_F64LE,
             resample=sample_rate,
             chunk_size=buffer_size,
         ):