HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"}
HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"}
+SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")
+
async def remove_file(file_path: str) -> None:
"""Remove file path (if it exists)."""
async def create(self) -> None:
"""Create the cache file (if needed)."""
- if (
- self._cache_file is not None
- and await asyncio.to_thread(os.path.exists, self._cache_file)
- and self._first_part_received.is_set()
+ if self._cache_file is not None and await asyncio.to_thread(
+ os.path.exists, self._cache_file
):
# cache file already exists
- return
- # use cache controller to store the translation of uri-->cache file
- if (
- stored_cache_file := await self.mass.cache.get(
- self.streamdetails.uri, base_key="audiocache"
- )
- ) and await asyncio.to_thread(os.path.exists, stored_cache_file):
- # cache file already exists in memory
- self._cache_file = stored_cache_file
- return
+ self.mass.cancel_timer(f"remove_cache_file_{self._cache_file}")
+ if self._first_part_received.is_set():
+ # cache file is ready, no action needed
+ return
else:
# create new cache file
cache_id = shortuuid.random(30)
self._cache_file = os.path.join(self.mass.streams.audio_cache_dir, cache_id)
- await self.mass.cache.set(
- self.streamdetails.uri,
- self._cache_file,
- base_key="audiocache",
- )
# start fetch task if its not already running
if self._fetch_task is None:
self._fetch_task = self.mass.create_task(self._create_cache_file())
# wait until the first part of the file is received
await self._first_part_received.wait()
+ def release(self) -> None:
+ """Release the cache file."""
+ self._subscribers -= 1
+ if self._subscribers <= 0:
+ # schedule removal of the cache file
+ self.mass.call_later(
+ 1800, remove_file, self._cache_file, task_id=f"remove_cache_file_{self._cache_file}"
+ )
+
async def get_audio_stream(self) -> str | AsyncGenerator[bytes, None]:
"""
Get the cached audio stream.
If the file is not yet ready, it will return an async generator that will
stream the (intermediate) audio data from the cache file.
"""
+ self._subscribers += 1
async def _stream_from_cache() -> AsyncGenerator[bytes, None]:
chunksize = get_chunksize(self.streamdetails.audio_format, 1)
if self._all_data_written.is_set():
# cache file is ready
return self._cache_file
+
# cache file does not exist at all (or is still being written)
await self.create()
return _stream_from_cache()
return False
if streamdetails.stream_type in (StreamType.ICY, StreamType.LOCAL_FILE, StreamType.UNKNOWN):
return False
+ if streamdetails.stream_type == StreamType.LOCAL_FILE:
+ # no need to cache local files
+ return False
allow_cache = mass.config.get_raw_core_config_value(
"streams", CONF_ALLOW_AUDIO_CACHE, mass.streams.allow_cache_default
)
if not streamdetails.duration:
# we can't determine filesize without duration so play it safe and dont allow cache
return False
+ estimated_filesize = get_chunksize(streamdetails.audio_format, streamdetails.duration)
if streamdetails.stream_type == StreamType.MULTI_FILE:
# prefer cache to speedup multi-file streams
- # (if total filesize smaller than 5GB)
- max_filesize = 5 * 1024 * 1024 * 1024
- return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
- if streamdetails.stream_type == StreamType.CUSTOM:
+ # (if total filesize smaller than 1.5GB)
+ max_filesize = 1.5 * 1024 * 1024 * 1024
+ elif streamdetails.stream_type == StreamType.CUSTOM:
# prefer cache for custom streams (to speedup seeking)
- # (if total filesize smaller than 500MB)
- max_filesize = 500 * 1024 * 1024
+ max_filesize = 250 * 1024 * 1024 # 250MB
return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
- if streamdetails.stream_type == StreamType.HLS:
+ elif streamdetails.stream_type == StreamType.HLS:
# prefer cache for HLS streams (to speedup seeking)
- # (if total filesize smaller than 500MB)
- max_filesize = 500 * 1024 * 1024
- return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize
- # deny for all other stream types
- return False
+ max_filesize = 250 * 1024 * 1024 # 250MB
+ elif streamdetails.provider in SLOW_PROVIDERS:
+ # prefer cache for slow providers
+ max_filesize = 250 * 1024 * 1024 # 250MB
+ else:
+ max_filesize = 25 * 1024 * 1024
+
+ return estimated_filesize < max_filesize
async def get_media_stream(
if finished and not streamdetails.seek_position and seconds_streamed:
streamdetails.duration = seconds_streamed
+ # release cache if needed
+ if cache := streamdetails.cache:
+ cache = cast(StreamCache, streamdetails.cache)
+ cache.release()
+
# parse loudnorm data if we have that collected (and enabled)
if (
(streamdetails.loudness is None or finished)
streamdetails.uri,
loudness,
)
+ finally:
+ # release cache if needed
+ if cache := streamdetails.cache:
+ cache = cast(StreamCache, streamdetails.cache)
+ cache.release()
def _get_normalization_mode(