From c5cc6c58cd48b486239e86ce615d52ae6b8ba17d Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Tue, 14 Jan 2025 20:00:15 +0100 Subject: [PATCH] Fix: Audio streaming hangs suddenly at the end of a track (#1872) --- Dockerfile | 2 -- Dockerfile.base | 47 ++++++++++++++++----------- music_assistant/helpers/audio.py | 53 +++++++++++++++++++++---------- music_assistant/helpers/ffmpeg.py | 47 ++++++++++++--------------- 4 files changed, 84 insertions(+), 65 deletions(-) diff --git a/Dockerfile b/Dockerfile index 8fa94bc1..2d2412bd 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,13 +18,11 @@ ADD dist dist COPY requirements_all.txt . RUN uv pip install \ --no-cache \ - --find-links "https://wheels.home-assistant.io/musllinux/" \ -r requirements_all.txt # Install Music Assistant from prebuilt wheel RUN uv pip install \ --no-cache \ - --find-links "https://wheels.home-assistant.io/musllinux/" \ "music-assistant@dist/music_assistant-${MASS_VERSION}-py3-none-any.whl" # Set some labels diff --git a/Dockerfile.base b/Dockerfile.base index 5aa8d1df..d0338fed 100644 --- a/Dockerfile.base +++ b/Dockerfile.base @@ -1,29 +1,37 @@ # syntax=docker/dockerfile:1 # BASE docker image for music assistant container - -FROM python:3.12-alpine3.20 +# Based on Debian Trixie (unstable) because we need a newer version of ffmpeg (and snapcast) +# TODO: Switch back to regular python stable debian image + manually build ffmpeg and snapcast +FROM debian:trixie ARG TARGETPLATFORM + RUN set -x \ - && apk add --no-cache \ + && apt-get update \ + && apt-get install -y --no-install-recommends \ ca-certificates \ - jemalloc \ curl \ git \ wget \ tzdata \ + python3 \ + python3-venv \ + python3-pip \ + libsox-fmt-all \ + libsox3 \ + ffmpeg \ sox \ + openssl \ cifs-utils \ - # install ffmpeg from community repo - && apk add --no-cache ffmpeg --repository=https://dl-cdn.alpinelinux.org/alpine/v3.20/community \ - # install snapcast from community repo - && apk add --no-cache snapcast 
--repository=https://dl-cdn.alpinelinux.org/alpine/edge/community \ - # install libnfs from community repo - && apk add --no-cache libnfs --repository=https://dl-cdn.alpinelinux.org/alpine/v3.20/community \ - # install openssl-dev (needed for airplay) - && apk add --no-cache openssl-dev + libnfs-utils \ + libjemalloc2 \ + snapserver \ + # cleanup + && rm -rf /tmp/* \ + && rm -rf /var/lib/apt/lists/* + # Copy widevine client files to container RUN mkdir -p /usr/local/bin/widevine_cdm @@ -31,14 +39,16 @@ COPY widevine_cdm/* /usr/local/bin/widevine_cdm/ WORKDIR /app -# Configure runtime environmental variables -ENV LD_PRELOAD="/usr/lib/libjemalloc.so.2" -ENV VIRTUAL_ENV=/app/venv +# Enable jemalloc +RUN \ + export LD_PRELOAD="$(find /usr/lib/ -name '*libjemalloc.so.2')" \ + && export MALLOC_CONF="background_thread:true,metadata_thp:auto,dirty_decay_ms:20000,muzzy_decay_ms:20000" # create python venv -RUN python3 -m venv $VIRTUAL_ENV && \ - source $VIRTUAL_ENV/bin/activate && \ - pip install --upgrade pip \ +ENV VIRTUAL_ENV=/app/venv +RUN python3 -m venv $VIRTUAL_ENV +ENV PATH="$VIRTUAL_ENV/bin:$PATH" +RUN pip install --upgrade pip \ && pip install uv==0.4.17 # we need to set (very permissive) permissions to the workdir @@ -48,7 +58,6 @@ RUN python3 -m venv $VIRTUAL_ENV && \ RUN chmod -R 777 /app \ && chmod -R 777 /tmp -ENV PATH="$VIRTUAL_ENV/bin:$PATH" WORKDIR $VIRTUAL_ENV LABEL \ diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index c9af6693..3d82d92d 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -46,7 +46,7 @@ from .dsp import filter_to_ffmpeg_params from .ffmpeg import FFMpeg, get_ffmpeg_stream from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u from .process import AsyncProcess, check_output, communicate -from .util import create_tempfile, detect_charset +from .util import TimedAsyncGenerator, create_tempfile, detect_charset if TYPE_CHECKING: from
music_assistant_models.config_entries import CoreConfig, PlayerConfig @@ -124,7 +124,8 @@ async def crossfade_pcm_parts( return crossfaded_audio # no crossfade_data, return original data instead LOGGER.debug( - "crossfade of pcm chunks failed: not enough data? " "fade_in_part: %s - fade_out_part: %s", + "crossfade of pcm chunks failed: not enough data? " + "- fade_in_part: %s - fade_out_part: %s", len(fade_in_part), len(fade_out_part), ) @@ -287,7 +288,7 @@ async def get_media_stream( ) -> AsyncGenerator[bytes, None]: """Get PCM audio stream for given media details.""" logger = LOGGER.getChild("media_stream") - logger.debug("start media stream for: %s", streamdetails.uri) + logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri) strip_silence_begin = streamdetails.strip_silence_begin strip_silence_end = streamdetails.strip_silence_end if streamdetails.fade_in: @@ -297,7 +298,6 @@ async def get_media_stream( chunk_number = 0 buffer: bytes = b"" finished = False - ffmpeg_proc = FFMpeg( audio_input=audio_source, input_format=streamdetails.audio_format, @@ -305,10 +305,25 @@ async def get_media_stream( filter_params=filter_params, extra_input_args=extra_input_args, collect_log_history=True, + loglevel="debug" if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL) else "info", ) try: await ffmpeg_proc.start() - async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size): + logger.debug( + "Started media stream for %s" + " - using streamtype: %s " + " - volume normalization: %s" + " - pcm format: %s" + " - ffmpeg PID: %s", + streamdetails.uri, + streamdetails.stream_type, + streamdetails.volume_normalization_mode, + pcm_format.content_type.value, + ffmpeg_proc.proc.pid, + ) + async for chunk in TimedAsyncGenerator( + ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size), timeout=30 + ): # for radio streams we just yield all chunks directly if streamdetails.media_type == MediaType.RADIO: yield chunk @@ -349,6 +364,7 @@ async def get_media_stream( 
buffer = buffer[pcm_format.pcm_sample_size :] # end of audio/track reached + logger.log(VERBOSE_LOG_LEVEL, "End of stream reached.") if strip_silence_end and buffer: # strip silence from end of audio buffer = await strip_silence( @@ -364,6 +380,7 @@ async def get_media_stream( finished = True finally: + logger.log(VERBOSE_LOG_LEVEL, "Closing ffmpeg...") await ffmpeg_proc.close() if bytes_sent == 0: @@ -387,15 +404,17 @@ async def get_media_stream( if finished and not streamdetails.seek_position and seconds_streamed: streamdetails.duration = seconds_streamed - # parse loudnorm data if we have that collected + # parse loudnorm data if we have that collected (and enabled) if ( - streamdetails.loudness is None - and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED + (streamdetails.loudness is None or finished) + and streamdetails.volume_normalization_mode + in (VolumeNormalizationMode.DYNAMIC, VolumeNormalizationMode.FALLBACK_DYNAMIC) and (finished or (seconds_streamed >= 300)) ): # if dynamic volume normalization is enabled and the entire track is streamed # the loudnorm filter will output the measuremeet in the log, # so we can use those directly instead of analyzing the audio + logger.log(VERBOSE_LOG_LEVEL, "Collecting loudness measurement...") if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)): logger.debug( "Loudness measurement for %s: %s dB", @@ -882,16 +901,16 @@ def get_player_filter_params( def parse_loudnorm(raw_stderr: bytes | str) -> float | None: """Parse Loudness measurement from ffmpeg stderr output.""" stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr - if "[Parsed_loudnorm_" not in stderr_data: + if "[Parsed_loudnorm_0 @" not in stderr_data: return None - stderr_data = stderr_data.split("[Parsed_loudnorm_")[1] - stderr_data = "{" + stderr_data.rsplit("{")[-1].strip() - stderr_data = stderr_data.rsplit("}")[0].strip() + "}" - try: - loudness_data = json_loads(stderr_data) 
- except JSON_DECODE_EXCEPTIONS: - return None - return float(loudness_data["input_i"]) + for json_chunk in stderr_data.split(" { "): + try: + stderr_data = "{" + json_chunk.rsplit("}")[0].strip() + "}" + loudness_data = json_loads(stderr_data) + return float(loudness_data["input_i"]) + except (*JSON_DECODE_EXCEPTIONS, KeyError, ValueError, IndexError): + continue + return None async def analyze_loudness( diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index 31fbdd8b..99e3d897 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -4,6 +4,7 @@ from __future__ import annotations import asyncio import logging +import time from collections import deque from collections.abc import AsyncGenerator from typing import TYPE_CHECKING @@ -15,7 +16,7 @@ from music_assistant_models.helpers import get_global_cache_value from music_assistant.constants import VERBOSE_LOG_LEVEL from .process import AsyncProcess -from .util import close_async_generator +from .util import TimedAsyncGenerator, close_async_generator if TYPE_CHECKING: from music_assistant_models.media_items import AudioFormat @@ -37,6 +38,7 @@ class FFMpeg(AsyncProcess): extra_input_args: list[str] | None = None, audio_output: str | int = "-", collect_log_history: bool = False, + loglevel: str = "error", ) -> None: """Initialize AsyncProcess.""" ffmpeg_args = get_ffmpeg_args( @@ -47,7 +49,7 @@ class FFMpeg(AsyncProcess): input_path=audio_input if isinstance(audio_input, str) else "-", output_path=audio_output if isinstance(audio_output, str) else "-", extra_input_args=extra_input_args or [], - loglevel="info", + loglevel=loglevel, ) self.audio_input = audio_input self.input_format = input_format @@ -127,36 +129,27 @@ class FFMpeg(AsyncProcess): if TYPE_CHECKING: self.audio_input: AsyncGenerator[bytes, None] generator_exhausted = False - audio_received = asyncio.Event() - - async def stdin_watchdog() -> None: - # this is a simple watchdog to ensure we
don't get stuck forever waiting for audio data - try: - await asyncio.wait_for(audio_received.wait(), timeout=30) - except TimeoutError: - self.logger.error("No audio data received from source after timeout") - self._stdin_task.cancel() - - asyncio.create_task(stdin_watchdog()) - + cancelled = False try: - async for chunk in self.audio_input: - if not audio_received.is_set(): - audio_received.set() + start = time.time() + self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...") + async for chunk in TimedAsyncGenerator(self.audio_input, timeout=30): await self.write(chunk) + self.logger.log( + VERBOSE_LOG_LEVEL, "Audio data source exhausted in %.2fs", time.time() - start + ) generator_exhausted = True - if not audio_received.is_set(): - raise AudioError("No audio data received from source") - except Exception as err: + except (Exception, asyncio.CancelledError) as err: - if isinstance(err, asyncio.CancelledError): - return - self.logger.error( - "Stream error: %s", - str(err) or err.__class__.__name__, - exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None, - ) + cancelled = isinstance(err, asyncio.CancelledError) + if not cancelled: + self.logger.error( + "Stream error: %s", + str(err) or err.__class__.__name__, + exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None, + ) finally: - await self.write_eof() + if not cancelled: + await self.write_eof() # we need to ensure that we close the async generator # if we get cancelled otherwise it keeps lingering forever if not generator_exhausted: -- 2.34.1