Finalize cache implementation
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 26 Mar 2025 23:31:29 +0000 (00:31 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 26 Mar 2025 23:31:29 +0000 (00:31 +0100)
latest tweaks to caching of audio:
- ensure local file and not tmpfs
- check disk percentage free
- prevent file deletion when still in use
- some more failsafes

music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/process.py
music_assistant/helpers/util.py

index dd992f294459b574b2d2e45b31666be89ae6c3b7..284551118210822d24ba5b28a9add1845116ba32 100644 (file)
@@ -13,7 +13,7 @@ import os
 import shutil
 import urllib.parse
 from collections.abc import AsyncGenerator
-from typing import TYPE_CHECKING, Final
+from typing import TYPE_CHECKING
 
 from aiofiles.os import wrap
 from aiohttp import web
@@ -52,8 +52,8 @@ from music_assistant.constants import (
     SILENCE_FILE,
     VERBOSE_LOG_LEVEL,
 )
-from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.helpers.audio import (
+    CACHE_FILES_IN_USE,
     crossfade_pcm_parts,
     get_chunksize,
     get_media_stream,
@@ -61,13 +61,15 @@ from music_assistant.helpers.audio import (
     get_silence,
     get_stream_details,
 )
+from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
 from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
 from music_assistant.helpers.util import (
+    get_folder_size,
     get_free_space,
+    get_free_space_percentage,
     get_ip,
     get_ips,
-    has_enough_space,
     select_free_port,
     try_parse_bool,
 )
@@ -84,8 +86,6 @@ if TYPE_CHECKING:
 
 isfile = wrap(os.path.isfile)
 
-AUDIO_CACHE_MAX_SIZE: Final[int] = 2  # 2gb
-
 
 def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
     """Parse PCM info from a codec/content_type string."""
@@ -250,18 +250,19 @@ class StreamsController(CoreController):
         FFMPEG_LOGGER.setLevel(self.logger.level)
         # perform check for ffmpeg version
         await check_ffmpeg_version()
-        # note that on HAOS we run /tmp in tmpfs so we need to check if we're running
-        # on a system that has enough space to store the audio cache in the tmpfs
-        # if not, we choose another location
-        if await get_free_space("/tmp") < AUDIO_CACHE_MAX_SIZE * 1.5:  # noqa: S108
-            self._audio_cache_dir = os.path.join(os.path.expanduser("~"), ".audio")
+        # select a folder to store temporary audio cache files
+        # note that on HAOS we run /tmp in tmpfs so we need
+        # to pick another temporary location which is not /tmp
+        # we prefer the root/user dir because on the docker install
+        # it will be cleaned up on a reboot
+        self._audio_cache_dir = os.path.join(os.path.expanduser("~"), ".audio")
         if not await asyncio.to_thread(os.path.isdir, self._audio_cache_dir):
             await asyncio.to_thread(os.makedirs, self._audio_cache_dir)
-        self.allow_cache_default = (
-            "auto"
-            if await has_enough_space(self._audio_cache_dir, AUDIO_CACHE_MAX_SIZE * 1.5)
-            else "disabled"
-        )
+        # enable cache by default if we have enough free space only
+        disk_percentage_free = await get_free_space_percentage(self._audio_cache_dir)
+        self.allow_cache_default = "auto" if disk_percentage_free > 25 else "disabled"
+        # schedule cleanup of old audio cache files
+        await self._clean_audio_cache()
         # start the webserver
         self.publish_port = config.get_value(CONF_BIND_PORT)
         self.publish_ip = config.get_value(CONF_PUBLISH_IP)
@@ -1097,3 +1098,31 @@ class StreamsController(CoreController):
             bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
             channels=2,
         )
+
+    async def _clean_audio_cache(self) -> None:
+        """Clean up audio cache periodically."""
+        free_space_in_cache_dir = await get_free_space(self._audio_cache_dir)
+        # calculate max cache size based on free space in cache dir
+        max_cache_size = min(15, free_space_in_cache_dir * 0.2)
+        cache_enabled = await self.mass.config.get_core_config_value(
+            self.domain, CONF_ALLOW_AUDIO_CACHE
+        )
+        if cache_enabled == "disabled":
+            max_cache_size = 0.001
+
+        def _clean_old_files(foldersize: float):
+            files: list[os.DirEntry] = [x for x in os.scandir(self._audio_cache_dir) if x.is_file()]
+            files.sort(key=lambda x: x.stat().st_atime)
+            for _file in files:
+                if _file.path in CACHE_FILES_IN_USE:
+                    continue
+                foldersize -= _file.stat().st_size / float(1 << 30)
+                os.remove(_file.path)
+                if foldersize < max_cache_size:
+                    return
+
+        foldersize = await get_folder_size(self._audio_cache_dir)
+        if foldersize > max_cache_size:
+            await asyncio.to_thread(_clean_old_files, foldersize)
+        # reschedule self
+        self.mass.call_later(3600, self._clean_audio_cache)
index fe17fbb16cee4b5ae1132b9bb7cad19971ee6621..4df5a00ab2db1e6754768366db982a6088d946a8 100644 (file)
@@ -10,7 +10,7 @@ import struct
 import time
 from collections.abc import AsyncGenerator
 from io import BytesIO
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING, Final, cast
 
 import aiofiles
 import shortuuid
@@ -45,7 +45,7 @@ from music_assistant.constants import (
     VERBOSE_LOG_LEVEL,
 )
 from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
-from music_assistant.helpers.util import clean_stream_title
+from music_assistant.helpers.util import clean_stream_title, remove_file
 
 from .datetime import utc
 from .dsp import filter_to_ffmpeg_params
@@ -71,13 +71,8 @@ HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
 
 SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")
 
-
-async def remove_file(file_path: str) -> None:
-    """Remove file path (if it exists)."""
-    if not await asyncio.to_thread(os.path.exists, file_path):
-        return
-    await asyncio.to_thread(os.remove, file_path)
-    LOGGER.log(VERBOSE_LOG_LEVEL, "Removed cache file: %s", file_path)
+CACHE_BASE_KEY: Final[str] = "audio_cache_path"
+CACHE_FILES_IN_USE: set[str] = set()
 
 
 class StreamCache:
@@ -91,18 +86,28 @@ class StreamCache:
 
     async def create(self) -> None:
         """Create the cache file (if needed)."""
-        if self._cache_file is not None and await asyncio.to_thread(
-            os.path.exists, self._cache_file
-        ):
-            # cache file already exists
-            self.mass.cancel_timer(f"remove_cache_file_{self._cache_file}")
-            if self._first_part_received.is_set():
-                # cache file is ready, no action needed
-                return
-        else:
-            # create new cache file
-            cache_id = shortuuid.random(30)
-            self._cache_file = os.path.join(self.mass.streams.audio_cache_dir, cache_id)
+        if self._cache_file is None:
+            if cached_cache_path := await self.mass.cache.get(
+                self.streamdetails.uri, base_key=CACHE_BASE_KEY
+            ):
+                # we have a mapping stored for this uri, prefer that
+                self._cache_file = cached_cache_path
+                if await asyncio.to_thread(os.path.exists, self._cache_file):
+                    # cache file already exists from a previous session,
+                    # we can simply use that, there is nothing to create
+                    CACHE_FILES_IN_USE.add(self._cache_file)
+                    return
+            else:
+                # create new cache file
+                cache_id = shortuuid.random(30)
+                self._cache_file = cache_file = os.path.join(
+                    self.mass.streams.audio_cache_dir, cache_id
+                )
+                await self.mass.cache.set(
+                    self.streamdetails.uri, cache_file, base_key=CACHE_BASE_KEY
+                )
+        # mark file as in-use to prevent it being deleted
+        CACHE_FILES_IN_USE.add(self._cache_file)
         # start fetch task if its not already running
         if self._fetch_task is None:
             self._fetch_task = self.mass.create_task(self._create_cache_file())
@@ -113,10 +118,7 @@ class StreamCache:
         """Release the cache file."""
         self._subscribers -= 1
         if self._subscribers <= 0:
-            # schedule removal of the cache file
-            self.mass.call_later(
-                1800, remove_file, self._cache_file, task_id=f"remove_cache_file_{self._cache_file}"
-            )
+            CACHE_FILES_IN_USE.discard(self._cache_file)
 
     async def get_audio_stream(self) -> str | AsyncGenerator[bytes, None]:
         """
@@ -127,11 +129,14 @@ class StreamCache:
         stream the (intermediate) audio data from the cache file.
         """
         self._subscribers += 1
+        # mark file as in-use to prevent it being deleted
+        CACHE_FILES_IN_USE.add(self._cache_file)
 
         async def _stream_from_cache() -> AsyncGenerator[bytes, None]:
             chunksize = get_chunksize(self.streamdetails.audio_format, 1)
+            wait_loops = 0
             async with aiofiles.open(self._cache_file, "rb") as file:
-                while True:
+                while wait_loops < 2000:
                     chunk = await file.read(chunksize)
                     if chunk:
                         yield chunk
@@ -143,13 +148,15 @@ class StreamCache:
                     else:
                         # data is not yet available, wait a bit
                         await asyncio.sleep(0.05)
+                        # prevent an infinite loop in case of an error
+                        wait_loops += 1
 
         if await asyncio.to_thread(os.path.exists, self._cache_file):
             if self._fetch_task is None:
-                # a complete cache file already exists on disk from a previous run
+                # a complete cache file already exists on disk (from a previous run)
                 return self._cache_file
             if self._all_data_written.is_set():
-                # cache file is ready
+                # cache file was created recently but ready
                 return self._cache_file
 
         # cache file does not exist at all (or is still being written)
@@ -159,6 +166,9 @@ class StreamCache:
     async def _create_cache_file(self) -> None:
         time_start = time.time()
         self.logger.debug("Creating audio cache for %s", self.streamdetails.uri)
+        CACHE_FILES_IN_USE.add(self._cache_file)
+        self._first_part_received.clear()
+        self._all_data_written.clear()
         extra_input_args = self.org_extra_input_args or []
         if self.org_stream_type == StreamType.CUSTOM:
             audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream(
@@ -185,8 +195,6 @@ class StreamCache:
         # and it also accounts for complicated cases such as encrypted streams or
         # m4a/mp4 streams with the moov atom at the end of the file.
         # ffmpeg will produce a lossless copy of the original codec.
-        self._first_part_received.clear()
-        self._all_data_written.clear()
         try:
             ffmpeg_proc = FFMpeg(
                 audio_input=audio_source,
@@ -205,19 +213,26 @@ class StreamCache:
                 if await asyncio.to_thread(os.path.getsize, self._cache_file) > 64000:
                     break
 
-            self._first_part_received.set()
-            self.logger.debug(
-                "First part received for %s after %.2fs",
-                self.streamdetails.uri,
-                time.time() - time_start,
-            )
+            # set 'first part received' event to signal that the first part of the file is ready
+            # this is useful for the get_audio_stream method to know when it can start streaming
+            # we do guard for the returncode here, because if ffmpeg exited abnormally, we should
+            # not signal that the first part is ready
+            if ffmpeg_proc.returncode in (None, 0):
+                self._first_part_received.set()
+                self.logger.debug(
+                    "First part received for %s after %.2fs",
+                    self.streamdetails.uri,
+                    time.time() - time_start,
+                )
             # wait until ffmpeg is done
             await ffmpeg_proc.wait()
 
+            # raise an error if ffmpeg exited with a non-zero code
             if ffmpeg_proc.returncode != 0:
                 ffmpeg_proc.logger.warning("\n".join(ffmpeg_proc.log_history))
                 raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}")
 
+            # set 'all data written' event to signal that the entire file is ready
             self._all_data_written.set()
             self.logger.debug(
                 "Writing all data for %s done in %.2fs",
@@ -226,11 +241,17 @@ class StreamCache:
             )
         except Exception as err:
             self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err)
-            # remove the cache file
-            await remove_file(self._cache_file)
+            # make sure that the (corrupted/incomplete) cache file is removed
+            await self._remove_cache_file()
         finally:
             await ffmpeg_proc.close()
 
+    async def _remove_cache_file(self) -> None:
+        self._first_part_received.clear()
+        self._all_data_written.clear()
+        self._fetch_task = None
+        await remove_file(self._cache_file)
+
     def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None:
         """Initialize the StreamCache."""
         self.mass = mass
@@ -604,7 +625,7 @@ async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails)
     )
     if allow_cache == "disabled":
         return False
-    if not await has_enough_space(mass.streams.audio_cache_dir, 0.5):
+    if not await has_enough_space(mass.streams.audio_cache_dir, 5):
         return False
     if allow_cache == "always":
         return True
@@ -618,8 +639,8 @@ async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails)
     estimated_filesize = get_chunksize(streamdetails.audio_format, streamdetails.duration)
     if streamdetails.stream_type == StreamType.MULTI_FILE:
         # prefer cache to speedup multi-file streams
-        # (if total filesize smaller than 1.5GB)
-        max_filesize = 1.5 * 1024 * 1024 * 1024
+        # (if total filesize smaller than 2GB)
+        max_filesize = 2 * 1024 * 1024 * 1024
     elif streamdetails.stream_type == StreamType.CUSTOM:
         # prefer cache for custom streams (to speedup seeking)
         max_filesize = 250 * 1024 * 1024  # 250MB
@@ -629,7 +650,7 @@ async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails)
         max_filesize = 250 * 1024 * 1024  # 250MB
     elif streamdetails.provider in SLOW_PROVIDERS:
         # prefer cache for slow providers
-        max_filesize = 250 * 1024 * 1024  # 250MB
+        max_filesize = 500 * 1024 * 1024  # 500MB
     else:
         max_filesize = 25 * 1024 * 1024
 
index fc992335fdf7538b70c7b85a94dc04e3afebed59..eadae4cee0d185aac7e940f2eb2d7b6531c433f7 100644 (file)
@@ -192,6 +192,8 @@ class AsyncProcess:
     async def close(self, send_signal: bool = False) -> None:
         """Close/terminate the process and wait for exit."""
         self._close_called = True
+        if not self.proc:
+            return
         if send_signal and self.returncode is None:
             self.proc.send_signal(SIGINT)
         if self.proc.stdin and not self.proc.stdin.is_closing():
index a05796e561f49cb5d4b46e1c5b3456995cb19e12..f51d6bb0e7f365add80d98a89d38f28ae9bb2de4 100644 (file)
@@ -26,6 +26,7 @@ import cchardet as chardet
 import ifaddr
 from zeroconf import IPVersion
 
+from music_assistant.constants import VERBOSE_LOG_LEVEL
 from music_assistant.helpers.process import check_output
 
 if TYPE_CHECKING:
@@ -299,24 +300,6 @@ async def get_folder_size(folderpath: str) -> float:
     return await asyncio.to_thread(_get_folder_size, folderpath)
 
 
-async def clean_old_files(folderpath: str, max_size: float) -> None:
-    """Clean old files in folder to make room for new files."""
-    foldersize = await get_folder_size(folderpath)
-    if foldersize < max_size:
-        return
-
-    def _clean_old_files(foldersize: float):
-        files: list[os.DirEntry] = [x for x in os.scandir(folderpath) if x.is_file()]
-        files.sort(key=lambda x: x.stat().st_mtime)
-        for _file in files:
-            foldersize -= _file.stat().st_size / float(1 << 30)
-            os.remove(_file.path)
-            if foldersize < max_size:
-                return
-
-    await asyncio.to_thread(_clean_old_files, foldersize)
-
-
 def get_changed_keys(
     dict1: dict[str, Any],
     dict2: dict[str, Any],
@@ -505,6 +488,20 @@ async def get_free_space(folder: str) -> float:
     return await asyncio.to_thread(_get_free_space, folder)
 
 
+async def get_free_space_percentage(folder: str) -> float:
+    """Return free space on given folderpath in percentage."""
+
+    def _get_free_space(folder: str) -> float:
+        """Return free space on given folderpath in GB."""
+        try:
+            if res := shutil.disk_usage(folder):
+                return res.free / res.total * 100
+        except (FileNotFoundError, OSError, PermissionError):
+            return 0.0
+
+    return await asyncio.to_thread(_get_free_space, folder)
+
+
 async def has_enough_space(folder: str, size: int) -> bool:
     """Check if folder has enough free space."""
     return await get_free_space(folder) > size
@@ -516,6 +513,14 @@ def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
         yield data[i : i + chunk_size]
 
 
+async def remove_file(file_path: str) -> None:
+    """Remove file path (if it exists)."""
+    if not await asyncio.to_thread(os.path.exists, file_path):
+        return
+    await asyncio.to_thread(os.remove, file_path)
+    LOGGER.log(VERBOSE_LOG_LEVEL, "Removed file: %s", file_path)
+
+
 def get_primary_ip_address_from_zeroconf(discovery_info: AsyncServiceInfo) -> str | None:
     """Get primary IP address from zeroconf discovery info."""
     for address in discovery_info.parsed_addresses(IPVersion.V4Only):