From 9f0011071a4848fef12c55191aee18ec95acafbc Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Thu, 27 Mar 2025 00:31:29 +0100 Subject: [PATCH] Finalize cache implementation latest tweaks to caching of audio: - ensure local file and not tmpfs - check disk percentage free - prevent file deletion when still in use - some more failsafes --- music_assistant/controllers/streams.py | 59 ++++++++++---- music_assistant/helpers/audio.py | 105 +++++++++++++++---------- music_assistant/helpers/process.py | 2 + music_assistant/helpers/util.py | 41 +++++----- 4 files changed, 132 insertions(+), 75 deletions(-) diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index dd992f29..28455111 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -13,7 +13,7 @@ import os import shutil import urllib.parse from collections.abc import AsyncGenerator -from typing import TYPE_CHECKING, Final +from typing import TYPE_CHECKING from aiofiles.os import wrap from aiohttp import web @@ -52,8 +52,8 @@ from music_assistant.constants import ( SILENCE_FILE, VERBOSE_LOG_LEVEL, ) -from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.helpers.audio import ( + CACHE_FILES_IN_USE, crossfade_pcm_parts, get_chunksize, get_media_stream, @@ -61,13 +61,15 @@ from music_assistant.helpers.audio import ( get_silence, get_stream_details, ) +from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream from music_assistant.helpers.util import ( + get_folder_size, get_free_space, + get_free_space_percentage, get_ip, get_ips, - has_enough_space, select_free_port, try_parse_bool, ) @@ -84,8 +86,6 @@ if TYPE_CHECKING: isfile = wrap(os.path.isfile) -AUDIO_CACHE_MAX_SIZE: Final[int] = 2 # 2gb - def parse_pcm_info(content_type: str) -> tuple[int, int, int]: """Parse PCM info from a codec/content_type string.""" @@ -250,18 +250,19 @@ class StreamsController(CoreController): FFMPEG_LOGGER.setLevel(self.logger.level) # perform check for ffmpeg version await check_ffmpeg_version() - # note that on HAOS we run /tmp in tmpfs so we need to check if we're running - # on a system that has enough space to store the audio cache in the tmpfs - # if not, we choose another location - if await get_free_space("/tmp") < AUDIO_CACHE_MAX_SIZE * 1.5: # noqa: S108 - self._audio_cache_dir = os.path.join(os.path.expanduser("~"), ".audio") + # select a folder to store temporary audio cache files + # note that on HAOS we run /tmp in tmpfs so we need + # to pick another temporary location which is not /tmp + # we prefer the root/user dir because on the docker install + # it will be cleaned up on a reboot + self._audio_cache_dir = os.path.join(os.path.expanduser("~"), ".audio") if not await asyncio.to_thread(os.path.isdir, self._audio_cache_dir): await asyncio.to_thread(os.makedirs, self._audio_cache_dir) - self.allow_cache_default = ( - "auto" - if await has_enough_space(self._audio_cache_dir, AUDIO_CACHE_MAX_SIZE * 1.5) - else "disabled" - ) + # enable cache by default if we have enough free space only + disk_percentage_free = await get_free_space_percentage(self._audio_cache_dir) + self.allow_cache_default = "auto" if disk_percentage_free > 25 else "disabled" + # schedule cleanup of old audio cache files + await self._clean_audio_cache() # start the webserver self.publish_port = config.get_value(CONF_BIND_PORT) self.publish_ip = config.get_value(CONF_PUBLISH_IP) @@ -1097,3 +1098,31 @@ class StreamsController(CoreController): bit_depth=DEFAULT_PCM_FORMAT.bit_depth, channels=2, ) + + async def _clean_audio_cache(self) -> None: + """Clean up audio cache periodically.""" + free_space_in_cache_dir = await get_free_space(self._audio_cache_dir) + # calculate max cache size based on free space in cache dir + max_cache_size = min(15, free_space_in_cache_dir * 0.2) + cache_enabled = await self.mass.config.get_core_config_value( + self.domain, CONF_ALLOW_AUDIO_CACHE + ) + if cache_enabled == "disabled": + max_cache_size = 0.001 + + def _clean_old_files(foldersize: float): + files: list[os.DirEntry] = [x for x in os.scandir(self._audio_cache_dir) if x.is_file()] + files.sort(key=lambda x: x.stat().st_atime) + for _file in files: + if _file.path in CACHE_FILES_IN_USE: + continue + foldersize -= _file.stat().st_size / float(1 << 30) + os.remove(_file.path) + if foldersize < max_cache_size: + return + + foldersize = await get_folder_size(self._audio_cache_dir) + if foldersize > max_cache_size: + await asyncio.to_thread(_clean_old_files, foldersize) + # reschedule self + self.mass.call_later(3600, self._clean_audio_cache) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index fe17fbb1..4df5a00a 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -10,7 +10,7 @@ import struct import time from collections.abc import AsyncGenerator from io import BytesIO -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Final, cast import aiofiles import shortuuid @@ -45,7 +45,7 @@ from music_assistant.constants import ( VERBOSE_LOG_LEVEL, ) from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads -from music_assistant.helpers.util import clean_stream_title +from music_assistant.helpers.util import clean_stream_title, remove_file from .datetime import utc from .dsp import filter_to_ffmpeg_params @@ -71,13 +71,8 @@ HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"} SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music") - -async def remove_file(file_path: str) -> None: - """Remove file path (if it exists).""" - if not await asyncio.to_thread(os.path.exists, file_path): - return - await asyncio.to_thread(os.remove, file_path) - LOGGER.log(VERBOSE_LOG_LEVEL, "Removed cache file: %s", file_path) +CACHE_BASE_KEY: Final[str] = "audio_cache_path" +CACHE_FILES_IN_USE: set[str] = set() class StreamCache: @@ -91,18 +86,28 @@ class StreamCache: async def create(self) -> None: """Create the cache file (if needed).""" - if self._cache_file is not None and await asyncio.to_thread( - os.path.exists, self._cache_file - ): - # cache file already exists - self.mass.cancel_timer(f"remove_cache_file_{self._cache_file}") - if self._first_part_received.is_set(): - # cache file is ready, no action needed - return - else: - # create new cache file - cache_id = shortuuid.random(30) - self._cache_file = os.path.join(self.mass.streams.audio_cache_dir, cache_id) + if self._cache_file is None: + if cached_cache_path := await self.mass.cache.get( + self.streamdetails.uri, base_key=CACHE_BASE_KEY + ): + # we have a mapping stored for this uri, prefer that + self._cache_file = cached_cache_path + if await asyncio.to_thread(os.path.exists, self._cache_file): + # cache file already exists from a previous session, + # we can simply use that, there is nothing to create + CACHE_FILES_IN_USE.add(self._cache_file) + return + else: + # create new cache file + cache_id = shortuuid.random(30) + self._cache_file = cache_file = os.path.join( + self.mass.streams.audio_cache_dir, cache_id + ) + await self.mass.cache.set( + self.streamdetails.uri, cache_file, base_key=CACHE_BASE_KEY + ) + # mark file as in-use to prevent it being deleted + CACHE_FILES_IN_USE.add(self._cache_file) # start fetch task if its not already running if self._fetch_task is None: self._fetch_task = self.mass.create_task(self._create_cache_file()) @@ -113,10 +118,7 @@ class StreamCache: """Release the cache file.""" self._subscribers -= 1 if self._subscribers <= 0: - # schedule removal of the cache file - self.mass.call_later( - 1800, remove_file, self._cache_file, task_id=f"remove_cache_file_{self._cache_file}" - ) + CACHE_FILES_IN_USE.discard(self._cache_file) async def get_audio_stream(self) -> str | AsyncGenerator[bytes, None]: """ @@ -127,11 +129,14 @@ class StreamCache: stream the (intermediate) audio data from the cache file. """ self._subscribers += 1 + # mark file as in-use to prevent it being deleted + CACHE_FILES_IN_USE.add(self._cache_file) async def _stream_from_cache() -> AsyncGenerator[bytes, None]: chunksize = get_chunksize(self.streamdetails.audio_format, 1) + wait_loops = 0 async with aiofiles.open(self._cache_file, "rb") as file: - while True: + while wait_loops < 2000: chunk = await file.read(chunksize) if chunk: yield chunk @@ -143,13 +148,15 @@ class StreamCache: else: # data is not yet available, wait a bit await asyncio.sleep(0.05) + # prevent an infinite loop in case of an error + wait_loops += 1 if await asyncio.to_thread(os.path.exists, self._cache_file): if self._fetch_task is None: - # a complete cache file already exists on disk from a previous run + # a complete cache file already exists on disk (from a previous run) return self._cache_file if self._all_data_written.is_set(): - # cache file is ready + # cache file was created recently but ready return self._cache_file # cache file does not exist at all (or is still being written) @@ -159,6 +166,9 @@ class StreamCache: async def _create_cache_file(self) -> None: time_start = time.time() self.logger.debug("Creating audio cache for %s", self.streamdetails.uri) + CACHE_FILES_IN_USE.add(self._cache_file) + self._first_part_received.clear() + self._all_data_written.clear() extra_input_args = self.org_extra_input_args or [] if self.org_stream_type == StreamType.CUSTOM: audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream( @@ -185,8 +195,6 @@ class StreamCache: # and it also accounts for complicated cases such as encrypted streams or # m4a/mp4 streams with the moov atom at the end of the file. # ffmpeg will produce a lossless copy of the original codec. - self._first_part_received.clear() - self._all_data_written.clear() try: ffmpeg_proc = FFMpeg( audio_input=audio_source, @@ -205,19 +213,26 @@ class StreamCache: if await asyncio.to_thread(os.path.getsize, self._cache_file) > 64000: break - self._first_part_received.set() - self.logger.debug( - "First part received for %s after %.2fs", - self.streamdetails.uri, - time.time() - time_start, - ) + # set 'first part received' event to signal that the first part of the file is ready + # this is useful for the get_audio_stream method to know when it can start streaming + # we do guard for the returncode here, because if ffmpeg exited abnormally, we should + # not signal that the first part is ready + if ffmpeg_proc.returncode in (None, 0): + self._first_part_received.set() + self.logger.debug( + "First part received for %s after %.2fs", + self.streamdetails.uri, + time.time() - time_start, + ) # wait until ffmpeg is done await ffmpeg_proc.wait() + # raise an error if ffmpeg exited with a non-zero code if ffmpeg_proc.returncode != 0: ffmpeg_proc.logger.warning("\n".join(ffmpeg_proc.log_history)) raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}") + # set 'all data written' event to signal that the entire file is ready self._all_data_written.set() self.logger.debug( "Writing all data for %s done in %.2fs", @@ -226,11 +241,17 @@ class StreamCache: ) except Exception as err: self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err) - # remove the cache file - await remove_file(self._cache_file) + # make sure that the (corrupted/incomplete) cache file is removed + await self._remove_cache_file() finally: await ffmpeg_proc.close() + async def _remove_cache_file(self) -> None: + self._first_part_received.clear() + self._all_data_written.clear() + self._fetch_task = None + await remove_file(self._cache_file) + def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None: """Initialize the StreamCache.""" self.mass = mass @@ -604,7 +625,7 @@ async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails) ) if allow_cache == "disabled": return False - if not await has_enough_space(mass.streams.audio_cache_dir, 0.5): + if not await has_enough_space(mass.streams.audio_cache_dir, 5): return False if allow_cache == "always": return True @@ -618,8 +639,8 @@ async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails) estimated_filesize = get_chunksize(streamdetails.audio_format, streamdetails.duration) if streamdetails.stream_type == StreamType.MULTI_FILE: # prefer cache to speedup multi-file streams - # (if total filesize smaller than 1.5GB) - max_filesize = 1.5 * 1024 * 1024 * 1024 + # (if total filesize smaller than 2GB) + max_filesize = 2 * 1024 * 1024 * 1024 elif streamdetails.stream_type == StreamType.CUSTOM: # prefer cache for custom streams (to speedup seeking) max_filesize = 250 * 1024 * 1024 # 250MB @@ -629,7 +650,7 @@ async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails) max_filesize = 250 * 1024 * 1024 # 250MB elif streamdetails.provider in SLOW_PROVIDERS: # prefer cache for slow providers - max_filesize = 250 * 1024 * 1024 # 250MB + max_filesize = 500 * 1024 * 1024 # 500MB else: max_filesize = 25 * 1024 * 1024 diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index fc992335..eadae4ce 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -192,6 +192,8 @@ class AsyncProcess: async def close(self, send_signal: bool = False) -> None: """Close/terminate the process and wait for exit.""" self._close_called = True + if not self.proc: + return if send_signal and self.returncode is None: self.proc.send_signal(SIGINT) if self.proc.stdin and not self.proc.stdin.is_closing(): diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index a05796e5..f51d6bb0 100644 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -26,6 +26,7 @@ import cchardet as chardet import ifaddr from zeroconf import IPVersion +from music_assistant.constants import VERBOSE_LOG_LEVEL from music_assistant.helpers.process import check_output if TYPE_CHECKING: @@ -299,24 +300,6 @@ async def get_folder_size(folderpath: str) -> float: return await asyncio.to_thread(_get_folder_size, folderpath) -async def clean_old_files(folderpath: str, max_size: float) -> None: - """Clean old files in folder to make room for new files.""" - foldersize = await get_folder_size(folderpath) - if foldersize < max_size: - return - - def _clean_old_files(foldersize: float): - files: list[os.DirEntry] = [x for x in os.scandir(folderpath) if x.is_file()] - files.sort(key=lambda x: x.stat().st_mtime) - for _file in files: - foldersize -= _file.stat().st_size / float(1 << 30) - os.remove(_file.path) - if foldersize < max_size: - return - - await asyncio.to_thread(_clean_old_files, foldersize) - - def get_changed_keys( dict1: dict[str, Any], dict2: dict[str, Any], @@ -505,6 +488,20 @@ async def get_free_space(folder: str) -> float: return await asyncio.to_thread(_get_free_space, folder) +async def get_free_space_percentage(folder: str) -> float: + """Return free space on given folderpath in percentage.""" + + def _get_free_space(folder: str) -> float: + """Return free space on given folderpath in GB.""" + try: + if res := shutil.disk_usage(folder): + return res.free / res.total * 100 + except (FileNotFoundError, OSError, PermissionError): + return 0.0 + + return await asyncio.to_thread(_get_free_space, folder) + + async def has_enough_space(folder: str, size: int) -> bool: """Check if folder has enough free space.""" return await get_free_space(folder) > size @@ -516,6 +513,14 @@ def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]: yield data[i : i + chunk_size] +async def remove_file(file_path: str) -> None: + """Remove file path (if it exists).""" + if not await asyncio.to_thread(os.path.exists, file_path): + return + await asyncio.to_thread(os.remove, file_path) + LOGGER.log(VERBOSE_LOG_LEVEL, "Removed file: %s", file_path) + + def get_primary_ip_address_from_zeroconf(discovery_info: AsyncServiceInfo) -> str | None: """Get primary IP address from zeroconf discovery info.""" for address in discovery_info.parsed_addresses(IPVersion.V4Only): -- 2.34.1