import shutil
import urllib.parse
from collections.abc import AsyncGenerator
-from typing import TYPE_CHECKING, Final
+from typing import TYPE_CHECKING
from aiofiles.os import wrap
from aiohttp import web
SILENCE_FILE,
VERBOSE_LOG_LEVEL,
)
-from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.helpers.audio import (
+ CACHE_FILES_IN_USE,
crossfade_pcm_parts,
get_chunksize,
get_media_stream,
get_silence,
get_stream_details,
)
+from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
from music_assistant.helpers.util import (
+ get_folder_size,
get_free_space,
+ get_free_space_percentage,
get_ip,
get_ips,
- has_enough_space,
select_free_port,
try_parse_bool,
)
isfile = wrap(os.path.isfile)
-AUDIO_CACHE_MAX_SIZE: Final[int] = 2 # 2gb
-
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
"""Parse PCM info from a codec/content_type string."""
FFMPEG_LOGGER.setLevel(self.logger.level)
# perform check for ffmpeg version
await check_ffmpeg_version()
- # note that on HAOS we run /tmp in tmpfs so we need to check if we're running
- # on a system that has enough space to store the audio cache in the tmpfs
- # if not, we choose another location
- if await get_free_space("/tmp") < AUDIO_CACHE_MAX_SIZE * 1.5: # noqa: S108
- self._audio_cache_dir = os.path.join(os.path.expanduser("~"), ".audio")
+ # select a folder to store temporary audio cache files
+ # note that on HAOS we run /tmp in tmpfs so we need
+ # to pick another temporary location which is not /tmp
+ # we prefer the root/user dir because on the docker install
+ # it will be cleaned up on a reboot
+ self._audio_cache_dir = os.path.join(os.path.expanduser("~"), ".audio")
if not await asyncio.to_thread(os.path.isdir, self._audio_cache_dir):
await asyncio.to_thread(os.makedirs, self._audio_cache_dir)
- self.allow_cache_default = (
- "auto"
- if await has_enough_space(self._audio_cache_dir, AUDIO_CACHE_MAX_SIZE * 1.5)
- else "disabled"
- )
+ # enable cache by default if we have enough free space only
+ disk_percentage_free = await get_free_space_percentage(self._audio_cache_dir)
+ self.allow_cache_default = "auto" if disk_percentage_free > 25 else "disabled"
+ # schedule cleanup of old audio cache files
+ await self._clean_audio_cache()
# start the webserver
self.publish_port = config.get_value(CONF_BIND_PORT)
self.publish_ip = config.get_value(CONF_PUBLISH_IP)
bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
channels=2,
)
+
+ async def _clean_audio_cache(self) -> None:
+ """Clean up audio cache periodically."""
+ free_space_in_cache_dir = await get_free_space(self._audio_cache_dir)
+ # calculate max cache size based on free space in cache dir
+ max_cache_size = min(15, free_space_in_cache_dir * 0.2)
+ cache_enabled = await self.mass.config.get_core_config_value(
+ self.domain, CONF_ALLOW_AUDIO_CACHE
+ )
+ if cache_enabled == "disabled":
+ max_cache_size = 0.001
+
+ def _clean_old_files(foldersize: float):
+ files: list[os.DirEntry] = [x for x in os.scandir(self._audio_cache_dir) if x.is_file()]
+ files.sort(key=lambda x: x.stat().st_atime)
+ for _file in files:
+ if _file.path in CACHE_FILES_IN_USE:
+ continue
+ foldersize -= _file.stat().st_size / float(1 << 30)
+ os.remove(_file.path)
+ if foldersize < max_cache_size:
+ return
+
+ foldersize = await get_folder_size(self._audio_cache_dir)
+ if foldersize > max_cache_size:
+ await asyncio.to_thread(_clean_old_files, foldersize)
+ # reschedule self
+ self.mass.call_later(3600, self._clean_audio_cache)
import time
from collections.abc import AsyncGenerator
from io import BytesIO
-from typing import TYPE_CHECKING, cast
+from typing import TYPE_CHECKING, Final, cast
import aiofiles
import shortuuid
VERBOSE_LOG_LEVEL,
)
from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
-from music_assistant.helpers.util import clean_stream_title
+from music_assistant.helpers.util import clean_stream_title, remove_file
from .datetime import utc
from .dsp import filter_to_ffmpeg_params
SLOW_PROVIDERS = ("tidal", "ytmusic", "apple_music")
-
-async def remove_file(file_path: str) -> None:
- """Remove file path (if it exists)."""
- if not await asyncio.to_thread(os.path.exists, file_path):
- return
- await asyncio.to_thread(os.remove, file_path)
- LOGGER.log(VERBOSE_LOG_LEVEL, "Removed cache file: %s", file_path)
+CACHE_BASE_KEY: Final[str] = "audio_cache_path"
+CACHE_FILES_IN_USE: set[str] = set()
class StreamCache:
async def create(self) -> None:
"""Create the cache file (if needed)."""
- if self._cache_file is not None and await asyncio.to_thread(
- os.path.exists, self._cache_file
- ):
- # cache file already exists
- self.mass.cancel_timer(f"remove_cache_file_{self._cache_file}")
- if self._first_part_received.is_set():
- # cache file is ready, no action needed
- return
- else:
- # create new cache file
- cache_id = shortuuid.random(30)
- self._cache_file = os.path.join(self.mass.streams.audio_cache_dir, cache_id)
+ if self._cache_file is None:
+ if cached_cache_path := await self.mass.cache.get(
+ self.streamdetails.uri, base_key=CACHE_BASE_KEY
+ ):
+ # we have a mapping stored for this uri, prefer that
+ self._cache_file = cached_cache_path
+ if await asyncio.to_thread(os.path.exists, self._cache_file):
+ # cache file already exists from a previous session,
+ # we can simply use that, there is nothing to create
+ CACHE_FILES_IN_USE.add(self._cache_file)
+ return
+ else:
+ # create new cache file
+ cache_id = shortuuid.random(30)
+ self._cache_file = cache_file = os.path.join(
+ self.mass.streams.audio_cache_dir, cache_id
+ )
+ await self.mass.cache.set(
+ self.streamdetails.uri, cache_file, base_key=CACHE_BASE_KEY
+ )
+ # mark file as in-use to prevent it being deleted
+ CACHE_FILES_IN_USE.add(self._cache_file)
# start fetch task if its not already running
if self._fetch_task is None:
self._fetch_task = self.mass.create_task(self._create_cache_file())
"""Release the cache file."""
self._subscribers -= 1
if self._subscribers <= 0:
- # schedule removal of the cache file
- self.mass.call_later(
- 1800, remove_file, self._cache_file, task_id=f"remove_cache_file_{self._cache_file}"
- )
+ CACHE_FILES_IN_USE.discard(self._cache_file)
async def get_audio_stream(self) -> str | AsyncGenerator[bytes, None]:
"""
stream the (intermediate) audio data from the cache file.
"""
self._subscribers += 1
+ # mark file as in-use to prevent it being deleted
+ CACHE_FILES_IN_USE.add(self._cache_file)
async def _stream_from_cache() -> AsyncGenerator[bytes, None]:
chunksize = get_chunksize(self.streamdetails.audio_format, 1)
+ wait_loops = 0
async with aiofiles.open(self._cache_file, "rb") as file:
- while True:
+ while wait_loops < 2000:
chunk = await file.read(chunksize)
if chunk:
yield chunk
else:
# data is not yet available, wait a bit
await asyncio.sleep(0.05)
+ # prevent an infinite loop in case of an error
+ wait_loops += 1
if await asyncio.to_thread(os.path.exists, self._cache_file):
if self._fetch_task is None:
- # a complete cache file already exists on disk from a previous run
+ # a complete cache file already exists on disk (from a previous run)
return self._cache_file
if self._all_data_written.is_set():
- # cache file is ready
+ # cache file was created recently but ready
return self._cache_file
# cache file does not exist at all (or is still being written)
async def _create_cache_file(self) -> None:
time_start = time.time()
self.logger.debug("Creating audio cache for %s", self.streamdetails.uri)
+ CACHE_FILES_IN_USE.add(self._cache_file)
+ self._first_part_received.clear()
+ self._all_data_written.clear()
extra_input_args = self.org_extra_input_args or []
if self.org_stream_type == StreamType.CUSTOM:
audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream(
# and it also accounts for complicated cases such as encrypted streams or
# m4a/mp4 streams with the moov atom at the end of the file.
# ffmpeg will produce a lossless copy of the original codec.
- self._first_part_received.clear()
- self._all_data_written.clear()
try:
ffmpeg_proc = FFMpeg(
audio_input=audio_source,
if await asyncio.to_thread(os.path.getsize, self._cache_file) > 64000:
break
- self._first_part_received.set()
- self.logger.debug(
- "First part received for %s after %.2fs",
- self.streamdetails.uri,
- time.time() - time_start,
- )
+ # set 'first part received' event to signal that the first part of the file is ready
+ # this is useful for the get_audio_stream method to know when it can start streaming
+ # we do guard for the returncode here, because if ffmpeg exited abnormally, we should
+ # not signal that the first part is ready
+ if ffmpeg_proc.returncode in (None, 0):
+ self._first_part_received.set()
+ self.logger.debug(
+ "First part received for %s after %.2fs",
+ self.streamdetails.uri,
+ time.time() - time_start,
+ )
# wait until ffmpeg is done
await ffmpeg_proc.wait()
+ # raise an error if ffmpeg exited with a non-zero code
if ffmpeg_proc.returncode != 0:
ffmpeg_proc.logger.warning("\n".join(ffmpeg_proc.log_history))
raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}")
+ # set 'all data written' event to signal that the entire file is ready
self._all_data_written.set()
self.logger.debug(
"Writing all data for %s done in %.2fs",
)
except Exception as err:
self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err)
- # remove the cache file
- await remove_file(self._cache_file)
+ # make sure that the (corrupted/incomplete) cache file is removed
+ await self._remove_cache_file()
finally:
await ffmpeg_proc.close()
+ async def _remove_cache_file(self) -> None:
+ self._first_part_received.clear()
+ self._all_data_written.clear()
+ self._fetch_task = None
+ await remove_file(self._cache_file)
+
def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None:
"""Initialize the StreamCache."""
self.mass = mass
)
if allow_cache == "disabled":
return False
- if not await has_enough_space(mass.streams.audio_cache_dir, 0.5):
+ if not await has_enough_space(mass.streams.audio_cache_dir, 5):
return False
if allow_cache == "always":
return True
estimated_filesize = get_chunksize(streamdetails.audio_format, streamdetails.duration)
if streamdetails.stream_type == StreamType.MULTI_FILE:
# prefer cache to speedup multi-file streams
- # (if total filesize smaller than 1.5GB)
- max_filesize = 1.5 * 1024 * 1024 * 1024
+ # (if total filesize smaller than 2GB)
+ max_filesize = 2 * 1024 * 1024 * 1024
elif streamdetails.stream_type == StreamType.CUSTOM:
# prefer cache for custom streams (to speedup seeking)
max_filesize = 250 * 1024 * 1024 # 250MB
max_filesize = 250 * 1024 * 1024 # 250MB
elif streamdetails.provider in SLOW_PROVIDERS:
# prefer cache for slow providers
- max_filesize = 250 * 1024 * 1024 # 250MB
+ max_filesize = 500 * 1024 * 1024 # 500MB
else:
max_filesize = 25 * 1024 * 1024
import ifaddr
from zeroconf import IPVersion
+from music_assistant.constants import VERBOSE_LOG_LEVEL
from music_assistant.helpers.process import check_output
if TYPE_CHECKING:
return await asyncio.to_thread(_get_folder_size, folderpath)
-async def clean_old_files(folderpath: str, max_size: float) -> None:
- """Clean old files in folder to make room for new files."""
- foldersize = await get_folder_size(folderpath)
- if foldersize < max_size:
- return
-
- def _clean_old_files(foldersize: float):
- files: list[os.DirEntry] = [x for x in os.scandir(folderpath) if x.is_file()]
- files.sort(key=lambda x: x.stat().st_mtime)
- for _file in files:
- foldersize -= _file.stat().st_size / float(1 << 30)
- os.remove(_file.path)
- if foldersize < max_size:
- return
-
- await asyncio.to_thread(_clean_old_files, foldersize)
-
-
def get_changed_keys(
dict1: dict[str, Any],
dict2: dict[str, Any],
return await asyncio.to_thread(_get_free_space, folder)
+async def get_free_space_percentage(folder: str) -> float:
+ """Return free space on given folderpath in percentage."""
+
+ def _get_free_space(folder: str) -> float:
+ """Return free space on given folderpath in GB."""
+ try:
+ if res := shutil.disk_usage(folder):
+ return res.free / res.total * 100
+ except (FileNotFoundError, OSError, PermissionError):
+ return 0.0
+
+ return await asyncio.to_thread(_get_free_space, folder)
+
+
async def has_enough_space(folder: str, size: int) -> bool:
"""Check if folder has enough free space."""
return await get_free_space(folder) > size
yield data[i : i + chunk_size]
+async def remove_file(file_path: str) -> None:
+ """Remove file path (if it exists)."""
+ if not await asyncio.to_thread(os.path.exists, file_path):
+ return
+ await asyncio.to_thread(os.remove, file_path)
+ LOGGER.log(VERBOSE_LOG_LEVEL, "Removed file: %s", file_path)
+
+
def get_primary_ip_address_from_zeroconf(discovery_info: AsyncServiceInfo) -> str | None:
"""Get primary IP address from zeroconf discovery info."""
for address in discovery_info.parsed_addresses(IPVersion.V4Only):