Simplify cache logic even more
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 16 Mar 2025 10:38:26 +0000 (11:38 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 16 Mar 2025 10:38:26 +0000 (11:38 +0100)
music_assistant/controllers/streams.py
music_assistant/helpers/audio.py

index 1eb3f4fcb33c44adac9959c58df0b75bde27bdee..dd992f294459b574b2d2e45b31666be89ae6c3b7 100644 (file)
@@ -64,7 +64,6 @@ from music_assistant.helpers.audio import (
 from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
 from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
 from music_assistant.helpers.util import (
-    clean_old_files,
     get_free_space,
     get_ip,
     get_ips,
@@ -263,8 +262,6 @@ class StreamsController(CoreController):
             if await has_enough_space(self._audio_cache_dir, AUDIO_CACHE_MAX_SIZE * 1.5)
             else "disabled"
         )
-        # schedule cleanup of old audio cache files
-        await self._clean_audio_cache()
         # start the webserver
         self.publish_port = config.get_value(CONF_BIND_PORT)
         self.publish_ip = config.get_value(CONF_PUBLISH_IP)
@@ -1100,15 +1097,3 @@ class StreamsController(CoreController):
             bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
             channels=2,
         )
-
-    async def _clean_audio_cache(self) -> None:
-        """Clean up audio cache periodically."""
-        max_cache_size = AUDIO_CACHE_MAX_SIZE
-        cache_enabled = await self.mass.config.get_core_config_value(
-            self.domain, CONF_ALLOW_AUDIO_CACHE
-        )
-        if cache_enabled == "disabled":
-            max_cache_size = 0.001
-        await clean_old_files(self.audio_cache_dir, max_cache_size)
-        # reschedule self
-        self.mass.call_later(3600, self._clean_audio_cache)
index 52209c59de93d2c90219641b813504c6966eff8c..fe17fbb16cee4b5ae1132b9bb7cad19971ee6621 100644 (file)
@@ -69,6 +69,8 @@ LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio")
 HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
 HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
 
+SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")
+
 
 async def remove_file(file_path: str) -> None:
     """Remove file path (if it exists)."""
@@ -89,37 +91,33 @@ class StreamCache:
 
     async def create(self) -> None:
         """Create the cache file (if needed)."""
-        if (
-            self._cache_file is not None
-            and await asyncio.to_thread(os.path.exists, self._cache_file)
-            and self._first_part_received.is_set()
+        if self._cache_file is not None and await asyncio.to_thread(
+            os.path.exists, self._cache_file
         ):
             # cache file already exists
-            return
-        # use cache controller to store the translation of uri-->cache file
-        if (
-            stored_cache_file := await self.mass.cache.get(
-                self.streamdetails.uri, base_key="audiocache"
-            )
-        ) and await asyncio.to_thread(os.path.exists, stored_cache_file):
-            # cache file already exists in memory
-            self._cache_file = stored_cache_file
-            return
+            self.mass.cancel_timer(f"remove_cache_file_{self._cache_file}")
+            if self._first_part_received.is_set():
+                # cache file is ready, no action needed
+                return
         else:
             # create new cache file
             cache_id = shortuuid.random(30)
             self._cache_file = os.path.join(self.mass.streams.audio_cache_dir, cache_id)
-            await self.mass.cache.set(
-                self.streamdetails.uri,
-                self._cache_file,
-                base_key="audiocache",
-            )
         # start fetch task if its not already running
         if self._fetch_task is None:
             self._fetch_task = self.mass.create_task(self._create_cache_file())
         # wait until the first part of the file is received
         await self._first_part_received.wait()
 
+    def release(self) -> None:
+        """Release the cache file."""
+        self._subscribers -= 1
+        if self._subscribers <= 0:
+            # schedule removal of the cache file
+            self.mass.call_later(
+                1800, remove_file, self._cache_file, task_id=f"remove_cache_file_{self._cache_file}"
+            )
+
     async def get_audio_stream(self) -> str | AsyncGenerator[bytes, None]:
         """
         Get the cached audio stream.
@@ -128,6 +126,7 @@ class StreamCache:
         If the file is not yet ready, it will return an async generator that will
         stream the (intermediate) audio data from the cache file.
         """
+        self._subscribers += 1
 
         async def _stream_from_cache() -> AsyncGenerator[bytes, None]:
             chunksize = get_chunksize(self.streamdetails.audio_format, 1)
@@ -152,6 +151,7 @@ class StreamCache:
             if self._all_data_written.is_set():
                 # cache file is ready
                 return self._cache_file
+
         # cache file does not exist at all (or is still being written)
         await self.create()
         return _stream_from_cache()
@@ -596,6 +596,9 @@ async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails)
         return False
     if streamdetails.stream_type in (StreamType.ICY, StreamType.LOCAL_FILE, StreamType.UNKNOWN):
         return False
+    if streamdetails.stream_type == StreamType.LOCAL_FILE:
+        # no need to cache local files
+        return False
     allow_cache = mass.config.get_raw_core_config_value(
         "streams", CONF_ALLOW_AUDIO_CACHE, mass.streams.allow_cache_default
     )
@@ -612,23 +615,25 @@ async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails)
     if not streamdetails.duration:
         # we can't determine filesize without duration so play it safe and dont allow cache
         return False
+    estimated_filesize = get_chunksize(streamdetails.audio_format, streamdetails.duration)
     if streamdetails.stream_type == StreamType.MULTI_FILE:
         # prefer cache to speedup multi-file streams
-        # (if total filesize smaller than 5GB)
-        max_filesize = 5 * 1024 * 1024 * 1024
-        return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
-    if streamdetails.stream_type == StreamType.CUSTOM:
+        # (if total filesize smaller than 1.5GB)
+        max_filesize = 1.5 * 1024 * 1024 * 1024
+    elif streamdetails.stream_type == StreamType.CUSTOM:
         # prefer cache for custom streams (to speedup seeking)
-        # (if total filesize smaller than 500MB)
-        max_filesize = 500 * 1024 * 1024
+        max_filesize = 250 * 1024 * 1024  # 250MB
         return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
-    if streamdetails.stream_type == StreamType.HLS:
+    elif streamdetails.stream_type == StreamType.HLS:
         # prefer cache for HLS streams (to speedup seeking)
-        # (if total filesize smaller than 500MB)
-        max_filesize = 500 * 1024 * 1024
-        return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
-    # deny for all other stream types
-    return False
+        max_filesize = 250 * 1024 * 1024  # 250MB
+    elif streamdetails.provider in SLOW_PROVIDERS:
+        # prefer cache for slow providers
+        max_filesize = 250 * 1024 * 1024  # 250MB
+    else:
+        max_filesize = 25 * 1024 * 1024
+
+    return estimated_filesize < max_filesize
 
 
 async def get_media_stream(
@@ -817,6 +822,11 @@ async def get_media_stream(
         if finished and not streamdetails.seek_position and seconds_streamed:
             streamdetails.duration = seconds_streamed
 
+        # release cache if needed
+        if cache := streamdetails.cache:
+            cache = cast(StreamCache, streamdetails.cache)
+            cache.release()
+
         # parse loudnorm data if we have that collected (and enabled)
         if (
             (streamdetails.loudness is None or finished)
@@ -1499,6 +1509,11 @@ async def analyze_loudness(
                 streamdetails.uri,
                 loudness,
             )
+        finally:
+            # release cache if needed
+            if cache := streamdetails.cache:
+                cache = cast(StreamCache, streamdetails.cache)
+                cache.release()
 
 
 def _get_normalization_mode(