Finalize stream caching + fix several bugs (#2029)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 11 Mar 2025 23:41:52 +0000 (00:41 +0100)
committerGitHub <noreply@github.com>
Tue, 11 Mar 2025 23:41:52 +0000 (00:41 +0100)
music_assistant/constants.py
music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/ffmpeg.py
music_assistant/helpers/util.py
music_assistant/providers/audiobookshelf/__init__.py
music_assistant/providers/filesystem_local/__init__.py
pyproject.toml
requirements_all.txt

index 459c1e8c384771d0ed5cbc0cd78d01a12735a60c..fcb346bbe03adf68ecd5b1411c00e4b5dd5ce200 100644 (file)
@@ -84,13 +84,15 @@ CONF_POWER_CONTROL: Final[str] = "power_control"
 CONF_VOLUME_CONTROL: Final[str] = "volume_control"
 CONF_MUTE_CONTROL: Final[str] = "mute_control"
 CONF_OUTPUT_CODEC: Final[str] = "output_codec"
-CONF_ALLOW_MEMORY_CACHE: Final[str] = "allow_memory_cache"
+CONF_ALLOW_AUDIO_CACHE: Final[str] = "allow_audio_cache"
+CONF_AUDIO_CACHE_MAX_SIZE: Final[str] = "audio_cache_max_size"
 
 
 # config default values
 DEFAULT_HOST: Final[str] = "0.0.0.0"
 DEFAULT_PORT: Final[int] = 8095
-DEFAULT_ALLOW_MEMORY_CACHE: Final[bool] = True
+DEFAULT_ALLOW_AUDIO_CACHE: Final[str] = "auto"
+DEFAULT_AUDIO_CACHE_MAX_SIZE: Final[int] = 5  # 5gb
 
 # common db tables
 DB_TABLE_PLAYLOG: Final[str] = "playlog"
index 3f3efa87873c7e15e28dc4ab7d069ec6c0237161..552299d9f096605d4d7e26002fc6f7a9f3741113 100644 (file)
@@ -30,7 +30,8 @@ from music_assistant_models.player_queue import PlayLogEntry
 
 from music_assistant.constants import (
     ANNOUNCE_ALERT_FILE,
-    CONF_ALLOW_MEMORY_CACHE,
+    CONF_ALLOW_AUDIO_CACHE,
+    CONF_AUDIO_CACHE_MAX_SIZE,
     CONF_BIND_IP,
     CONF_BIND_PORT,
     CONF_CROSSFADE,
@@ -45,7 +46,8 @@ from music_assistant.constants import (
     CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS,
     CONF_VOLUME_NORMALIZATION_RADIO,
     CONF_VOLUME_NORMALIZATION_TRACKS,
-    DEFAULT_ALLOW_MEMORY_CACHE,
+    DEFAULT_ALLOW_AUDIO_CACHE,
+    DEFAULT_AUDIO_CACHE_MAX_SIZE,
     DEFAULT_PCM_FORMAT,
     DEFAULT_STREAM_HEADERS,
     ICY_HEADERS,
@@ -63,7 +65,13 @@ from music_assistant.helpers.audio import (
 )
 from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
 from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
-from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool
+from music_assistant.helpers.util import (
+    clean_old_files,
+    get_ip,
+    get_ips,
+    select_free_port,
+    try_parse_bool,
+)
 from music_assistant.helpers.webserver import Webserver
 from music_assistant.models.core_controller import CoreController
 from music_assistant.models.plugin import PluginProvider
@@ -107,12 +115,21 @@ class StreamsController(CoreController):
         )
         self.manifest.icon = "cast-audio"
         self.announcements: dict[str, str] = {}
+        # create cache dir if needed
+        self._audio_cache_dir = audio_cache_dir = os.path.join(self.mass.cache_path, ".audio")
+        if not os.path.isdir(audio_cache_dir):
+            os.makedirs(audio_cache_dir)
 
     @property
     def base_url(self) -> str:
         """Return the base_url for the streamserver."""
         return self._server.base_url
 
+    @property
+    def audio_cache_dir(self) -> str:
+        """Return the directory where audio cache files are stored."""
+        return self._audio_cache_dir
+
     async def get_config_entries(
         self,
         action: str | None = None,
@@ -198,16 +215,36 @@ class StreamsController(CoreController):
                 required=False,
             ),
             ConfigEntry(
-                key=CONF_ALLOW_MEMORY_CACHE,
-                type=ConfigEntryType.BOOLEAN,
-                default_value=DEFAULT_ALLOW_MEMORY_CACHE,
-                label="Allow (in-memory) caching of audio streams",
-                description="To ensure smooth playback as well as fast seeking, "
-                "Music Assistant by default caches audio streams (in memory). "
-                "On systems with limited memory, this can be disabled, "
-                "but may result in less smooth playback.",
+                key=CONF_ALLOW_AUDIO_CACHE,
+                type=ConfigEntryType.STRING,
+                default_value=DEFAULT_ALLOW_AUDIO_CACHE,
+                options=[
+                    ConfigValueOption("Always", "always"),
+                    ConfigValueOption("Disabled", "disabled"),
+                    ConfigValueOption("Auto", "auto"),
+                ],
+                label="Allow caching of remote/cloudbased audio streams",
+                description="To ensure smooth(er) playback as well as fast seeking, "
+                "Music Assistant can cache audio streams on disk. \n"
+                "On systems with limited diskspace, this can be disabled, "
+                "but may result in less smooth playback or slower seeking.\n\n"
+                "**Always:** Enforce caching of audio streams at all times "
+                "(as long as there is enough free space)."
+                "**Disabled:** Never cache audio streams.\n"
+                "**Auto:** Let Music Assistant decide if caching "
+                "should be used on a per-item base.",
+                category="advanced",
+                required=True,
+            ),
+            ConfigEntry(
+                key=CONF_AUDIO_CACHE_MAX_SIZE,
+                type=ConfigEntryType.INTEGER,
+                default_value=DEFAULT_AUDIO_CACHE_MAX_SIZE,
+                label="Maximum size of audio cache",
+                description="The maximum amount of diskspace (in GB) "
+                "the audio cache may consume (if enabled).",
+                range=(1, 50),
                 category="advanced",
-                required=False,
             ),
         )
 
@@ -218,6 +255,7 @@ class StreamsController(CoreController):
         FFMPEG_LOGGER.setLevel(self.logger.level)
         # perform check for ffmpeg version
         await check_ffmpeg_version()
+        await self._clean_audio_cache()
         # start the webserver
         self.publish_port = config.get_value(CONF_BIND_PORT)
         self.publish_ip = config.get_value(CONF_PUBLISH_IP)
@@ -1049,3 +1087,17 @@ class StreamsController(CoreController):
             bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
             channels=2,
         )
+
+    async def _clean_audio_cache(self) -> None:
+        """Clean up audio cache periodically."""
+        max_cache_size = await self.mass.config.get_core_config_value(
+            self.domain, CONF_AUDIO_CACHE_MAX_SIZE
+        )
+        cache_enabled = await self.mass.config.get_core_config_value(
+            self.domain, CONF_ALLOW_AUDIO_CACHE
+        )
+        if cache_enabled == "disabled":
+            max_cache_size = 0.001
+        await clean_old_files(self.audio_cache_dir, max_cache_size)
+        # reschedule self
+        self.mass.call_later(3600, self._clean_audio_cache)
index a2542012b778752d4fdb4870a122c294d6fbc838..3be9304d72e0fab2440f86001d7ac9798f6b29a3 100644 (file)
@@ -34,14 +34,14 @@ from music_assistant_models.errors import (
 from music_assistant_models.streamdetails import AudioFormat
 
 from music_assistant.constants import (
-    CONF_ALLOW_MEMORY_CACHE,
+    CONF_ALLOW_AUDIO_CACHE,
     CONF_ENTRY_OUTPUT_LIMITER,
     CONF_OUTPUT_CHANNELS,
     CONF_VOLUME_NORMALIZATION,
     CONF_VOLUME_NORMALIZATION_RADIO,
     CONF_VOLUME_NORMALIZATION_TARGET,
     CONF_VOLUME_NORMALIZATION_TRACKS,
-    DEFAULT_ALLOW_MEMORY_CACHE,
+    DEFAULT_ALLOW_AUDIO_CACHE,
     MASS_LOGGER_NAME,
     VERBOSE_LOG_LEVEL,
 )
@@ -53,7 +53,7 @@ from .dsp import filter_to_ffmpeg_params
 from .ffmpeg import FFMpeg, get_ffmpeg_stream
 from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
 from .process import AsyncProcess, communicate
-from .util import detect_charset
+from .util import detect_charset, has_enough_space
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import CoreConfig, PlayerConfig
@@ -70,6 +70,8 @@ LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio")
 HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
 HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
 
+REQUIRED_FREE_CACHE_SPACE = 5  # 5 GB
+
 
 async def remove_file(file_path: str) -> None:
     """Remove file path (if it exists)."""
@@ -83,143 +85,182 @@ class StreamCache:
     """
     StreamCache.
 
-    Basic class to handle (temporary) in-memory caching of audio streams.
+    Basic class to handle caching of audio streams to a (semi) temporary file.
     Useful in case of slow or unreliable network connections, faster seeking,
     or when the audio stream is slow itself.
     """
 
     async def create(self) -> None:
         """Create the cache file (if needed)."""
-        self.mass.cancel_timer(f"clear_cache_{self.cache_id}")
+        if (
+            self._cache_file is not None
+            and await asyncio.to_thread(os.path.exists, self._cache_file)
+            and self._first_part_received.is_set()
+        ):
+            # cache file already exists
+            return
+        # use cache controller to store the translation of uri-->cache file
+        if stored_cache_file := await self.mass.cache.get(
+            self.streamdetails.uri, base_key="audiocache"
+        ):
+            # cache file already exists in memory
+            self._cache_file = stored_cache_file
+            if await asyncio.to_thread(os.path.exists, self._cache_file):
+                # cache file already exists
+                return
+        else:
+            # create new cache file
+            cache_id = shortuuid.random(30)
+            self._cache_file = os.path.join(self.mass.streams.audio_cache_dir, cache_id)
+            await self.mass.cache.set(
+                self.streamdetails.uri,
+                self._cache_file,
+                base_key="audiocache",
+            )
+        # start fetch task if its not already running
         if self._fetch_task is None:
-            self._fetch_task = self.mass.create_task(self._fill_cache())
+            self._fetch_task = self.mass.create_task(self._create_cache_file())
         # wait until the first part of the file is received
         await self._first_part_received.wait()
 
-    async def get_audio_stream(self) -> AsyncGenerator[bytes, None]:
-        """Stream audio from cachedata (while it might even still being written)."""
-        try:
-            self._subscribers += 1
-            bytes_read = 0
-            chunksize = 64000
-            await self.create()
-            while True:
-                async with self._lock:
-                    chunk = self._data[bytes_read : bytes_read + chunksize]
-                    bytes_read += len(chunk)
-                if len(chunk) < chunksize and self._all_data_written.is_set():
-                    # reached EOF
-                    break
-                elif not chunk:
-                    # data is not yet available, wait a bit
-                    await asyncio.sleep(0.05)
-                else:
-                    yield chunk
-                del chunk
-                await asyncio.sleep(0)  # yield to eventloop
-        finally:
-            self._subscribers -= 1
-            if self._subscribers == 0:
-                # set a timer to remove the tempfile after 1 minute
-                # if the file is accessed again within this period,
-                # the timer will be cancelled
-                self.mass.call_later(60, self._clear, task_id=f"clear_cache_{self.cache_id}")
-
-    async def _fill_cache(self) -> None:
+    async def get_audio_stream(self) -> str | AsyncGenerator[bytes, None]:
+        """
+        Get the cached audio stream.
+
+        Returns a string with the path of the cachefile if the file is ready.
+        If the file is not yet ready, it will return an async generator that will
+        stream the (intermediate) audio data from the cache file.
+        """
+
+        async def _stream_from_cache() -> AsyncGenerator[bytes, None]:
+            chunksize = get_chunksize(self.streamdetails.audio_format, 1)
+            async with aiofiles.open(self._cache_file, "rb") as file:
+                while True:
+                    chunk = await file.read(chunksize)
+                    if chunk:
+                        yield chunk
+                        await asyncio.sleep(0)  # yield to eventloop
+                        del chunk
+                    elif self._all_data_written.is_set():
+                        # reached EOF
+                        break
+                    else:
+                        # data is not yet available, wait a bit
+                        await asyncio.sleep(0.05)
+
+        if await asyncio.to_thread(os.path.exists, self._cache_file):
+            if self._fetch_task is None:
+                # a complete cache file already exists on disk from a previous run
+                return self._cache_file
+            if self._all_data_written.is_set():
+                # cache file is ready
+                return self._cache_file
+        # cache file does not exist at all (or is still being written)
+        await self.create()
+        return _stream_from_cache()
+
+    async def _create_cache_file(self) -> None:
         time_start = time.time()
-        self.logger.debug("Fetching audio stream for %s", self.streamdetails.uri)
+        self.logger.debug("Creating audio cache for %s", self.streamdetails.uri)
+        extra_input_args = self.org_extra_input_args or []
         if self.org_stream_type == StreamType.CUSTOM:
             audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream(
                 self.streamdetails,
             )
-        elif self.org_stream_type in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.HLS):
+        elif self.org_stream_type == StreamType.ICY:
+            raise NotImplementedError("Caching of this streamtype is not supported!")
+        elif self.org_stream_type == StreamType.HLS:
+            if self.streamdetails.media_type == MediaType.RADIO:
+                raise NotImplementedError("Caching of this streamtype is not supported!")
+            substream = await get_hls_substream(self.mass, self.org_path)
+            audio_source = substream.path
+        elif self.org_stream_type == StreamType.ENCRYPTED_HTTP:
             audio_source = self.org_path
+            extra_input_args += ["-decryption_key", self.streamdetails.decryption_key]
+        elif self.org_stream_type == StreamType.MULTI_FILE:
+            audio_source = get_multi_file_stream(self.mass, self.streamdetails)
         else:
-            raise NotImplementedError("Caching of this streamtype is not supported")
-
-        extra_input_args = self.org_extra_input_args or []
-        if self.streamdetails.decryption_key:
-            extra_input_args += [
-                "-decryption_key",
-                self.streamdetails.decryption_key,
-            ]
+            audio_source = self.org_path
 
-        # we always use an intermediate ffmpeg process to fetch the original audio source
+        # we always use ffmpeg to fetch the original audio source
         # this may feel a bit redundant, but it's the most reliable way to fetch the audio
         # because ffmpeg has all logic to handle different audio formats, codecs, etc.
         # and it also accounts for complicated cases such as encrypted streams or
         # m4a/mp4 streams with the moov atom at the end of the file.
-        # ffmpeg will produce a lossless copy of the original codec to stdout.
+        # ffmpeg will produce a lossless copy of the original codec.
         self._first_part_received.clear()
         self._all_data_written.clear()
-        self._data = b""
-        async for chunk in get_ffmpeg_stream(
-            audio_input=audio_source,
-            input_format=self.streamdetails.audio_format,
-            output_format=self.streamdetails.audio_format,
-            chunk_size=64000,
-            # apply readrate limiting to avoid buffering too much data too fast
-            # so we only allow reading into the cache max 5 times the normal speed
-            extra_input_args=["-readrate", "5", *extra_input_args],
-        ):
-            async with self._lock:
-                self._data += chunk
-                del chunk
-                await asyncio.sleep(0)  # yield to eventloop
-            if not self._first_part_received.is_set():
-                self._first_part_received.set()
-                self.logger.debug(
-                    "First part received for %s after %.2fs",
-                    self.streamdetails.uri,
-                    time.time() - time_start,
-                )
-        self._all_data_written.set()
-        self.logger.debug(
-            "Writing all data for %s done in %.2fs",
-            self.streamdetails.uri,
-            time.time() - time_start,
-        )
+        try:
+            ffmpeg_proc = FFMpeg(
+                audio_input=audio_source,
+                input_format=self.org_audio_format,
+                output_format=self.streamdetails.audio_format,
+                extra_input_args=extra_input_args,
+                audio_output=self._cache_file,
+                collect_log_history=True,
+            )
+            await ffmpeg_proc.start()
+            # wait until the first data is written to the cache file
+            while ffmpeg_proc.returncode is None:
+                await asyncio.sleep(0.1)
+                if not await asyncio.to_thread(os.path.exists, self._cache_file):
+                    continue
+                if await asyncio.to_thread(os.path.getsize, self._cache_file) > 64000:
+                    break
+
+            self._first_part_received.set()
+            self.logger.debug(
+                "First part received for %s after %.2fs",
+                self.streamdetails.uri,
+                time.time() - time_start,
+            )
+            # wait until ffmpeg is done
+            await ffmpeg_proc.wait()
+
+            if ffmpeg_proc.returncode != 0:
+                ffmpeg_proc.logger.warning("\n".join(ffmpeg_proc.log_history))
+                raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}")
+
+            self._all_data_written.set()
+            self.logger.debug(
+                "Writing all data for %s done in %.2fs",
+                self.streamdetails.uri,
+                time.time() - time_start,
+            )
+        except Exception as err:
+            self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err)
+            # remove the cache file
+            await remove_file(self._cache_file)
+        finally:
+            await ffmpeg_proc.close()
 
     def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None:
         """Initialize the StreamCache."""
         self.mass = mass
         self.streamdetails = streamdetails
-        self.cache_id = shortuuid.random(20)
         self.logger = LOGGER.getChild("cache")
+        self._cache_file: str | None = None
         self._fetch_task: asyncio.Task | None = None
         self._subscribers: int = 0
         self._first_part_received = asyncio.Event()
         self._all_data_written = asyncio.Event()
-        self._data: bytes = b""
-        self._lock: asyncio.Lock = asyncio.Lock()
         self.org_path: str | None = streamdetails.path
         self.org_stream_type: StreamType | None = streamdetails.stream_type
         self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args
+        self.org_audio_format = streamdetails.audio_format
+        streamdetails.audio_format = AudioFormat(
+            content_type=ContentType.NUT,
+            codec_type=streamdetails.audio_format.codec_type,
+            sample_rate=streamdetails.audio_format.sample_rate,
+            bit_depth=streamdetails.audio_format.bit_depth,
+            channels=streamdetails.audio_format.channels,
+        )
         streamdetails.path = "-"
         streamdetails.stream_type = StreamType.CACHE
         streamdetails.can_seek = True
         streamdetails.allow_seek = True
         streamdetails.extra_input_args = []
 
-    async def _clear(self) -> None:
-        """Clear the cache."""
-        self.logger.debug("Cleaning up cache %s", self.streamdetails.uri)
-        if self._fetch_task and not self._fetch_task.done():
-            self._fetch_task.cancel()
-        self._fetch_task = None
-        self._first_part_received.clear()
-        self._all_data_written.clear()
-        del self._data
-        self._data = b""
-
-    def __del__(self) -> None:
-        """Ensure the cache data gets cleaned up."""
-        if self.mass.closing:
-            # edge case: MA is closing
-            return
-        self.mass.cancel_timer(f"remove_file_{self.cache_id}")
-        del self._data
-
 
 async def crossfade_pcm_parts(
     fade_in_part: bytes,
@@ -527,24 +568,9 @@ async def get_stream_details(
         int((time.time() - time_start) * 1000),
     )
 
-    if streamdetails.decryption_key:
-        # using intermediate cache is mandatory for encrypted streams
-        streamdetails.enable_cache = True
-
     # determine if we may use caching for the audio stream
     if streamdetails.enable_cache is None:
-        allow_cache = mass.config.get_raw_core_config_value(
-            "streams", CONF_ALLOW_MEMORY_CACHE, DEFAULT_ALLOW_MEMORY_CACHE
-        )
-        streamdetails.enable_cache = (
-            allow_cache
-            and streamdetails.duration is not None
-            and streamdetails.media_type
-            in (MediaType.TRACK, MediaType.AUDIOBOOK, MediaType.PODCAST_EPISODE)
-            and streamdetails.stream_type
-            in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.CUSTOM, StreamType.HLS)
-            and get_chunksize(streamdetails.audio_format, streamdetails.duration) < 100000000
-        )
+        streamdetails.enable_cache = await _is_cache_allowed(mass, streamdetails)
 
     # handle temporary cache support of audio stream
     if streamdetails.enable_cache:
@@ -563,6 +589,51 @@ async def get_stream_details(
     return streamdetails
 
 
+async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails) -> bool:
+    """Check if caching is allowed for the given streamdetails."""
+    if streamdetails.media_type not in (
+        MediaType.TRACK,
+        MediaType.AUDIOBOOK,
+        MediaType.PODCAST_EPISODE,
+    ):
+        return False
+    if streamdetails.stream_type in (StreamType.ICY, StreamType.LOCAL_FILE, StreamType.UNKNOWN):
+        return False
+    allow_cache = mass.config.get_raw_core_config_value(
+        "streams", CONF_ALLOW_AUDIO_CACHE, DEFAULT_ALLOW_AUDIO_CACHE
+    )
+    if allow_cache == "disabled":
+        return False
+    if not await has_enough_space(mass.streams.audio_cache_dir, REQUIRED_FREE_CACHE_SPACE):
+        return False
+    if allow_cache == "always":
+        return True
+    # auto mode
+    if streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
+        # always prefer cache for encrypted streams
+        return True
+    if not streamdetails.duration:
+        # we can't determine filesize without duration so play it safe and dont allow cache
+        return False
+    if streamdetails.stream_type == StreamType.MULTI_FILE:
+        # prefer cache to speedup multi-file streams
+        # (if total filesize smaller than 5GB)
+        max_filesize = 5 * 1024 * 1024 * 1024
+        return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
+    if streamdetails.stream_type == StreamType.CUSTOM:
+        # prefer cache for custom streams (to speedup seeking)
+        # (if total filesize smaller than 500MB)
+        max_filesize = 500 * 1024 * 1024
+        return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
+    if streamdetails.stream_type == StreamType.HLS:
+        # prefer cache for HLS streams (to speedup seeking)
+        # (if total filesize smaller than 500MB)
+        max_filesize = 500 * 1024 * 1024
+        return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
+    # deny for all other stream types
+    return False
+
+
 async def get_media_stream(
     mass: MusicAssistant,
     streamdetails: StreamDetails,
@@ -583,7 +654,9 @@ async def get_media_stream(
     stream_type = streamdetails.stream_type
     if stream_type == StreamType.CACHE:
         cache = cast(StreamCache, streamdetails.cache)
-        audio_source = cache.get_audio_stream()
+        audio_source = await cache.get_audio_stream()
+    elif stream_type == StreamType.MULTI_FILE:
+        audio_source = get_multi_file_stream(mass, streamdetails)
     elif stream_type == StreamType.CUSTOM:
         audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
             streamdetails,
@@ -599,6 +672,9 @@ async def get_media_stream(
             # with ffmpeg, where they just stop after some minutes,
             # so we tell ffmpeg to loop around in this case.
             extra_input_args += ["-stream_loop", "-1", "-re"]
+    elif stream_type == StreamType.ENCRYPTED_HTTP:
+        audio_source = streamdetails.path
+        extra_input_args += ["-decryption_key", streamdetails.decryption_key]
     else:
         audio_source = streamdetails.path
 
@@ -715,6 +791,8 @@ async def get_media_stream(
         if bytes_sent == 0:
             # edge case: no audio data was sent
             raise AudioError("No audio was received")
+        elif ffmpeg_proc.returncode not in (0, None):
+            raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}")
         finished = True
     except (Exception, GeneratorExit) as err:
         if isinstance(err, asyncio.CancelledError | GeneratorExit):
@@ -722,27 +800,21 @@ async def get_media_stream(
             cancelled = True
             raise
         logger.error("Error while streaming %s: %s", streamdetails.uri, err)
+        # dump the last 10 lines of the log in case of an unclean exit
+        logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:]))
         streamdetails.stream_error = True
     finally:
         # always ensure close is called which also handles all cleanup
         await ffmpeg_proc.close()
-
         # try to determine how many seconds we've streamed
         seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
-        if not cancelled and ffmpeg_proc.returncode not in (0, 255):
-            # dump the last 5 lines of the log in case of an unclean exit
-            log_tail = "\n" + "\n".join(list(ffmpeg_proc.log_history)[-5:])
-        else:
-            log_tail = ""
         logger.debug(
-            "stream %s (with code %s) for %s - seconds streamed: %s %s",
+            "stream %s (with code %s) for %s - seconds streamed: %s",
             "cancelled" if cancelled else "finished" if finished else "aborted",
             ffmpeg_proc.returncode,
             streamdetails.uri,
             seconds_streamed,
-            log_tail,
         )
-
         streamdetails.seconds_streamed = seconds_streamed
         # store accurate duration
         if finished and not streamdetails.seek_position and seconds_streamed:
@@ -1103,6 +1175,35 @@ async def get_file_stream(
             yield data
 
 
+async def get_multi_file_stream(
+    mass: MusicAssistant,  # noqa: ARG001
+    streamdetails: StreamDetails,
+) -> AsyncGenerator[bytes, None]:
+    """Return audio stream for a concatenation of multiple files."""
+    files_list: list[str] = streamdetails.data
+    # concat input files
+    temp_file = f"/tmp/{shortuuid.random(20)}.txt"  # noqa: S108
+    async with aiofiles.open(temp_file, "w") as f:
+        for path in files_list:
+            await f.write(f"file '{path}'\n")
+
+    try:
+        async for chunk in get_ffmpeg_stream(
+            audio_input=temp_file,
+            input_format=streamdetails.audio_format,
+            output_format=AudioFormat(
+                content_type=ContentType.NUT,
+                sample_rate=streamdetails.audio_format.sample_rate,
+                bit_depth=streamdetails.audio_format.bit_depth,
+                channels=streamdetails.audio_format.channels,
+            ),
+            extra_input_args=["-safe", "0", "-f", "concat", "-i", temp_file],
+        ):
+            yield chunk
+    finally:
+        await remove_file(temp_file)
+
+
 async def get_preview_stream(
     mass: MusicAssistant,
     provider_instance_id_or_domain: str,
@@ -1176,7 +1277,7 @@ def get_chunksize(
         return pcm_size
     if fmt.content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF):
         return pcm_size
-    if fmt.bit_rate:
+    if fmt.bit_rate and fmt.bit_rate < 10000:
         return int(((fmt.bit_rate * 1000) / 8) * seconds)
     if fmt.content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC):
         # assume 74.7% compression ratio (level 0)
@@ -1347,7 +1448,9 @@ async def analyze_loudness(
     ]
     if streamdetails.stream_type == StreamType.CACHE:
         cache = cast(StreamCache, streamdetails.cache)
-        audio_source = cache.get_audio_stream()
+        audio_source = await cache.get_audio_stream()
+    elif streamdetails.stream_type == StreamType.MULTI_FILE:
+        audio_source = get_multi_file_stream(mass, streamdetails)
     elif streamdetails.stream_type == StreamType.CUSTOM:
         audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
             streamdetails,
index bdf2fd96ceb081d30a8c31a93d96146d6a02e053..006d298ebcefaa49d08dc6b08898dd3c87daa766 100644 (file)
@@ -60,10 +60,15 @@ class FFMpeg(AsyncProcess):
         self._stdin_task: asyncio.Task | None = None
         self._logger_task: asyncio.Task | None = None
         self._input_codec_parsed = False
+        if audio_input == "-" or isinstance(audio_input, AsyncGenerator):
+            stdin = True
+        else:
+            stdin = audio_input if isinstance(audio_input, int) else False
+        stdout = audio_output if isinstance(audio_output, int) else bool(audio_output == "-")
         super().__init__(
             ffmpeg_args,
-            stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input,
-            stdout=True if isinstance(audio_output, str) else audio_output,
+            stdin=stdin,
+            stdout=stdout,
             stderr=True,
         )
         self.logger = LOGGER
@@ -190,11 +195,16 @@ async def get_ffmpeg_stream(
         filter_params=filter_params,
         extra_args=extra_args,
         extra_input_args=extra_input_args,
+        collect_log_history=True,
     ) as ffmpeg_proc:
         # read final chunks from stdout
         iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
         async for chunk in iterator:
             yield chunk
+        if ffmpeg_proc.returncode not in (None, 0):
+            # dump the last 5 lines of the log in case of an unclean exit
+            log_tail = "\n" + "\n".join(list(ffmpeg_proc.log_history)[-5:])
+            ffmpeg_proc.logger.error(log_tail)
 
 
 def get_ffmpeg_args(  # noqa: PLR0915
@@ -223,7 +233,6 @@ def get_ffmpeg_args(  # noqa: PLR0915
     ]
     # collect input args
     input_args = []
-
     if extra_input_args:
         input_args += extra_input_args
     if input_path.startswith("http"):
@@ -263,6 +272,9 @@ def get_ffmpeg_args(  # noqa: PLR0915
         ]
     elif input_format.codec_type != ContentType.UNKNOWN:
         input_args += ["-acodec", input_format.codec_type.name.lower(), "-i", input_path]
+    elif "-f" in extra_input_args:
+        # input format is already specified in the extra input args
+        pass
     else:
         # let ffmpeg auto detect the content type from the metadata/headers
         input_args += ["-i", input_path]
@@ -288,19 +300,8 @@ def get_ffmpeg_args(  # noqa: PLR0915
             "-f",
             output_format.content_type.value,
         ]
-    elif input_format == output_format and not filter_params and not extra_args:
-        # passthrough-mode (e.g. for creating the cache)
-        if output_format.content_type in (
-            ContentType.MP4,
-            ContentType.MP4A,
-            ContentType.M4A,
-            ContentType.M4B,
-        ):
-            fmt = "adts"
-        elif output_format.codec_type in (ContentType.UNKNOWN, ContentType.OGG):
-            fmt = "nut"  # use special nut container
-        else:
-            fmt = output_format.content_type.name.lower()
+    elif output_format.content_type == ContentType.NUT:
+        # passthrough-mode (for creating the cache) using NUT container
         output_args = [
             "-vn",
             "-dn",
@@ -308,7 +309,7 @@ def get_ffmpeg_args(  # noqa: PLR0915
             "-acodec",
             "copy",
             "-f",
-            fmt,
+            "nut",
         ]
     elif output_format.content_type == ContentType.AAC:
         output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k"]
@@ -337,7 +338,6 @@ def get_ffmpeg_args(  # noqa: PLR0915
             "-compression_level",
             "0",
         ]
-
     else:
         raise RuntimeError("Invalid/unsupported output format specified")
 
index 534c7a27772cd6ec75083efd1e753a7c55cf8ff2..25606e40f0efaaec9983f0824df8d876acf9fd5b 100644 (file)
@@ -286,14 +286,36 @@ async def get_ip_pton(ip_string: str | None = None) -> bytes:
         return await asyncio.to_thread(socket.inet_pton, socket.AF_INET6, ip_string)
 
 
-def get_folder_size(folderpath: str) -> float:
+async def get_folder_size(folderpath: str) -> float:
     """Return folder size in gb."""
-    total_size = 0
-    for dirpath, _dirnames, filenames in os.walk(folderpath):
-        for _file in filenames:
-            _fp = os.path.join(dirpath, _file)
-            total_size += os.path.getsize(_fp)
-    return total_size / float(1 << 30)
+
+    def _get_folder_size(folderpath: str) -> float:
+        total_size = 0
+        for dirpath, _dirnames, filenames in os.walk(folderpath):
+            for _file in filenames:
+                _fp = os.path.join(dirpath, _file)
+                total_size += os.path.getsize(_fp)
+        return total_size / float(1 << 30)
+
+    return await asyncio.to_thread(_get_folder_size, folderpath)
+
+
+async def clean_old_files(folderpath: str, max_size: float) -> None:
+    """Clean old files in folder to make room for new files."""
+    foldersize = await get_folder_size(folderpath)
+    if foldersize < max_size:
+        return
+
+    def _clean_old_files(foldersize: float):
+        files: list[os.DirEntry] = [x for x in os.scandir(folderpath) if x.is_file()]
+        files.sort(key=lambda x: x.stat().st_mtime)
+        for _file in files:
+            foldersize -= _file.stat().st_size / float(1 << 30)
+            os.remove(_file.path)
+            if foldersize < max_size:
+                return
+
+    await asyncio.to_thread(_clean_old_files, foldersize)
 
 
 def get_changed_keys(
@@ -465,13 +487,23 @@ async def has_tmpfs_mount() -> bool:
     return False
 
 
-async def get_tmp_free_space() -> int:
-    """Return free space on tmp."""
+async def get_tmp_free_space() -> float:
+    """Return free space on tmp in GB's."""
+    return await get_free_space("/tmp")  # noqa: S108
+
+
+async def get_free_space(folder: str) -> float:
+    """Return free space on given folderpath in GB."""
     try:
-        if res := await asyncio.to_thread(shutil.disk_usage, "/tmp"):  # noqa: S108
-            return res.free
+        if res := await asyncio.to_thread(shutil.disk_usage, folder):
+            return res.free / float(1 << 30)
     except (FileNotFoundError, OSError, PermissionError):
-        return 0
+        return 0.0
+
+
+async def has_enough_space(folder: str, size: int) -> bool:
+    """Check if folder has enough free space."""
+    return await get_free_space(folder) > size
 
 
 def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
index 9f7de87b4a385d38e98087632b83144ae5eb7f52..09d7c93b98af2ce166805592a0bab08820333939 100644 (file)
@@ -41,7 +41,6 @@ from music_assistant_models.media_items import (
 )
 from music_assistant_models.streamdetails import StreamDetails
 
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.models.music_provider import MusicProvider
 from music_assistant.providers.audiobookshelf.parsers import (
     parse_audiobook,
@@ -448,47 +447,35 @@ class Audiobookshelf(MusicProvider):
         base_url = str(self.config.get_value(CONF_URL))
         if len(tracks) == 0:
             raise MediaNotFoundError("Stream not found")
+
+        content_type = ContentType.UNKNOWN
+        if abs_audiobook.media.tracks[0].metadata is not None:
+            content_type = ContentType.try_parse(abs_audiobook.media.tracks[0].metadata.ext)
+
         if len(tracks) > 1:
             self.logger.debug("Using playback for multiple file audiobook.")
-            multiple_files = []
+            multiple_files: list[str] = []
             for track in tracks:
-                media_url = track.content_url
-                stream_url = f"{base_url}{media_url}?token={token}"
-                content_type = ContentType.UNKNOWN
-                if track.metadata is not None:
-                    content_type = ContentType.try_parse(track.metadata.ext)
-                multiple_files.append(
-                    (AudioFormat(content_type=content_type), stream_url, track.duration)
-                )
+                stream_url = f"{base_url}{track.content_url}?token={token}"
+                multiple_files.append(stream_url)
 
             return StreamDetails(
                 provider=self.instance_id,
                 item_id=abs_audiobook.id_,
-                # for the concatanated stream, we need to use a pcm stream format
-                audio_format=AudioFormat(
-                    content_type=ContentType.PCM_S16LE,
-                    sample_rate=44100,
-                    bit_depth=16,
-                    channels=2,
-                ),
+                audio_format=AudioFormat(content_type=content_type),
                 media_type=MediaType.AUDIOBOOK,
-                stream_type=StreamType.CUSTOM,
+                stream_type=StreamType.MULTI_FILE,
                 duration=int(abs_audiobook.media.duration),
                 data=multiple_files,
                 allow_seek=True,
-                can_seek=True,
             )
 
         self.logger.debug(
             f'Using direct playback for audiobook "{abs_audiobook.media.metadata.title}".'
         )
-
-        track = abs_audiobook.media.tracks[0]
-        media_url = track.content_url
+        media_url = abs_audiobook.media.tracks[0].content_url
         stream_url = f"{base_url}{media_url}?token={token}"
-        content_type = ContentType.UNKNOWN
-        if track.metadata is not None:
-            content_type = ContentType.try_parse(track.metadata.ext)
+
         return StreamDetails(
             provider=self.lookup_key,
             item_id=abs_audiobook.id_,
@@ -534,34 +521,6 @@ class Audiobookshelf(MusicProvider):
             allow_seek=True,
         )
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """
-        Return the (custom) audio stream for the provider item.
-
-        Only used for multi-file audiobooks.
-        """
-        stream_data: list[tuple[AudioFormat, str, float]] = streamdetails.data
-        total_duration = 0.0
-        for audio_format, chapter_file, chapter_duration in stream_data:
-            total_duration += chapter_duration
-            if total_duration < seek_position:
-                continue
-            seek_position_netto = round(
-                max(0, seek_position - (total_duration - chapter_duration)), 2
-            )
-            self.logger.debug(chapter_file)
-            async for chunk in get_ffmpeg_stream(
-                chapter_file,
-                input_format=audio_format,
-                # output format is always pcm because we are sending
-                # the result of multiple files as one big stream
-                output_format=streamdetails.audio_format,
-                extra_input_args=["-ss", str(seek_position_netto)] if seek_position_netto else [],
-            ):
-                yield chunk
-
     async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
         """Return finished:bool, position_ms: int."""
         progress: None | MediaProgress = None
index f8dad59e991c269efe2e15feeb0c557cddd910eb..d4194b1c849037803d33a02810bf2232f1ba5fa5 100644 (file)
@@ -59,7 +59,6 @@ from music_assistant.constants import (
     VARIOUS_ARTISTS_NAME,
 )
 from music_assistant.helpers.compare import compare_strings, create_safe_string
-from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.helpers.json import json_loads
 from music_assistant.helpers.playlists import parse_m3u, parse_pls
 from music_assistant.helpers.tags import AudioTags, async_parse_tags, parse_tags, split_items
@@ -847,35 +846,6 @@ class LocalFileSystemProvider(MusicProvider):
             return await self._get_stream_details_for_podcast_episode(item_id)
         return await self._get_stream_details_for_track(item_id)
 
-    async def get_audio_stream(
-        self, streamdetails: StreamDetails, seek_position: int = 0
-    ) -> AsyncGenerator[bytes, None]:
-        """
-        Return the (custom) audio stream for the provider item.
-
-        Will only be called when the stream_type is set to CUSTOM,
-        currently only for multi-part audiobooks.
-        """
-        stream_data: tuple[AudioFormat, list[tuple[str, float]]] = streamdetails.data
-        format_org, file_based_chapters = stream_data
-        total_duration = 0.0
-        for chapter_file, chapter_duration in file_based_chapters:
-            total_duration += chapter_duration
-            if total_duration < seek_position:
-                continue
-            seek_position_netto = round(
-                max(0, seek_position - (total_duration - chapter_duration)), 2
-            )
-            async for chunk in get_ffmpeg_stream(
-                self.get_absolute_path(chapter_file),
-                input_format=format_org,
-                # output format is always pcm because we are sending
-                # the result of multiple files as one big stream
-                output_format=streamdetails.audio_format,
-                extra_input_args=["-ss", str(seek_position_netto)] if seek_position_netto else [],
-            ):
-                yield chunk
-
     async def resolve_image(self, path: str) -> str | bytes:
         """
         Resolve an image from an image path.
@@ -1646,29 +1616,32 @@ class LocalFileSystemProvider(MusicProvider):
 
         prov_mapping = next(x for x in library_item.provider_mappings if x.item_id == item_id)
         file_item = await self.resolve(item_id)
-
-        file_based_chapters: list[tuple[str, float]] | None
-        if file_based_chapters := await self.cache.get(
+        duration = library_item.duration
+        chapters_cache_key = f"{self.lookup_key}.audiobook.chapters"
+        file_based_chapters: list[tuple[str, float]] | None = await self.cache.get(
             file_item.relative_path,
-            base_key=f"{self.lookup_key}.audiobook.chapters",
-        ):
-            # this is a multi-file audiobook, we have the chapter(files) stored in cache
-            # use custom stream to simply send the chapter files one by one
+            base_key=chapters_cache_key,
+        )
+        if file_based_chapters is None:
+            # no cache available for this audiobook, we need to parse the chapters
+            tags = await async_parse_tags(file_item.absolute_path, file_item.file_size)
+            await self._parse_audiobook(file_item, tags)
+            file_based_chapters = await self.cache.get(
+                file_item.relative_path,
+                base_key=chapters_cache_key,
+            )
+
+        if file_based_chapters:
+            # this is a multi-file audiobook
             return StreamDetails(
                 provider=self.instance_id,
                 item_id=item_id,
-                # for the concatanated stream, we need to use a pcm stream format
-                audio_format=AudioFormat(
-                    content_type=ContentType.from_bit_depth(prov_mapping.audio_format.bit_depth),
-                    sample_rate=prov_mapping.audio_format.sample_rate,
-                    channels=prov_mapping.audio_format.channels,
-                ),
+                audio_format=prov_mapping.audio_format,
                 media_type=MediaType.AUDIOBOOK,
-                stream_type=StreamType.CUSTOM,
-                duration=library_item.duration,
-                data=(prov_mapping.audio_format, file_based_chapters),
+                stream_type=StreamType.MULTI_FILE,
+                duration=duration,
+                data=[self.get_absolute_path(x[0]) for x in file_based_chapters],
                 allow_seek=True,
-                can_seek=True,
             )
 
         # regular single-file streaming, simply let ffmpeg deal with the file directly
@@ -1691,8 +1664,10 @@ class LocalFileSystemProvider(MusicProvider):
     ) -> tuple[int, list[MediaItemChapter]]:
         """Return the chapters for an audiobook."""
         chapters: list[MediaItemChapter] = []
+        all_chapter_files: list[tuple[str, float]] = []
+        total_duration = 0.0
         if tags.chapters:
-            # The chapters are embedded in the file
+            # The chapters are embedded in the file tags
             chapters = [
                 MediaItemChapter(
                     position=chapter.chapter_id,
@@ -1702,40 +1677,45 @@ class LocalFileSystemProvider(MusicProvider):
                 )
                 for chapter in tags.chapters
             ]
-            return (try_parse_int(tags.duration) or 0, chapters)
-        # there could be multiple files for this audiobook in the same folder,
-        # where each file is a portion/chapter of the audiobook
-        # try to gather the chapters by traversing files in the same folder
-        chapter_file_tags: list[AudioTags] = []
-        total_duration = 0.0
-        abs_path = self.get_absolute_path(audiobook_file_item.parent_path)
-        for item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=True):
-            if "." not in item.relative_path or item.is_dir:
-                continue
-            if item.ext not in AUDIOBOOK_EXTENSIONS:
-                continue
-            item_tags = await async_parse_tags(item.absolute_path, item.file_size)
-            if not (tags.album == item_tags.album or (item_tags.tags.get("title") is None)):
-                continue
-            if item_tags.track is None:
-                continue
-            chapter_file_tags.append(item_tags)
-        chapter_file_tags.sort(key=lambda x: x.track or 0)
-        all_chapter_files: list[tuple[str, float]] = []
-        for chapter_tags in chapter_file_tags:
-            assert chapter_tags.duration is not None
-            chapters.append(
-                MediaItemChapter(
-                    position=chapter_tags.track or 0,
-                    name=chapter_tags.title,
-                    start=total_duration,
-                    end=total_duration + chapter_tags.duration,
+            total_duration = try_parse_int(tags.duration) or 0
+        else:
+            # there could be multiple files for this audiobook in the same folder,
+            # where each file is a portion/chapter of the audiobook
+            # try to gather the chapters by traversing files in the same folder
+            chapter_file_tags: list[AudioTags] = []
+            abs_path = self.get_absolute_path(audiobook_file_item.parent_path)
+            for item in await asyncio.to_thread(
+                sorted_scandir, self.base_path, abs_path, sort=True
+            ):
+                if "." not in item.relative_path or item.is_dir:
+                    continue
+                if item.ext not in AUDIOBOOK_EXTENSIONS:
+                    continue
+                item_tags = await async_parse_tags(item.absolute_path, item.file_size)
+                if not (tags.album == item_tags.album or (item_tags.tags.get("title") is None)):
+                    continue
+                if item_tags.track is None:
+                    continue
+                chapter_file_tags.append(item_tags)
+            chapter_file_tags.sort(key=lambda x: x.track or 0)
+            for chapter_tags in chapter_file_tags:
+                assert chapter_tags.duration is not None
+                chapters.append(
+                    MediaItemChapter(
+                        position=chapter_tags.track or 0,
+                        name=chapter_tags.title,
+                        start=total_duration,
+                        end=total_duration + chapter_tags.duration,
+                    )
                 )
-            )
-            all_chapter_files.append(
-                (get_relative_path(self.base_path, chapter_tags.filename), chapter_tags.duration)
-            )
-            total_duration += chapter_tags.duration
+                all_chapter_files.append(
+                    (
+                        get_relative_path(self.base_path, chapter_tags.filename),
+                        chapter_tags.duration,
+                    )
+                )
+                total_duration += chapter_tags.duration
+
         # store chapter files in cache
         # for easy access from streamdetails
         await self.cache.set(
index 85554c953e571326aa863698858b21c6890e6e24..b3852cc6fca610acd41b71fc3fe069517a2aa94a 100644 (file)
@@ -24,7 +24,7 @@ dependencies = [
   "ifaddr==0.2.0",
   "mashumaro==3.15",
   "music-assistant-frontend==2.13.1",
-  "music-assistant-models==1.1.35",
+  "music-assistant-models==1.1.36",
   "mutagen==1.47.0",
   "orjson==3.10.15",
   "pillow==11.1.0",
index c7ed93c9d917f864b054f45d92d69cb7ee99e377..88659f33c053eca0de83ade7eea99ec366723822 100644 (file)
@@ -27,7 +27,7 @@ ifaddr==0.2.0
 liblistenbrainz==0.5.6
 mashumaro==3.15
 music-assistant-frontend==2.13.1
-music-assistant-models==1.1.35
+music-assistant-models==1.1.36
 mutagen==1.47.0
 orjson==3.10.15
 pillow==11.1.0