Fix: Audio streaming hangs suddenly at the end of a track (#1872)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 14 Jan 2025 19:00:15 +0000 (20:00 +0100)
committerGitHub <noreply@github.com>
Tue, 14 Jan 2025 19:00:15 +0000 (20:00 +0100)
Dockerfile
Dockerfile.base
music_assistant/helpers/audio.py
music_assistant/helpers/ffmpeg.py

index 8fa94bc1fa00c50437a953021784dbd7b08dac44..2d2412bd371313f7bcf64d5bf85063c5d2d56ff9 100644 (file)
@@ -18,13 +18,11 @@ ADD dist dist
 COPY requirements_all.txt .
 RUN uv pip install \
     --no-cache \
-    --find-links "https://wheels.home-assistant.io/musllinux/" \
     -r requirements_all.txt
 
 # Install Music Assistant from prebuilt wheel
 RUN uv pip install \
     --no-cache \
-    --find-links "https://wheels.home-assistant.io/musllinux/" \
     "music-assistant@dist/music_assistant-${MASS_VERSION}-py3-none-any.whl"
 
 # Set some labels
index 5aa8d1dfb459dd18bbd17b586f747fa01daae250..d0338fed235c8ef9b260c28136a4a2fde2dff143 100644 (file)
@@ -1,29 +1,37 @@
 # syntax=docker/dockerfile:1
 
 # BASE docker image for music assistant container
-
-FROM python:3.12-alpine3.20
+# Based on Debian Trixie (unstable) because we need a newer version of ffmpeg (and snapcast)
+# TODO: Switch back to regular python stable debian image + manually build ffmpeg and snapcast
+FROM debian:trixie
 
 ARG TARGETPLATFORM
 
+
 RUN set -x \
-    && apk add --no-cache \
+    && apt-get update \
+    && apt-get install -y --no-install-recommends \
         ca-certificates \
-        jemalloc \
         curl \
         git \
         wget \
         tzdata \
+        python3 \
+        python3-venv \
+        python3-pip \
+        libsox-fmt-all \
+        libsox3 \
+        ffmpeg \
         sox \
+        openssl \
         cifs-utils \
-    # install ffmpeg from community repo
-    && apk add --no-cache ffmpeg --repository=https://dl-cdn.alpinelinux.org/alpine/v3.20/community \
-    # install snapcast from community repo
-    && apk add --no-cache snapcast --repository=https://dl-cdn.alpinelinux.org/alpine/edge/community \
-    # install libnfs from community repo
-    && apk add --no-cache libnfs --repository=https://dl-cdn.alpinelinux.org/alpine/v3.20/community \
-    # install openssl-dev (needed for airplay)
-    && apk add --no-cache openssl-dev
+        libnfs-utils \
+        libjemalloc2 \
+        snapserver \
+    # cleanup
+    && rm -rf /tmp/* \
+    && rm -rf /var/lib/apt/lists/*
+
 
 # Copy widevine client files to container
 RUN mkdir -p /usr/local/bin/widevine_cdm
@@ -31,14 +39,16 @@ COPY widevine_cdm/* /usr/local/bin/widevine_cdm/
 
 WORKDIR /app
 
-# Configure runtime environmental variables
-ENV LD_PRELOAD="/usr/lib/libjemalloc.so.2"
-ENV VIRTUAL_ENV=/app/venv
+# Enable jemalloc
+RUN \
+    export LD_PRELOAD="$(find /usr/lib/ -name *libjemalloc.so.2)" \
+    export MALLOC_CONF="background_thread:true,metadata_thp:auto,dirty_decay_ms:20000,muzzy_decay_ms:20000"
 
 # create python venv
-RUN python3 -m venv $VIRTUAL_ENV && \
-    source $VIRTUAL_ENV/bin/activate && \
-    pip install --upgrade pip \
+ENV VIRTUAL_ENV=/app/venv
+RUN python3 -m venv $VIRTUAL_ENV
+ENV PATH="$VIRTUAL_ENV/bin:$PATH"
+RUN pip install --upgrade pip \
     && pip install uv==0.4.17
 
 # we need to set (very permissive) permissions to the workdir
@@ -48,7 +58,6 @@ RUN python3 -m venv $VIRTUAL_ENV && \
 RUN chmod -R 777 /app \
     && chmod -R 777 /tmp
 
-ENV PATH="$VIRTUAL_ENV/bin:$PATH"
 WORKDIR $VIRTUAL_ENV
 
 LABEL \
index c9af669380bbf1f1707d6f187929cb04f690c64c..3d82d92d0475e3726ccc28b813bea4bfca4ea4e8 100644 (file)
@@ -46,7 +46,7 @@ from .dsp import filter_to_ffmpeg_params
 from .ffmpeg import FFMpeg, get_ffmpeg_stream
 from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
 from .process import AsyncProcess, check_output, communicate
-from .util import create_tempfile, detect_charset
+from .util import TimedAsyncGenerator, create_tempfile, detect_charset
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import CoreConfig, PlayerConfig
@@ -124,7 +124,8 @@ async def crossfade_pcm_parts(
         return crossfaded_audio
     # no crossfade_data, return original data instead
     LOGGER.debug(
-        "crossfade of pcm chunks failed: not enough data? " "fade_in_part: %s - fade_out_part: %s",
+        "crossfade of pcm chunks failed: not enough data? "
+        "- fade_in_part: %s - fade_out_part: %s",
         len(fade_in_part),
         len(fade_out_part),
     )
@@ -287,7 +288,7 @@ async def get_media_stream(
 ) -> AsyncGenerator[bytes, None]:
     """Get PCM audio stream for given media details."""
     logger = LOGGER.getChild("media_stream")
-    logger.debug("start media stream for: %s", streamdetails.uri)
+    logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
     strip_silence_begin = streamdetails.strip_silence_begin
     strip_silence_end = streamdetails.strip_silence_end
     if streamdetails.fade_in:
@@ -297,7 +298,6 @@ async def get_media_stream(
     chunk_number = 0
     buffer: bytes = b""
     finished = False
-
     ffmpeg_proc = FFMpeg(
         audio_input=audio_source,
         input_format=streamdetails.audio_format,
@@ -305,10 +305,25 @@ async def get_media_stream(
         filter_params=filter_params,
         extra_input_args=extra_input_args,
         collect_log_history=True,
+        loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info",
     )
     try:
         await ffmpeg_proc.start()
-        async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
+        logger.debug(
+            "Started media stream for %s"
+            " - using streamtype: %s "
+            " - volume normalization: %s"
+            " - pcm format: %s"
+            " - ffmpeg PID: %s",
+            streamdetails.uri,
+            streamdetails.stream_type,
+            streamdetails.volume_normalization_mode,
+            pcm_format.content_type.value,
+            ffmpeg_proc.proc.pid,
+        )
+        async for chunk in TimedAsyncGenerator(
+            ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), timeout=30
+        ):
             # for radio streams we just yield all chunks directly
             if streamdetails.media_type == MediaType.RADIO:
                 yield chunk
@@ -349,6 +364,7 @@ async def get_media_stream(
                 buffer = buffer[pcm_format.pcm_sample_size :]
 
         # end of audio/track reached
+        logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.")
         if strip_silence_end and buffer:
             # strip silence from end of audio
             buffer = await strip_silence(
@@ -364,6 +380,7 @@ async def get_media_stream(
         finished = True
 
     finally:
+        logger.log(VERBOSE_LOG_LEVEL, "Closing ffmpeg...")
         await ffmpeg_proc.close()
 
         if bytes_sent == 0:
@@ -387,15 +404,17 @@ async def get_media_stream(
         if finished and not streamdetails.seek_position and seconds_streamed:
             streamdetails.duration = seconds_streamed
 
-        # parse loudnorm data if we have that collected
+        # parse loudnorm data if we have that collected (and enabled)
         if (
-            streamdetails.loudness is None
-            and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED
+            (streamdetails.loudness is None or finished)
+            and streamdetails.volume_normalization_mode
+            in (VolumeNormalizationMode.DYNAMIC, VolumeNormalizationMode.FALLBACK_DYNAMIC)
             and (finished or (seconds_streamed >= 300))
         ):
             # if dynamic volume normalization is enabled and the entire track is streamed
             # the loudnorm filter will output the measuremeet in the log,
             # so we can use those directly instead of analyzing the audio
+            logger.log(VERBOSE_LOG_LEVEL, "Collecting loudness measurement...")
             if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
                 logger.debug(
                     "Loudness measurement for %s: %s dB",
@@ -882,16 +901,16 @@ def get_player_filter_params(
 def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
     """Parse Loudness measurement from ffmpeg stderr output."""
     stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
-    if "[Parsed_loudnorm_" not in stderr_data:
+    if "[Parsed_loudnorm_0 @" not in stderr_data:
         return None
-    stderr_data = stderr_data.split("[Parsed_loudnorm_")[1]
-    stderr_data = "{" + stderr_data.rsplit("{")[-1].strip()
-    stderr_data = stderr_data.rsplit("}")[0].strip() + "}"
-    try:
-        loudness_data = json_loads(stderr_data)
-    except JSON_DECODE_EXCEPTIONS:
-        return None
-    return float(loudness_data["input_i"])
+    for jsun_chunk in stderr_data.split(" { "):
+        try:
+            stderr_data = "{" + jsun_chunk.rsplit("}")[0].strip() + "}"
+            loudness_data = json_loads(stderr_data)
+            return float(loudness_data["input_i"])
+        except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError):
+            continue
+    return None
 
 
 async def analyze_loudness(
index 31fbdd8b561b7557c0a2ed68a9554585cdb8e7fb..99e3d8970065ef093e45c665760ceda6ca150646 100644 (file)
@@ -4,6 +4,7 @@ from __future__ import annotations
 
 import asyncio
 import logging
+import time
 from collections import deque
 from collections.abc import AsyncGenerator
 from typing import TYPE_CHECKING
@@ -15,7 +16,7 @@ from music_assistant_models.helpers import get_global_cache_value
 from music_assistant.constants import VERBOSE_LOG_LEVEL
 
 from .process import AsyncProcess
-from .util import close_async_generator
+from .util import TimedAsyncGenerator, close_async_generator
 
 if TYPE_CHECKING:
     from music_assistant_models.media_items import AudioFormat
@@ -37,6 +38,7 @@ class FFMpeg(AsyncProcess):
         extra_input_args: list[str] | None = None,
         audio_output: str | int = "-",
         collect_log_history: bool = False,
+        loglevel: str = "error",
     ) -> None:
         """Initialize AsyncProcess."""
         ffmpeg_args = get_ffmpeg_args(
@@ -47,7 +49,7 @@ class FFMpeg(AsyncProcess):
             input_path=audio_input if isinstance(audio_input, str) else "-",
             output_path=audio_output if isinstance(audio_output, str) else "-",
             extra_input_args=extra_input_args or [],
-            loglevel="info",
+            loglevel=loglevel,
         )
         self.audio_input = audio_input
         self.input_format = input_format
@@ -127,36 +129,27 @@ class FFMpeg(AsyncProcess):
         if TYPE_CHECKING:
             self.audio_input: AsyncGenerator[bytes, None]
         generator_exhausted = False
-        audio_received = asyncio.Event()
-
-        async def stdin_watchdog() -> None:
-            # this is a simple watchdog to ensure we don't get stuck forever waiting for audio data
-            try:
-                await asyncio.wait_for(audio_received.wait(), timeout=30)
-            except TimeoutError:
-                self.logger.error("No audio data received from source after timeout")
-                self._stdin_task.cancel()
-
-        asyncio.create_task(stdin_watchdog())
-
+        cancelled = False
         try:
-            async for chunk in self.audio_input:
-                if not audio_received.is_set():
-                    audio_received.set()
+            start = time.time()
+            self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...")
+            async for chunk in TimedAsyncGenerator(self.audio_input, timeout=30):
                 await self.write(chunk)
+            self.logger.log(
+                VERBOSE_LOG_LEVEL, "Audio data source exhausted in %.2fs", time.time() - start
+            )
             generator_exhausted = True
-            if not audio_received.is_set():
-                raise AudioError("No audio data received from source")
         except Exception as err:
-            if isinstance(err, asyncio.CancelledError):
-                return
-            self.logger.error(
-                "Stream error: %s",
-                str(err) or err.__class__.__name__,
-                exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
-            )
+            cancelled = isinstance(err, asyncio.CancelledError)
+            if not cancelled:
+                self.logger.error(
+                    "Stream error: %s",
+                    str(err) or err.__class__.__name__,
+                    exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
+                )
         finally:
-            await self.write_eof()
+            if not cancelled:
+                await self.write_eof()
             # we need to ensure that we close the async generator
             # if we get cancelled otherwise it keeps lingering forever
             if not generator_exhausted: