Several bugfixes and enhancements to audio streaming (#1660)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 14 Sep 2024 01:59:53 +0000 (03:59 +0200)
committerGitHub <noreply@github.com>
Sat, 14 Sep 2024 01:59:53 +0000 (03:59 +0200)
14 files changed:
music_assistant/common/models/enums.py
music_assistant/common/models/streamdetails.py
music_assistant/constants.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/ffmpeg.py [new file with mode: 0644]
music_assistant/server/helpers/util.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/bluesound/__init__.py
music_assistant/server/providers/dlna/__init__.py
music_assistant/server/providers/snapcast/__init__.py
music_assistant/server/providers/sonos/__init__.py
music_assistant/server/providers/spotify/__init__.py

index 8374afa9c97debac1a057860945c6a070e675dc8..78cc15ae01f37aaa1e436280354ddac563d78143 100644 (file)
@@ -445,3 +445,14 @@ class CacheCategory(IntEnum):
     PLAYER_QUEUE_STATE = 7
     MEDIA_INFO = 8
     LIBRARY_ITEMS = 9
+
+
+class VolumeNormalizationMode(StrEnum):
+    """Enum with possible VolumeNormalization modes."""
+
+    DISABLED = "disabled"  # no volume normalization filter is applied
+    DYNAMIC = "dynamic"  # ffmpeg loudnorm filter in dynamic mode
+    MEASUREMENT_ONLY = "measurement_only"  # gain from a stored loudness measurement, if any
+    FALLBACK_FIXED_GAIN = "fallback_fixed_gain"  # NOTE(review): presumably measurement, else fixed gain
+    FIXED_GAIN = "fixed_gain"  # user configured fixed gain correction
+    FALLBACK_DYNAMIC = "fallback_dynamic"  # NOTE(review): presumably measurement, else dynamic
index 5dc0c6b698d9b9164aed182bffdfdba473cc9b41..9740e9ce64c41632d6164e0719e3f72b16907cc6 100644 (file)
@@ -7,7 +7,7 @@ from typing import Any
 
 from mashumaro import DataClassDictMixin
 
-from music_assistant.common.models.enums import MediaType, StreamType
+from music_assistant.common.models.enums import MediaType, StreamType, VolumeNormalizationMode
 from music_assistant.common.models.media_items import AudioFormat
 
 
@@ -44,11 +44,10 @@ class StreamDetails(DataClassDictMixin):
     # the fields below will be set/controlled by the streamcontroller
     seek_position: int = 0
     fade_in: bool = False
-    enable_volume_normalization: bool = False
     loudness: float | None = None
     loudness_album: float | None = None
     prefer_album_loudness: bool = False
-    force_dynamic_volume_normalization: bool = False
+    volume_normalization_mode: VolumeNormalizationMode | None = None
     queue_id: str | None = None
     seconds_streamed: float | None = None
     target_loudness: float | None = None
index ea3c9feb2146f8f563bc846eb2e7aeb02d27b966..820f909aa07a850d24bf32f4dc9021a246f8e210 100644 (file)
@@ -71,6 +71,9 @@ CONF_PREVENT_SYNC_LEADER_OFF: Final[str] = "prevent_sync_leader_off"
 CONF_SYNCGROUP_DEFAULT_ON: Final[str] = "syncgroup_default_on"
 CONF_ENABLE_ICY_METADATA: Final[str] = "enable_icy_metadata"
 CONF_VOLUME_NORMALIZATION_RADIO: Final[str] = "volume_normalization_radio"
+CONF_VOLUME_NORMALIZATION_TRACKS: Final[str] = "volume_normalization_tracks"
+CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO: Final[str] = "volume_normalization_fixed_gain_radio"
+CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS: Final[str] = "volume_normalization_fixed_gain_tracks"
 
 # config default values
 DEFAULT_HOST: Final[str] = "0.0.0.0"
index f02c416aac8c2f7837a48f54929af64f4f96974a..411f96e4ce175644d3b767de17044b05578b14f5 100644 (file)
@@ -76,6 +76,7 @@ class CompareState(TypedDict):
     current_index: int | None
     elapsed_time: int
     stream_title: str | None
+    content_type: str | None
 
 
 class PlayerQueuesController(CoreController):
@@ -659,7 +660,7 @@ class PlayerQueuesController(CoreController):
         while True:
             try:
                 if (next_index := self._get_next_index(queue_id, idx, True)) is not None:
-                    await self.play_index(queue_id, next_index)
+                    await self.play_index(queue_id, next_index, debounce=True)
                 break
             except MediaNotFoundError:
                 self.logger.warning(
@@ -683,7 +684,7 @@ class PlayerQueuesController(CoreController):
         current_index = self._queues[queue_id].current_index
         if current_index is None:
             return
-        await self.play_index(queue_id, max(current_index - 1, 0))
+        await self.play_index(queue_id, max(current_index - 1, 0), debounce=True)
 
     @api_command("player_queues/skip")
     async def skip(self, queue_id: str, seconds: int = 10) -> None:
@@ -765,6 +766,7 @@ class PlayerQueuesController(CoreController):
         index: int | str,
         seek_position: int = 0,
         fade_in: bool = False,
+        debounce: bool = False,
     ) -> None:
         """Play item at index (or item_id) X in queue."""
         queue = self._queues[queue_id]
@@ -786,6 +788,9 @@ class PlayerQueuesController(CoreController):
         queue.stream_finished = False
         queue.end_of_track_reached = False
 
+        queue.current_item = queue_item
+        self.signal_update(queue_id)
+
         # work out if we are playing an album and if we should prefer album loudness
         if (
             next_index is not None
@@ -821,13 +826,14 @@ class PlayerQueuesController(CoreController):
         # NOTE that we debounce this a bit to account for someone hitting the next button
         # like a madman. This will prevent the player from being overloaded with requests.
         self.mass.call_later(
-            0.25,
+            1 if debounce else 0.1,
             self.mass.players.play_media,
             player_id=queue_id,
             # transform into PlayerMedia to send to the actual player implementation
             media=self.player_media_from_queue_item(queue_item, queue.flow_mode),
             task_id=f"play_media_{queue_id}",
         )
+        self.signal_update(queue_id)
 
     @api_command("player_queues/transfer")
     async def transfer_queue(
@@ -981,6 +987,9 @@ class PlayerQueuesController(CoreController):
             stream_title=queue.current_item.streamdetails.stream_title
             if queue.current_item and queue.current_item.streamdetails
             else None,
+            content_type=queue.current_item.streamdetails.audio_format.output_format_str
+            if queue.current_item and queue.current_item.streamdetails
+            else None,
         )
         changed_keys = get_changed_keys(prev_state, new_state)
         # return early if nothing changed
@@ -1148,10 +1157,6 @@ class PlayerQueuesController(CoreController):
         # enqueue the next track as soon as the player reports
         # it has started buffering the given queue item
         task_id = f"enqueue_next_{queue_id}"
-        self.mass.call_later(0.2, self._enqueue_next, queue, item_id, task_id=task_id)
-        # we repeat this task once more after 2 seconds to ensure the player
-        # received the command as it may be missed at the first attempt
-        # due to a race condition
         self.mass.call_later(2, self._enqueue_next, queue, item_id, task_id=task_id)
 
     # Main queue manipulation methods
@@ -1225,7 +1230,7 @@ class PlayerQueuesController(CoreController):
             if queue.index_in_buffer is not None:
                 task_id = f"enqueue_next_{queue.queue_id}"
                 self.mass.call_later(
-                    1, self._enqueue_next, queue, queue.index_in_buffer, task_id=task_id
+                    5, self._enqueue_next, queue, queue.index_in_buffer, task_id=task_id
                 )
 
         # always send the base event
index e51d7765166aee7e6eb4ca15c7272428abaaf9a5..88731633a8b825c2769d1febe214f69f00099d41 100644 (file)
@@ -24,7 +24,13 @@ from music_assistant.common.models.config_entries import (
     ConfigValueOption,
     ConfigValueType,
 )
-from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType, StreamType
+from music_assistant.common.models.enums import (
+    ConfigEntryType,
+    ContentType,
+    MediaType,
+    StreamType,
+    VolumeNormalizationMode,
+)
 from music_assistant.common.models.errors import QueueEmpty
 from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.streamdetails import StreamDetails
@@ -39,7 +45,10 @@ from music_assistant.constants import (
     CONF_PUBLISH_IP,
     CONF_SAMPLE_RATES,
     CONF_VOLUME_NORMALIZATION,
+    CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
+    CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
     CONF_VOLUME_NORMALIZATION_RADIO,
+    CONF_VOLUME_NORMALIZATION_TRACKS,
     MASS_LOGO_ONLINE,
     SILENCE_FILE,
     VERBOSE_LOG_LEVEL,
@@ -49,7 +58,6 @@ from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
     get_chunksize,
-    get_ffmpeg_stream,
     get_hls_radio_stream,
     get_hls_substream,
     get_icy_radio_stream,
@@ -58,6 +66,8 @@ from music_assistant.server.helpers.audio import (
     get_silence,
     get_stream_details,
 )
+from music_assistant.server.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
+from music_assistant.server.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.server.helpers.util import get_ips
 from music_assistant.server.helpers.webserver import Webserver
 from music_assistant.server.models.core_controller import CoreController
@@ -70,12 +80,13 @@ if TYPE_CHECKING:
 
 
 DEFAULT_STREAM_HEADERS = {
+    "Server": "Music Assistant",
     "transferMode.dlna.org": "Streaming",
     "contentFeatures.dlna.org": "DLNA.ORG_OP=00;DLNA.ORG_CI=0;DLNA.ORG_FLAGS=0d500000000000000000000000000000",  # noqa: E501
     "Cache-Control": "no-cache,must-revalidate",
     "Pragma": "no-cache",
-    "Connection": "close",
     "Accept-Ranges": "none",
+    "Connection": "close",
 }
 ICY_HEADERS = {
     "icy-name": "Music Assistant",
@@ -146,6 +157,44 @@ class StreamsController(CoreController):
                 "Make sure that this server can be reached "
                 "on the given IP and TCP port by players on the local network.",
             ),
+            ConfigEntry(
+                key=CONF_VOLUME_NORMALIZATION_RADIO,
+                type=ConfigEntryType.STRING,
+                default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
+                label="Volume normalization method for radio streams",
+                options=(
+                    ConfigValueOption(x.value.replace("_", " ").title(), x.value)
+                    for x in VolumeNormalizationMode
+                ),
+                category="audio",
+            ),
+            ConfigEntry(
+                key=CONF_VOLUME_NORMALIZATION_TRACKS,
+                type=ConfigEntryType.STRING,
+                default_value=VolumeNormalizationMode.FALLBACK_DYNAMIC,
+                label="Volume normalization method for tracks",
+                options=(
+                    ConfigValueOption(x.value.replace("_", " ").title(), x.value)
+                    for x in VolumeNormalizationMode
+                ),
+                category="audio",
+            ),
+            ConfigEntry(
+                key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO,
+                type=ConfigEntryType.FLOAT,
+                range=(-20, 10),
+                default_value=-6,
+                label="Fixed/fallback gain adjustment for radio streams",
+                category="audio",
+            ),
+            ConfigEntry(
+                key=CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
+                type=ConfigEntryType.FLOAT,
+                range=(-20, 10),
+                default_value=-6,
+                label="Fixed/fallback gain adjustment for tracks",
+                category="audio",
+            ),
             ConfigEntry(
                 key=CONF_PUBLISH_IP,
                 type=ConfigEntryType.STRING,
@@ -171,26 +220,6 @@ class StreamsController(CoreController):
                 "not be adjusted in regular setups.",
                 category="advanced",
             ),
-            ConfigEntry(
-                key=CONF_VOLUME_NORMALIZATION_RADIO,
-                type=ConfigEntryType.STRING,
-                default_value="standard",
-                label="Volume normalization method to use for radio streams",
-                description="Radio streams often have varying loudness levels, especially "
-                "during announcements and commercials. \n"
-                "You can choose to enforce dynamic volume normalization to radio streams, "
-                "even if a (average) loudness measurement for the radio station exists. \n\n"
-                "Options: \n"
-                "- Disabled - do not apply volume normalization at all \n"
-                "- Force dynamic - Enforce dynamic volume levelling at all times \n"
-                "- Standard - use normalization based on previous measurement, ",
-                options=(
-                    ConfigValueOption("Disabled", "disabled"),
-                    ConfigValueOption("Force dynamic", "dynamic"),
-                    ConfigValueOption("Standard", "standard"),
-                ),
-                category="advanced",
-            ),
         )
 
     async def setup(self, config: CoreConfig) -> None:
@@ -211,8 +240,9 @@ class StreamsController(CoreController):
             version,
             "with libsoxr support" if libsoxr_support else "",
         )
-        # copy log level to audio module
+        # copy log level to audio/ffmpeg loggers
         AUDIO_LOGGER.setLevel(self.logger.level)
+        FFMPEG_LOGGER.setLevel(self.logger.level)
         # start the webserver
         self.publish_port = config.get_value(CONF_BIND_PORT)
         self.publish_ip = config.get_value(CONF_PUBLISH_IP)
@@ -305,9 +335,6 @@ class StreamsController(CoreController):
         headers = {
             **DEFAULT_STREAM_HEADERS,
             "Content-Type": f"audio/{output_format.output_format_str}",
-            "Accept-Ranges": "none",
-            "Cache-Control": "no-cache",
-            "Connection": "close",
             "icy-name": queue_item.name,
         }
         resp = web.StreamResponse(
@@ -769,23 +796,41 @@ class StreamsController(CoreController):
         filter_params = []
         extra_input_args = []
         # handle volume normalization
-        if streamdetails.enable_volume_normalization and streamdetails.target_loudness is not None:
-            if streamdetails.force_dynamic_volume_normalization or streamdetails.loudness is None:
-                # volume normalization with unknown loudness measurement
-                # use loudnorm filter in dynamic mode
-                # which also collects the measurement on the fly during playback
-                # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
-                filter_rule = (
-                    f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
-                )
-                filter_rule += ":print_format=json"
-                filter_params.append(filter_rule)
-            else:
-                # volume normalization with known loudness measurement
-                # apply fixed volume/gain correction
-                gain_correct = streamdetails.target_loudness - streamdetails.loudness
-                gain_correct = round(gain_correct, 2)
-                filter_params.append(f"volume={gain_correct}dB")
+        enable_volume_normalization = (
+            streamdetails.target_loudness is not None
+            and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED
+        )
+        dynamic_volume_normalization = (
+            streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC
+            and enable_volume_normalization
+        )
+        if dynamic_volume_normalization:
+            # volume normalization using loudnorm filter (in dynamic mode)
+            # which also collects the measurement on the fly during playback
+            # more info: https://k.ylo.ph/2016/04/04/loudnorm.html
+            filter_rule = f"loudnorm=I={streamdetails.target_loudness}:TP=-2.0:LRA=10.0:offset=0.0"
+            filter_rule += ":print_format=json"
+            filter_params.append(filter_rule)
+        elif (
+            enable_volume_normalization
+            and streamdetails.volume_normalization_mode == VolumeNormalizationMode.FIXED_GAIN
+        ):
+            # apply user defined fixed volume/gain correction (core config domain "streams")
+            gain_correct: float = await self.mass.config.get_core_config_value(
+                "streams",
+                CONF_VOLUME_NORMALIZATION_FIXED_GAIN_RADIO
+                if streamdetails.media_type == MediaType.RADIO
+                else CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
+            )
+            filter_params.append(f"volume={round(gain_correct, 2)}dB")
+        elif enable_volume_normalization and streamdetails.loudness is not None:
+            # volume normalization with known loudness measurement
+            # apply volume/gain correction
+            gain_correct = streamdetails.target_loudness - streamdetails.loudness
+            gain_correct = round(gain_correct, 2)
+            filter_params.append(f"volume={gain_correct}dB")
+
+        # work out audio source for these streamdetails
         if streamdetails.stream_type == StreamType.CUSTOM:
             audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
                 streamdetails,
@@ -819,8 +864,9 @@ class StreamsController(CoreController):
         if streamdetails.media_type == MediaType.RADIO:
             # pad some silence before the radio stream starts to create some headroom
             # for radio stations that do not provide any look ahead buffer
-            # without this, some radio streams jitter a lot
-            async for chunk in get_silence(2, pcm_format):
+            # without this, some radio streams jitter a lot, especially with dynamic normalization
+            pad_seconds = 5 if dynamic_volume_normalization else 2
+            async for chunk in get_silence(pad_seconds, pcm_format):
                 yield chunk
 
         async for chunk in get_media_stream(
index 4809b8ee7e93efa1e271e8587069f46412461661..84f08adda9a306f7d697871e6b82720a2288621a 100644 (file)
@@ -10,23 +10,18 @@ import struct
 import time
 from collections import deque
 from collections.abc import AsyncGenerator
-from contextlib import suppress
 from io import BytesIO
-from signal import SIGINT
 from typing import TYPE_CHECKING
 
 import aiofiles
 from aiohttp import ClientTimeout
 
-from music_assistant.common.helpers.global_cache import (
-    get_global_cache_value,
-    set_global_cache_values,
-)
+from music_assistant.common.helpers.global_cache import set_global_cache_values
 from music_assistant.common.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
 from music_assistant.common.helpers.util import clean_stream_title
-from music_assistant.common.models.enums import MediaType, StreamType
+from music_assistant.common.models.config_entries import CoreConfig, PlayerConfig
+from music_assistant.common.models.enums import MediaType, StreamType, VolumeNormalizationMode
 from music_assistant.common.models.errors import (
-    AudioError,
     InvalidDataError,
     MediaNotFoundError,
     MusicAssistantError,
@@ -41,20 +36,16 @@ from music_assistant.constants import (
     CONF_VOLUME_NORMALIZATION,
     CONF_VOLUME_NORMALIZATION_RADIO,
     CONF_VOLUME_NORMALIZATION_TARGET,
+    CONF_VOLUME_NORMALIZATION_TRACKS,
     MASS_LOGGER_NAME,
     VERBOSE_LOG_LEVEL,
 )
-from music_assistant.server.helpers.playlists import (
-    HLS_CONTENT_TYPES,
-    IsHLSPlaylist,
-    PlaylistItem,
-    fetch_playlist,
-    parse_m3u,
-)
-from music_assistant.server.helpers.tags import parse_tags
-from music_assistant.server.helpers.throttle_retry import BYPASS_THROTTLER
 
+from .ffmpeg import FFMpeg, get_ffmpeg_stream
+from .playlists import HLS_CONTENT_TYPES, IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
 from .process import AsyncProcess, check_output, communicate
+from .tags import parse_tags
+from .throttle_retry import BYPASS_THROTTLER
 from .util import create_tempfile
 
 if TYPE_CHECKING:
@@ -69,142 +60,6 @@ HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
 HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
 
 
-class FFMpeg(AsyncProcess):
-    """FFMpeg wrapped as AsyncProcess."""
-
-    def __init__(
-        self,
-        audio_input: AsyncGenerator[bytes, None] | str | int,
-        input_format: AudioFormat,
-        output_format: AudioFormat,
-        filter_params: list[str] | None = None,
-        extra_args: list[str] | None = None,
-        extra_input_args: list[str] | None = None,
-        audio_output: str | int = "-",
-        collect_log_history: bool = False,
-        logger: logging.Logger | None = None,
-    ) -> None:
-        """Initialize AsyncProcess."""
-        ffmpeg_args = get_ffmpeg_args(
-            input_format=input_format,
-            output_format=output_format,
-            filter_params=filter_params or [],
-            extra_args=extra_args or [],
-            input_path=audio_input if isinstance(audio_input, str) else "-",
-            output_path=audio_output if isinstance(audio_output, str) else "-",
-            extra_input_args=extra_input_args or [],
-            loglevel="info",
-        )
-        self.audio_input = audio_input
-        self.input_format = input_format
-        self.collect_log_history = collect_log_history
-        self.log_history: deque[str] = deque(maxlen=100)
-        self._stdin_task: asyncio.Task | None = None
-        self._logger_task: asyncio.Task | None = None
-        super().__init__(
-            ffmpeg_args,
-            stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input,
-            stdout=True if isinstance(audio_output, str) else audio_output,
-            stderr=True,
-        )
-        self.logger = logger or LOGGER.getChild("ffmpeg")
-        clean_args = []
-        for arg in ffmpeg_args[1:]:
-            if arg.startswith("http"):
-                clean_args.append("<URL>")
-            elif "/" in arg and "." in arg:
-                clean_args.append("<FILE>")
-            else:
-                clean_args.append(arg)
-        args_str = " ".join(clean_args)
-        self.logger.log(VERBOSE_LOG_LEVEL, "starting ffmpeg with args: %s", args_str)
-
-    async def start(self) -> None:
-        """Perform Async init of process."""
-        await super().start()
-        self._logger_task = asyncio.create_task(self._log_reader_task())
-        if isinstance(self.audio_input, AsyncGenerator):
-            self._stdin_task = asyncio.create_task(self._feed_stdin())
-
-    async def close(self, send_signal: bool = True) -> None:
-        """Close/terminate the process and wait for exit."""
-        if self._stdin_task and not self._stdin_task.done():
-            self._stdin_task.cancel()
-        if not self.collect_log_history:
-            await super().close(send_signal)
-            return
-        # override close logic to make sure we catch all logging
-        self._close_called = True
-        if send_signal and self.returncode is None:
-            self.proc.send_signal(SIGINT)
-        if self.proc.stdin and not self.proc.stdin.is_closing():
-            self.proc.stdin.close()
-            await asyncio.sleep(0)  # yield to loop
-        # abort existing readers on stdout first before we send communicate
-        waiter: asyncio.Future
-        if self.proc.stdout and (waiter := self.proc.stdout._waiter):
-            self.proc.stdout._waiter = None
-            if waiter and not waiter.done():
-                waiter.set_exception(asyncio.CancelledError())
-            # read remaining bytes to unblock pipe
-            await self.read(-1)
-        # wait for log task to complete that reads the remaining data from stderr
-        with suppress(TimeoutError):
-            await asyncio.wait_for(self._logger_task, 5)
-        await super().close(False)
-
-    async def _log_reader_task(self) -> None:
-        """Read ffmpeg log from stderr."""
-        decode_errors = 0
-        async for line in self.iter_stderr():
-            if self.collect_log_history:
-                self.log_history.append(line)
-            if "error" in line or "warning" in line:
-                self.logger.debug(line)
-            elif "critical" in line:
-                self.logger.warning(line)
-            else:
-                self.logger.log(VERBOSE_LOG_LEVEL, line)
-
-            if "Invalid data found when processing input" in line:
-                decode_errors += 1
-            if decode_errors >= 50:
-                self.logger.error(line)
-                await super().close(True)
-
-            # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
-            if line.startswith("Stream #") and ": Audio: " in line:
-                if self.input_format.content_type == ContentType.UNKNOWN:
-                    content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
-                    content_type = ContentType.try_parse(content_type_raw)
-                    self.logger.debug(
-                        "Detected (input) content type: %s (%s)", content_type, content_type_raw
-                    )
-                    self.input_format.content_type = content_type
-            del line
-
-    async def _feed_stdin(self) -> None:
-        """Feed stdin with audio chunks from an AsyncGenerator."""
-        if TYPE_CHECKING:
-            self.audio_input: AsyncGenerator[bytes, None]
-        try:
-            async for chunk in self.audio_input:
-                await self.write(chunk)
-            # write EOF once we've reached the end of the input stream
-            await self.write_eof()
-        except Exception as err:
-            if isinstance(err, asyncio.CancelledError):
-                return
-            # make sure we dont swallow any exceptions and we bail out
-            # once our audio source fails.
-            self.logger.error(
-                "Stream error: %s",
-                str(err),
-                exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
-            )
-            await self.write_eof()
-
-
 async def crossfade_pcm_parts(
     fade_in_part: bytes,
     fade_out_part: bytes,
@@ -389,6 +244,7 @@ async def get_stream_details(
     streamdetails.fade_in = fade_in
     if not streamdetails.duration:
         streamdetails.duration = queue_item.duration
+
     # handle volume normalization details
     if result := await mass.music.get_loudness(
         streamdetails.item_id,
@@ -398,16 +254,11 @@ async def get_stream_details(
         streamdetails.loudness, streamdetails.loudness_album = result
     streamdetails.prefer_album_loudness = prefer_album_loudness
     player_settings = await mass.config.get_player_config(streamdetails.queue_id)
-    streamdetails.enable_volume_normalization = player_settings.get_value(CONF_VOLUME_NORMALIZATION)
-    streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
-
-    radio_norm_pref = await mass.config.get_core_config_value(
-        "streams", CONF_VOLUME_NORMALIZATION_RADIO
+    core_config = await mass.config.get_core_config("streams")
+    streamdetails.volume_normalization_mode = _get_normalization_mode(
+        core_config, player_settings, streamdetails
     )
-    if streamdetails.media_type == MediaType.RADIO and radio_norm_pref == "disabled":
-        streamdetails.enable_volume_normalization = False
-    elif streamdetails.media_type == MediaType.RADIO and radio_norm_pref == "dynamic":
-        streamdetails.force_dynamic_volume_normalization = True
+    streamdetails.target_loudness = player_settings.get_value(CONF_VOLUME_NORMALIZATION_TARGET)
 
     process_time = int((time.time() - time_start) * 1000)
     LOGGER.debug("retrieved streamdetails for %s in %s milliseconds", queue_item.uri, process_time)
@@ -434,90 +285,85 @@ async def get_media_stream(
     chunk_number = 0
     buffer: bytes = b""
     finished = False
+
+    ffmpeg_proc = FFMpeg(
+        audio_input=audio_source,
+        input_format=streamdetails.audio_format,
+        output_format=pcm_format,
+        filter_params=filter_params,
+        extra_input_args=extra_input_args,
+        collect_log_history=True,
+    )
     try:
-        async with FFMpeg(
-            audio_input=audio_source,
-            input_format=streamdetails.audio_format,
-            output_format=pcm_format,
-            filter_params=filter_params,
-            extra_input_args=extra_input_args,
-            collect_log_history=True,
-            logger=logger,
-        ) as ffmpeg_proc:
-            async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
-                # for radio streams we just yield all chunks directly
-                if streamdetails.media_type == MediaType.RADIO:
-                    yield chunk
-                    bytes_sent += len(chunk)
-                    continue
+        await ffmpeg_proc.start()
+        async for chunk in ffmpeg_proc.iter_chunked(pcm_format.pcm_sample_size):
+            # for radio streams we just yield all chunks directly
+            if streamdetails.media_type == MediaType.RADIO:
+                yield chunk
+                bytes_sent += len(chunk)
+                continue
 
-                chunk_number += 1
-                # determine buffer size dynamically
-                if chunk_number < 5 and strip_silence_begin:
-                    req_buffer_size = int(pcm_format.pcm_sample_size * 4)
-                elif chunk_number > 30 and strip_silence_end:
-                    req_buffer_size = int(pcm_format.pcm_sample_size * 8)
-                else:
-                    req_buffer_size = int(pcm_format.pcm_sample_size * 2)
-
-                # always append to buffer
-                buffer += chunk
-                del chunk
-
-                if len(buffer) < req_buffer_size:
-                    # buffer is not full enough, move on
-                    continue
+            chunk_number += 1
+            # determine buffer size dynamically
+            if chunk_number < 5 and strip_silence_begin:
+                req_buffer_size = int(pcm_format.pcm_sample_size * 4)
+            elif chunk_number > 30 and strip_silence_end:
+                req_buffer_size = int(pcm_format.pcm_sample_size * 8)
+            else:
+                req_buffer_size = int(pcm_format.pcm_sample_size * 2)
 
-                if chunk_number == 5 and strip_silence_begin:
-                    # strip silence from begin of audio
-                    chunk = await strip_silence(  # noqa: PLW2901
-                        mass, buffer, pcm_format=pcm_format
-                    )
-                    bytes_sent += len(chunk)
-                    yield chunk
-                    buffer = b""
-                    continue
+            # always append to buffer
+            buffer += chunk
+            del chunk
+
+            if len(buffer) < req_buffer_size:
+                # buffer is not full enough, move on
+                continue
 
-                #### OTHER: enough data in buffer, feed to output
-                while len(buffer) > req_buffer_size:
-                    yield buffer[: pcm_format.pcm_sample_size]
-                    bytes_sent += pcm_format.pcm_sample_size
-                    buffer = buffer[pcm_format.pcm_sample_size :]
-
-            # end of audio/track reached
-            if strip_silence_end and buffer:
-                # strip silence from end of audio
-                buffer = await strip_silence(
-                    mass,
-                    buffer,
-                    pcm_format=pcm_format,
-                    reverse=True,
+            if chunk_number == 5 and strip_silence_begin:
+                # strip silence from begin of audio
+                chunk = await strip_silence(  # noqa: PLW2901
+                    mass, buffer, pcm_format=pcm_format
                 )
-            # send remaining bytes in buffer
-            bytes_sent += len(buffer)
-            yield buffer
-            del buffer
-
-            if bytes_sent == 0:
-                # edge case: no audio data was sent
-                streamdetails.stream_error = True
-                finished = False
-                logger.warning("Stream error on %s", streamdetails.uri)
-                # we send a bit of silence so players get at least some data
-                # without it, some players refuse to skip to the next track
-                async for chunk in get_silence(6, pcm_format):
-                    yield chunk
-                    bytes_sent += len(chunk)
-            else:
-                finished = True
+                bytes_sent += len(chunk)
+                yield chunk
+                buffer = b""
+                continue
+
+            #### OTHER: enough data in buffer, feed to output
+            while len(buffer) > req_buffer_size:
+                yield buffer[: pcm_format.pcm_sample_size]
+                bytes_sent += pcm_format.pcm_sample_size
+                buffer = buffer[pcm_format.pcm_sample_size :]
+
+        # end of audio/track reached
+        if strip_silence_end and buffer:
+            # strip silence from end of audio
+            buffer = await strip_silence(
+                mass,
+                buffer,
+                pcm_format=pcm_format,
+                reverse=True,
+            )
+        # send remaining bytes in buffer
+        bytes_sent += len(buffer)
+        yield buffer
+        del buffer
+
+        if bytes_sent == 0:
+            # edge case: no audio data was sent
+            streamdetails.stream_error = True
+            finished = False
+            logger.warning("Stream error on %s", streamdetails.uri)
+            # we send a bit of silence so players get at least some data
+            # without it, some players refuse to skip to the next track
+            async for chunk in get_silence(6, pcm_format):
+                yield chunk
+                bytes_sent += len(chunk)
+        else:
+            finished = True
     finally:
-        if "ffmpeg_proc" not in locals():
-            # edge case: ffmpeg process was not yet started
-            return  # noqa: B012
-        if finished and not ffmpeg_proc.closed:
-            await asyncio.wait_for(ffmpeg_proc.wait(), 60)
-        elif not ffmpeg_proc.closed:
-            await ffmpeg_proc.close()
+        await ffmpeg_proc.close()
 
         # try to determine how many seconds we've streamed
         seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
@@ -535,10 +381,15 @@ async def get_media_stream(
             streamdetails.duration = seconds_streamed
 
         # parse loudnorm data if we have that collected
-        required_seconds = 600 if streamdetails.media_type == MediaType.RADIO else 120
-        if streamdetails.loudness is None and (finished or (seconds_streamed >= required_seconds)):
-            loudness_details = parse_loudnorm(" ".join(ffmpeg_proc.log_history))
-            if loudness_details is not None:
+        if (
+            streamdetails.loudness is None
+            and streamdetails.volume_normalization_mode != VolumeNormalizationMode.DISABLED
+            and (finished or (seconds_streamed >= 60))
+        ):
+            # if dynamic volume normalization is enabled and the entire track is streamed
+            # the loudnorm filter will output the measurement in the log,
+            # so we can use those directly instead of analyzing the audio
+            if loudness_details := parse_loudnorm(" ".join(ffmpeg_proc.log_history)):
                 logger.debug(
                     "Loudness measurement for %s: %s dB",
                     streamdetails.uri,
@@ -553,6 +404,12 @@ async def get_media_stream(
                         media_type=streamdetails.media_type,
                     )
                 )
+            else:
+                # no data from loudnorm filter found, we need to analyze the audio
+                # add background task to start analyzing the audio
+                task_id = f"analyze_loudness_{streamdetails.uri}"
+                mass.create_task(analyze_loudness, mass, streamdetails, task_id=task_id)
+
         # report playback
         if finished or seconds_streamed > 30:
             mass.create_task(
@@ -950,41 +807,6 @@ async def get_file_stream(
             yield data
 
 
-async def get_ffmpeg_stream(
-    audio_input: AsyncGenerator[bytes, None] | str,
-    input_format: AudioFormat,
-    output_format: AudioFormat,
-    filter_params: list[str] | None = None,
-    extra_args: list[str] | None = None,
-    chunk_size: int | None = None,
-    extra_input_args: list[str] | None = None,
-    logger: logging.Logger | None = None,
-) -> AsyncGenerator[bytes, None]:
-    """
-    Get the ffmpeg audio stream as async generator.
-
-    Takes care of resampling and/or recoding if needed,
-    according to player preferences.
-    """
-    async with FFMpeg(
-        audio_input=audio_input,
-        input_format=input_format,
-        output_format=output_format,
-        filter_params=filter_params,
-        extra_args=extra_args,
-        extra_input_args=extra_input_args,
-        logger=logger,
-    ) as ffmpeg_proc:
-        # read final chunks from stdout
-        iterator = (
-            ffmpeg_proc.iter_chunked(chunk_size)
-            if chunk_size
-            else ffmpeg_proc.iter_any(get_chunksize(output_format))
-        )
-        async for chunk in iterator:
-            yield chunk
-
-
 async def check_audio_support() -> tuple[bool, bool, str]:
     """Check if ffmpeg is present (with/without libsoxr support)."""
     # check for FFmpeg presence
@@ -1072,7 +894,7 @@ def get_chunksize(
     if fmt.content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF):
         return pcm_size
     if fmt.content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
-        return int(pcm_size * 0.5)
+        return int(pcm_size * 0.8)
     if fmt.content_type in (ContentType.MP3, ContentType.OGG):
         return int((320000 / 8) * seconds)
     if fmt.content_type in (ContentType.AAC, ContentType.M4A):
@@ -1112,142 +934,6 @@ def get_player_filter_params(
     return filter_params
 
 
-def get_ffmpeg_args(
-    input_format: AudioFormat,
-    output_format: AudioFormat,
-    filter_params: list[str],
-    extra_args: list[str] | None = None,
-    input_path: str = "-",
-    output_path: str = "-",
-    extra_input_args: list[str] | None = None,
-    loglevel: str = "error",
-) -> list[str]:
-    """Collect all args to send to the ffmpeg process."""
-    if extra_args is None:
-        extra_args = []
-    ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
-    if not ffmpeg_present:
-        msg = (
-            "FFmpeg binary is missing from system."
-            "Please install ffmpeg on your OS to enable playback."
-        )
-        raise AudioError(
-            msg,
-        )
-
-    major_version = int("".join(char for char in version.split(".")[0] if not char.isalpha()))
-
-    # generic args
-    generic_args = [
-        "ffmpeg",
-        "-hide_banner",
-        "-loglevel",
-        loglevel,
-        "-nostats",
-        "-ignore_unknown",
-        "-protocol_whitelist",
-        "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
-    ]
-    # collect input args
-    input_args = []
-    if extra_input_args:
-        input_args += extra_input_args
-    if input_path.startswith("http"):
-        # append reconnect options for direct stream from http
-        input_args += [
-            "-reconnect",
-            "1",
-            "-reconnect_streamed",
-            "1",
-        ]
-        if major_version > 4:
-            # these options are only supported in ffmpeg > 5
-            input_args += [
-                "-reconnect_on_network_error",
-                "1",
-                "-reconnect_on_http_error",
-                "5xx,4xx",
-            ]
-    if input_format.content_type.is_pcm():
-        input_args += [
-            "-ac",
-            str(input_format.channels),
-            "-channel_layout",
-            "mono" if input_format.channels == 1 else "stereo",
-            "-ar",
-            str(input_format.sample_rate),
-            "-acodec",
-            input_format.content_type.name.lower(),
-            "-f",
-            input_format.content_type.value,
-            "-i",
-            input_path,
-        ]
-    else:
-        # let ffmpeg auto detect the content type from the metadata/headers
-        input_args += ["-i", input_path]
-
-    # collect output args
-    output_args = []
-    if output_path.upper() == "NULL":
-        # devnull stream
-        output_args = ["-f", "null", "-"]
-    elif output_format.content_type == ContentType.UNKNOWN:
-        raise RuntimeError("Invalid output format specified")
-    elif output_format.content_type == ContentType.AAC:
-        output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k", output_path]
-    elif output_format.content_type == ContentType.MP3:
-        output_args = ["-f", "mp3", "-b:a", "320k", output_path]
-    else:
-        if output_format.content_type.is_pcm():
-            output_args += ["-acodec", output_format.content_type.name.lower()]
-        # use explicit format identifier for all other
-        output_args += [
-            "-f",
-            output_format.content_type.value,
-            "-ar",
-            str(output_format.sample_rate),
-            "-ac",
-            str(output_format.channels),
-        ]
-        if output_format.output_format_str == "flac":
-            output_args += ["-compression_level", "6"]
-        output_args += [output_path]
-
-    # edge case: source file is not stereo - downmix to stereo
-    if input_format.channels > 2 and output_format.channels == 2:
-        filter_params = [
-            "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE",
-            *filter_params,
-        ]
-
-    # determine if we need to do resampling
-    if (
-        input_format.sample_rate != output_format.sample_rate
-        or input_format.bit_depth > output_format.bit_depth
-    ):
-        # prefer resampling with libsoxr due to its high quality
-        if libsoxr_support:
-            resample_filter = "aresample=resampler=soxr:precision=30"
-        else:
-            resample_filter = "aresample=resampler=swr"
-
-        # sample rate conversion
-        if input_format.sample_rate != output_format.sample_rate:
-            resample_filter += f":osr={output_format.sample_rate}"
-
-        # bit depth conversion: apply dithering when going down to 16 bits
-        if output_format.bit_depth == 16 and input_format.bit_depth > 16:
-            resample_filter += ":osf=s16:dither_method=triangular_hp"
-
-        filter_params.append(resample_filter)
-
-    if filter_params and "-filter_complex" not in extra_args:
-        extra_args += ["-af", ",".join(filter_params)]
-
-    return generic_args + input_args + extra_args + output_args
-
-
 def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
     """Parse Loudness measurement from ffmpeg stderr output."""
     stderr_data = raw_stderr.decode() if isinstance(raw_stderr, bytes) else raw_stderr
@@ -1261,3 +947,108 @@ def parse_loudnorm(raw_stderr: bytes | str) -> float | None:
     except JSON_DECODE_EXCEPTIONS:
         return None
     return float(loudness_data["input_i"])
+
+
+async def analyze_loudness(
+    mass: MusicAssistant,
+    streamdetails: StreamDetails,
+) -> None:
+    """Analyze media item's audio, to calculate EBU R128 loudness."""
+    if result := await mass.music.get_loudness(
+        streamdetails.item_id,
+        streamdetails.provider,
+        media_type=streamdetails.media_type,
+    ):
+        # a stored measurement already exists, so we can skip the analyze job
+        streamdetails.loudness = result
+        return
+
+    logger = LOGGER.getChild("analyze_loudness")
+    logger.debug("Start analyzing audio for %s", streamdetails.uri)
+
+    extra_input_args = [
+        # limit to 10 minutes to avoid reading too much in memory
+        "-t",
+        "600",
+    ]
+    if streamdetails.stream_type == StreamType.CUSTOM:
+        audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
+            streamdetails,
+        )
+    elif streamdetails.stream_type == StreamType.HLS:
+        substream = await get_hls_substream(mass, streamdetails.path)
+        audio_source = substream.path
+    elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
+        audio_source = streamdetails.path
+        extra_input_args += ["-decryption_key", streamdetails.decryption_key]
+    else:
+        audio_source = streamdetails.path
+
+    # calculate BS.1770 R128 integrated loudness with ffmpeg
+    async with FFMpeg(
+        audio_input=audio_source,
+        input_format=streamdetails.audio_format,
+        output_format=streamdetails.audio_format,
+        audio_output="NULL",
+        filter_params=["ebur128=framelog=verbose"],
+        extra_input_args=extra_input_args,
+        collect_log_history=True,
+    ) as ffmpeg_proc:
+        await ffmpeg_proc.wait()
+        log_lines = ffmpeg_proc.log_history
+        log_lines_str = "\n".join(log_lines)
+        try:
+            loudness_str = (
+                log_lines_str.split("Integrated loudness")[1].split("I:")[1].split("LUFS")[0]
+            )
+            loudness = float(loudness_str.strip())
+        except (IndexError, ValueError, AttributeError):
+            LOGGER.warning(
+                "Could not determine integrated loudness of %s - %s",
+                streamdetails.uri,
+                log_lines_str or "received empty value",
+            )
+        else:
+            streamdetails.loudness = loudness
+            await mass.music.set_loudness(
+                streamdetails.item_id,
+                streamdetails.provider,
+                loudness,
+                media_type=streamdetails.media_type,
+            )
+            logger.debug(
+                "Integrated loudness of %s is: %s",
+                streamdetails.uri,
+                loudness,
+            )
+
+
+def _get_normalization_mode(
+    core_config: CoreConfig, player_config: PlayerConfig, streamdetails: StreamDetails
+) -> VolumeNormalizationMode:
+    if not player_config.get_value(CONF_VOLUME_NORMALIZATION):
+        # disabled for this player
+        return VolumeNormalizationMode.DISABLED
+    # work out preference for track or radio
+    preference = VolumeNormalizationMode(
+        core_config.get_value(
+            CONF_VOLUME_NORMALIZATION_RADIO
+            if streamdetails.media_type == MediaType.RADIO
+            else CONF_VOLUME_NORMALIZATION_TRACKS,
+        )
+    )
+
+    # handle no measurement available but fallback to dynamic mode is allowed
+    if streamdetails.loudness is None and preference == VolumeNormalizationMode.FALLBACK_DYNAMIC:
+        return VolumeNormalizationMode.DYNAMIC
+
+    # handle no measurement available and no fallback allowed
+    if streamdetails.loudness is None and preference == VolumeNormalizationMode.MEASUREMENT_ONLY:
+        return VolumeNormalizationMode.DISABLED
+
+    # handle no measurement available and fallback to fixed gain is allowed
+    if streamdetails.loudness is None and preference == VolumeNormalizationMode.FALLBACK_FIXED_GAIN:
+        return VolumeNormalizationMode.FIXED_GAIN
+
+    # simply return the preference
+    return preference
diff --git a/music_assistant/server/helpers/ffmpeg.py b/music_assistant/server/helpers/ffmpeg.py
new file mode 100644 (file)
index 0000000..490beaf
--- /dev/null
@@ -0,0 +1,305 @@
+"""FFMpeg related helpers."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+from collections import deque
+from collections.abc import AsyncGenerator
+from typing import TYPE_CHECKING
+
+from music_assistant.common.helpers.global_cache import get_global_cache_value
+from music_assistant.common.models.errors import AudioError
+from music_assistant.common.models.media_items import AudioFormat, ContentType
+from music_assistant.constants import VERBOSE_LOG_LEVEL
+
+from .process import AsyncProcess
+from .util import close_async_generator
+
+LOGGER = logging.getLogger("ffmpeg")
+
+
+class FFMpeg(AsyncProcess):
+    """FFMpeg wrapped as AsyncProcess."""
+
+    def __init__(
+        self,
+        audio_input: AsyncGenerator[bytes, None] | str | int,
+        input_format: AudioFormat,
+        output_format: AudioFormat,
+        filter_params: list[str] | None = None,
+        extra_args: list[str] | None = None,
+        extra_input_args: list[str] | None = None,
+        audio_output: str | int = "-",
+        collect_log_history: bool = False,
+    ) -> None:
+        """Initialize AsyncProcess."""
+        ffmpeg_args = get_ffmpeg_args(
+            input_format=input_format,
+            output_format=output_format,
+            filter_params=filter_params or [],
+            extra_args=extra_args or [],
+            input_path=audio_input if isinstance(audio_input, str) else "-",
+            output_path=audio_output if isinstance(audio_output, str) else "-",
+            extra_input_args=extra_input_args or [],
+            loglevel="info",
+        )
+        self.audio_input = audio_input
+        self.input_format = input_format
+        self.collect_log_history = collect_log_history
+        self.log_history: deque[str] = deque(maxlen=100)
+        self._stdin_task: asyncio.Task | None = None
+        self._logger_task: asyncio.Task | None = None
+        super().__init__(
+            ffmpeg_args,
+            stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input,
+            stdout=True if isinstance(audio_output, str) else audio_output,
+            stderr=True,
+        )
+        self.logger = LOGGER
+
+    async def start(self) -> None:
+        """Perform Async init of process."""
+        await super().start()
+        if self.proc:
+            self.logger = LOGGER.getChild(str(self.proc.pid))
+        clean_args = []
+        for arg in self._args[1:]:
+            if arg.startswith("http"):
+                clean_args.append("<URL>")
+            elif "/" in arg and "." in arg:
+                clean_args.append("<FILE>")
+            else:
+                clean_args.append(arg)
+        args_str = " ".join(clean_args)
+        self.logger.log(VERBOSE_LOG_LEVEL, "started with args: %s", args_str)
+        self._logger_task = asyncio.create_task(self._log_reader_task())
+        if isinstance(self.audio_input, AsyncGenerator):
+            self._stdin_task = asyncio.create_task(self._feed_stdin())
+
+    async def close(self, send_signal: bool = True) -> None:
+        """Close/terminate the process and wait for exit."""
+        if self.closed:
+            return
+        if self._stdin_task and not self._stdin_task.done():
+            self._stdin_task.cancel()
+        await super().close(send_signal)
+
+    async def _log_reader_task(self) -> None:
+        """Read ffmpeg log from stderr."""
+        decode_errors = 0
+        async for line in self.iter_stderr():
+            if self.collect_log_history:
+                self.log_history.append(line)
+            if "error" in line or "warning" in line:
+                self.logger.debug(line)
+            elif "critical" in line:
+                self.logger.warning(line)
+            else:
+                self.logger.log(VERBOSE_LOG_LEVEL, line)
+
+            if "Invalid data found when processing input" in line:
+                decode_errors += 1
+            if decode_errors >= 50:
+                self.logger.error(line)
+                await super().close(True)
+
+            # if streamdetails contenttype is unknown, try parse it from the ffmpeg log
+            if line.startswith("Stream #") and ": Audio: " in line:
+                if self.input_format.content_type == ContentType.UNKNOWN:
+                    content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
+                    content_type = ContentType.try_parse(content_type_raw)
+                    self.logger.debug(
+                        "Detected (input) content type: %s (%s)", content_type, content_type_raw
+                    )
+                    self.input_format.content_type = content_type
+            del line
+
+    async def _feed_stdin(self) -> None:
+        """Feed stdin with audio chunks from an AsyncGenerator."""
+        if TYPE_CHECKING:
+            self.audio_input: AsyncGenerator[bytes, None]
+        generator_exhausted = False
+        try:
+            async for chunk in self.audio_input:
+                await self.write(chunk)
+            generator_exhausted = True
+        except Exception as err:
+            if isinstance(err, asyncio.CancelledError):
+                return
+            self.logger.error(
+                "Stream error: %s",
+                str(err),
+                exc_info=err if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else None,
+            )
+        finally:
+            await self.write_eof()
+            # we need to ensure that we close the async generator
+            # if we get cancelled otherwise it keeps lingering forever
+            if not generator_exhausted:
+                await close_async_generator(self.audio_input)
+
+
+async def get_ffmpeg_stream(
+    audio_input: AsyncGenerator[bytes, None] | str,
+    input_format: AudioFormat,
+    output_format: AudioFormat,
+    filter_params: list[str] | None = None,
+    extra_args: list[str] | None = None,
+    chunk_size: int | None = None,
+    extra_input_args: list[str] | None = None,
+) -> AsyncGenerator[bytes, None]:
+    """
+    Get the ffmpeg audio stream as async generator.
+
+    Takes care of resampling and/or recoding if needed,
+    according to player preferences.
+    """
+    async with FFMpeg(
+        audio_input=audio_input,
+        input_format=input_format,
+        output_format=output_format,
+        filter_params=filter_params,
+        extra_args=extra_args,
+        extra_input_args=extra_input_args,
+    ) as ffmpeg_proc:
+        # read final chunks from stdout
+        iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
+        async for chunk in iterator:
+            yield chunk
+
+
+def get_ffmpeg_args(
+    input_format: AudioFormat,
+    output_format: AudioFormat,
+    filter_params: list[str],
+    extra_args: list[str] | None = None,
+    input_path: str = "-",
+    output_path: str = "-",
+    extra_input_args: list[str] | None = None,
+    loglevel: str = "error",
+) -> list[str]:
+    """Collect all args to send to the ffmpeg process."""
+    if extra_args is None:
+        extra_args = []
+    ffmpeg_present, libsoxr_support, version = get_global_cache_value("ffmpeg_support")
+    if not ffmpeg_present:
+        msg = (
+            "FFmpeg binary is missing from system."
+            "Please install ffmpeg on your OS to enable playback."
+        )
+        raise AudioError(
+            msg,
+        )
+
+    major_version = int("".join(char for char in version.split(".")[0] if not char.isalpha()))
+
+    # generic args
+    generic_args = [
+        "ffmpeg",
+        "-hide_banner",
+        "-loglevel",
+        loglevel,
+        "-nostats",
+        "-ignore_unknown",
+        "-protocol_whitelist",
+        "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp",
+    ]
+    # collect input args
+    input_args = []
+    if extra_input_args:
+        input_args += extra_input_args
+    if input_path.startswith("http"):
+        # append reconnect options for direct stream from http
+        input_args += [
+            "-reconnect",
+            "1",
+            "-reconnect_streamed",
+            "1",
+        ]
+        if major_version > 4:
+            # these options are only supported in ffmpeg >= 5
+            input_args += [
+                "-reconnect_on_network_error",
+                "1",
+                "-reconnect_on_http_error",
+                "5xx,4xx",
+            ]
+    if input_format.content_type.is_pcm():
+        input_args += [
+            "-ac",
+            str(input_format.channels),
+            "-channel_layout",
+            "mono" if input_format.channels == 1 else "stereo",
+            "-ar",
+            str(input_format.sample_rate),
+            "-acodec",
+            input_format.content_type.name.lower(),
+            "-f",
+            input_format.content_type.value,
+            "-i",
+            input_path,
+        ]
+    else:
+        # let ffmpeg auto detect the content type from the metadata/headers
+        input_args += ["-i", input_path]
+
+    # collect output args
+    output_args = []
+    if output_path.upper() == "NULL":
+        # devnull stream
+        output_args = ["-f", "null", "-"]
+    elif output_format.content_type == ContentType.UNKNOWN:
+        raise RuntimeError("Invalid output format specified")
+    elif output_format.content_type == ContentType.AAC:
+        output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k", output_path]
+    elif output_format.content_type == ContentType.MP3:
+        output_args = ["-f", "mp3", "-b:a", "320k", output_path]
+    else:
+        if output_format.content_type.is_pcm():
+            output_args += ["-acodec", output_format.content_type.name.lower()]
+        # use explicit format identifier for all other
+        output_args += [
+            "-f",
+            output_format.content_type.value,
+            "-ar",
+            str(output_format.sample_rate),
+            "-ac",
+            str(output_format.channels),
+        ]
+        if output_format.output_format_str == "flac":
+            output_args += ["-compression_level", "6"]
+        output_args += [output_path]
+
+    # edge case: source file is not stereo - downmix to stereo
+    if input_format.channels > 2 and output_format.channels == 2:
+        filter_params = [
+            "pan=stereo|FL=1.0*FL+0.707*FC+0.707*SL+0.707*LFE|FR=1.0*FR+0.707*FC+0.707*SR+0.707*LFE",
+            *filter_params,
+        ]
+
+    # determine if we need to do resampling
+    if (
+        input_format.sample_rate != output_format.sample_rate
+        or input_format.bit_depth > output_format.bit_depth
+    ):
+        # prefer resampling with libsoxr due to its high quality
+        if libsoxr_support:
+            resample_filter = "aresample=resampler=soxr:precision=30"
+        else:
+            resample_filter = "aresample=resampler=swr"
+
+        # sample rate conversion
+        if input_format.sample_rate != output_format.sample_rate:
+            resample_filter += f":osr={output_format.sample_rate}"
+
+        # bit depth conversion: apply dithering when going down to 16 bits
+        if output_format.bit_depth == 16 and input_format.bit_depth > 16:
+            resample_filter += ":osf=s16:dither_method=triangular_hp"
+
+        filter_params.append(resample_filter)
+
+    if filter_params and "-filter_complex" not in extra_args:
+        extra_args += ["-af", ",".join(filter_params)]
+
+    return generic_args + input_args + extra_args + output_args
index 41e055744f467b053fc28c3c16ca2aacb6391de3..82c2f3dba25b99fdc71fa8f7f229dd314dfde929 100644 (file)
@@ -11,7 +11,8 @@ import tempfile
 import urllib.error
 import urllib.parse
 import urllib.request
-from collections.abc import Awaitable, Callable, Coroutine
+from collections.abc import AsyncGenerator, Awaitable, Callable, Coroutine
+from contextlib import suppress
 from functools import lru_cache
 from importlib.metadata import PackageNotFoundError
 from importlib.metadata import version as pkg_version
@@ -157,6 +158,15 @@ def get_port_from_zeroconf(discovery_info: AsyncServiceInfo) -> str | None:
     return discovery_info.port
 
 
+async def close_async_generator(agen: AsyncGenerator[Any, None]) -> None:
+    """Force close an async generator."""
+    task = asyncio.create_task(agen.__anext__())
+    task.cancel()
+    with suppress(asyncio.CancelledError):
+        await task
+    await agen.aclose()
+
+
 class TaskManager:
     """
     Helper class to run many tasks at once.
index 7d35ff48a760d8401135b157717d5920c02b9eee..d4add27ba213b3e8f246ec380b4682caea02dfa9 100644 (file)
@@ -272,7 +272,6 @@ class RaopStream:
             output_format=AIRPLAY_PCM_FORMAT,
             filter_params=get_player_filter_params(self.mass, player_id),
             audio_output=write,
-            logger=self.airplay_player.logger.getChild("ffmpeg"),
         )
         await self._ffmpeg_proc.start()
         await asyncio.to_thread(os.close, write)
index bbfefd0f1d6051c34e568546b189229143e702cc..06e9745948f5cd85b03fefa61004a8cb98cedcec 100644 (file)
@@ -12,10 +12,9 @@ from zeroconf import ServiceStateChange
 
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE,
-    CONF_ENTRY_CROSSFADE_FLOW_MODE_REQUIRED,
     CONF_ENTRY_ENABLE_ICY_METADATA,
     CONF_ENTRY_ENFORCE_MP3,
-    CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED,
+    CONF_ENTRY_FLOW_MODE_ENFORCED,
     CONF_ENTRY_HTTP_PROFILE_FORCED_2,
     ConfigEntry,
     ConfigValueType,
@@ -208,7 +207,7 @@ class BluesoundPlayerProvider(PlayerProvider):
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
-        self.bluos_players: dict[str, BluosPlayer] = {}
+        self.bluos_players: dict[str, BluesoundPlayer] = {}
 
     async def on_mdns_service_state_change(
         self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
@@ -291,16 +290,15 @@ class BluesoundPlayerProvider(PlayerProvider):
     ) -> tuple[ConfigEntry, ...]:
         """Return Config Entries for the given player."""
         base_entries = await super().get_player_config_entries(self.player_id)
-        if not self.bluos_players.get(self.player_id):
+        if not self.bluos_players.get(player_id):
             # TODO fix player entries
             return (*base_entries, CONF_ENTRY_CROSSFADE)
         return (
             *base_entries,
             CONF_ENTRY_HTTP_PROFILE_FORCED_2,
             CONF_ENTRY_CROSSFADE,
-            CONF_ENTRY_CROSSFADE_FLOW_MODE_REQUIRED,
             CONF_ENTRY_ENFORCE_MP3,
-            CONF_ENTRY_FLOW_MODE_DEFAULT_ENABLED,
+            CONF_ENTRY_FLOW_MODE_ENFORCED,
             CONF_ENTRY_ENABLE_ICY_METADATA,
         )
 
@@ -349,13 +347,11 @@ class BluesoundPlayerProvider(PlayerProvider):
             mass_player.volume_mute = muted
             await bluos_player.update_attributes()
 
-    async def play_media(
-        self, player_id: str, media: PlayerMedia, timeout: float | None = None
-    ) -> None:
+    async def play_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA for BluOS player using the provided URL."""
         mass_player = self.mass.players.get(player_id)
         if bluos_player := self.bluos_players[player_id]:
-            play_status = await bluos_player.client.play_url(media.uri, timeout=timeout)
+            play_status = await bluos_player.client.play_url(media.uri)
             if play_status == "stream":
                 # Update media info then optimistically override playback state and source
                 await bluos_player.update_attributes()
index 7566825e1d4060cd8c4c5ceaa67385da8fe51f8d..1484c318ebc02654886447792b7ba5b3d5a1a79b 100644 (file)
@@ -351,12 +351,21 @@ class DLNAPlayerProvider(PlayerProvider):
             media.uri = media.uri.replace(".flac", ".mp3")
         didl_metadata = create_didl_metadata(media)
         title = media.title or media.uri
-        await dlna_player.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
-        self.logger.debug(
-            "Enqued next track (%s) to player %s",
-            title,
-            dlna_player.player.display_name,
-        )
+        try:
+            await dlna_player.device.async_set_next_transport_uri(media.uri, title, didl_metadata)
+        except UpnpError:
+            self.logger.error(
+                "Enqueuing the next track failed for player %s - "
+                "the player probably doesn't support this. "
+                "Enable 'flow mode' for this player.",
+                dlna_player.player.display_name,
+            )
+        else:
+            self.logger.debug(
+                "Enqued next track (%s) to player %s",
+                title,
+                dlna_player.player.display_name,
+            )
 
     @catch_request_errors
     async def cmd_pause(self, player_id: str) -> None:
index 2b4930c248725bcaf4f596289d58f96d2163d9e1..82814e83f0d440e72d4361addfe95a73b6cc1c1c 100644 (file)
@@ -517,7 +517,6 @@ class SnapCastProvider(PlayerProvider):
                     output_format=DEFAULT_SNAPCAST_FORMAT,
                     filter_params=get_player_filter_params(self.mass, player_id),
                     audio_output=stream_path,
-                    logger=self.logger.getChild("ffmpeg"),
                 ) as ffmpeg_proc:
                     player.state = PlayerState.PLAYING
                     player.current_media = media
index 1e8b03fe54ab7755254fe03dfecec237205f4e02..7fd60e95345c1618204a546bbac75a42cb913e0c 100644 (file)
@@ -330,7 +330,7 @@ class SonosPlayer:
         else:
             # player is group child (synced to another player)
             group_parent = self.prov.sonos_players.get(self.client.player.group.coordinator_id)
-            if not group_parent:
+            if not group_parent or not group_parent.client:
                 # handle race condition where the group parent is not yet discovered
                 return
             active_group = group_parent.client.player.group
index 0bd10b5e902bfe6dea9ac46c0a7f362d3a25e535..910fe80d7936bc0d6d52ff2515a3d94eef99325b 100644 (file)
@@ -585,14 +585,23 @@ class SpotifyProvider(MusicProvider):
         chunk_size = get_chunksize(streamdetails.audio_format)
         stderr = None if self.logger.isEnabledFor(VERBOSE_LOG_LEVEL) else False
         self.logger.log(VERBOSE_LOG_LEVEL, f"Start streaming {spotify_uri} using librespot")
-        async with AsyncProcess(
-            args,
-            stdout=True,
-            stderr=stderr,
-            name="librespot",
-        ) as librespot_proc:
-            async for chunk in librespot_proc.iter_any(chunk_size):
-                yield chunk
+        for attempt in range(1, 3):
+            async with AsyncProcess(
+                args,
+                stdout=True,
+                stderr=stderr,
+                name="librespot",
+            ) as librespot_proc:
+                chunks_received = 0
+                async for chunk in librespot_proc.iter_any(chunk_size):
+                    yield chunk
+                    chunks_received += 1
+            if chunks_received:
+                break
+            self.logger.warning(
+                "librespot failed to stream track, retrying... (attempt %s/3)", attempt
+            )
+            await asyncio.sleep(0.5)
 
     def _parse_artist(self, artist_obj):
         """Parse spotify artist object to generic layout."""