From: Marcel van der Veldt Date: Tue, 11 Mar 2025 23:41:52 +0000 (+0100) Subject: Finalize stream caching + fix several bugs (#2029) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=61ecd80a70220b11383ca58c2c4b54c614d73daa;p=music-assistant-server.git Finalize stream caching + fix several bugs (#2029) --- diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 459c1e8c..fcb346bb 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -84,13 +84,15 @@ CONF_POWER_CONTROL: Final[str] = "power_control" CONF_VOLUME_CONTROL: Final[str] = "volume_control" CONF_MUTE_CONTROL: Final[str] = "mute_control" CONF_OUTPUT_CODEC: Final[str] = "output_codec" -CONF_ALLOW_MEMORY_CACHE: Final[str] = "allow_memory_cache" +CONF_ALLOW_AUDIO_CACHE: Final[str] = "allow_audio_cache" +CONF_AUDIO_CACHE_MAX_SIZE: Final[str] = "audio_cache_max_size" # config default values DEFAULT_HOST: Final[str] = "0.0.0.0" DEFAULT_PORT: Final[int] = 8095 -DEFAULT_ALLOW_MEMORY_CACHE: Final[bool] = True +DEFAULT_ALLOW_AUDIO_CACHE: Final[str] = "auto" +DEFAULT_AUDIO_CACHE_MAX_SIZE: Final[int] = 5 # 5gb # common db tables DB_TABLE_PLAYLOG: Final[str] = "playlog" diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index 3f3efa87..552299d9 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -30,7 +30,8 @@ from music_assistant_models.player_queue import PlayLogEntry from music_assistant.constants import ( ANNOUNCE_ALERT_FILE, - CONF_ALLOW_MEMORY_CACHE, + CONF_ALLOW_AUDIO_CACHE, + CONF_AUDIO_CACHE_MAX_SIZE, CONF_BIND_IP, CONF_BIND_PORT, CONF_CROSSFADE, @@ -45,7 +46,8 @@ from music_assistant.constants import ( CONF_VOLUME_NORMALIZATION_FIXED_GAIN_TRACKS, CONF_VOLUME_NORMALIZATION_RADIO, CONF_VOLUME_NORMALIZATION_TRACKS, - DEFAULT_ALLOW_MEMORY_CACHE, + DEFAULT_ALLOW_AUDIO_CACHE, + DEFAULT_AUDIO_CACHE_MAX_SIZE, DEFAULT_PCM_FORMAT, DEFAULT_STREAM_HEADERS, ICY_HEADERS, @@ -63,7 +65,13 @@ from music_assistant.helpers.audio import ( ) from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream -from music_assistant.helpers.util import get_ip, get_ips, select_free_port, try_parse_bool +from music_assistant.helpers.util import ( + clean_old_files, + get_ip, + get_ips, + select_free_port, + try_parse_bool, +) from music_assistant.helpers.webserver import Webserver from music_assistant.models.core_controller import CoreController from music_assistant.models.plugin import PluginProvider @@ -107,12 +115,21 @@ class StreamsController(CoreController): ) self.manifest.icon = "cast-audio" self.announcements: dict[str, str] = {} + # create cache dir if needed + self._audio_cache_dir = audio_cache_dir = os.path.join(self.mass.cache_path, ".audio") + if not os.path.isdir(audio_cache_dir): + os.makedirs(audio_cache_dir) @property def base_url(self) -> str: """Return the base_url for the streamserver.""" return self._server.base_url + @property + def audio_cache_dir(self) -> str: + """Return the directory where audio cache files are stored.""" + return self._audio_cache_dir + async def get_config_entries( self, action: str | None = None, @@ -198,16 +215,36 @@ class StreamsController(CoreController): required=False, ), ConfigEntry( - key=CONF_ALLOW_MEMORY_CACHE, - type=ConfigEntryType.BOOLEAN, - default_value=DEFAULT_ALLOW_MEMORY_CACHE, - label="Allow (in-memory) caching of audio streams", - description="To ensure smooth playback as well as fast seeking, " - "Music Assistant by default caches audio streams (in memory). " - "On systems with limited memory, this can be disabled, " - "but may result in less smooth playback.", + key=CONF_ALLOW_AUDIO_CACHE, + type=ConfigEntryType.STRING, + default_value=DEFAULT_ALLOW_AUDIO_CACHE, + options=[ + ConfigValueOption("Always", "always"), + ConfigValueOption("Disabled", "disabled"), + ConfigValueOption("Auto", "auto"), + ], + label="Allow caching of remote/cloudbased audio streams", + description="To ensure smooth(er) playback as well as fast seeking, " + "Music Assistant can cache audio streams on disk. \n" + "On systems with limited diskspace, this can be disabled, " + "but may result in less smooth playback or slower seeking.\n\n" + "**Always:** Enforce caching of audio streams at all times " + "(as long as there is enough free space)." + "**Disabled:** Never cache audio streams.\n" + "**Auto:** Let Music Assistant decide if caching " + "should be used on a per-item base.", + category="advanced", + required=True, + ), + ConfigEntry( + key=CONF_AUDIO_CACHE_MAX_SIZE, + type=ConfigEntryType.INTEGER, + default_value=DEFAULT_AUDIO_CACHE_MAX_SIZE, + label="Maximum size of audio cache", + description="The maximum amount of diskspace (in GB) " + "the audio cache may consume (if enabled).", + range=(1, 50), category="advanced", - required=False, ), ) @@ -218,6 +255,7 @@ class StreamsController(CoreController): FFMPEG_LOGGER.setLevel(self.logger.level) # perform check for ffmpeg version await check_ffmpeg_version() + await self._clean_audio_cache() # start the webserver self.publish_port = config.get_value(CONF_BIND_PORT) self.publish_ip = config.get_value(CONF_PUBLISH_IP) @@ -1049,3 +1087,17 @@ class StreamsController(CoreController): bit_depth=DEFAULT_PCM_FORMAT.bit_depth, channels=2, ) + + async def _clean_audio_cache(self) -> None: + """Clean up audio cache periodically.""" + max_cache_size = await self.mass.config.get_core_config_value( + self.domain, CONF_AUDIO_CACHE_MAX_SIZE + ) + cache_enabled = await self.mass.config.get_core_config_value( + self.domain, CONF_ALLOW_AUDIO_CACHE + ) + if cache_enabled == "disabled": + max_cache_size = 0.001 + await clean_old_files(self.audio_cache_dir, max_cache_size) + # reschedule self + self.mass.call_later(3600, self._clean_audio_cache) diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index a2542012..3be9304d 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -34,14 +34,14 @@ from music_assistant_models.errors import ( from music_assistant_models.streamdetails import AudioFormat from music_assistant.constants import ( - CONF_ALLOW_MEMORY_CACHE, + CONF_ALLOW_AUDIO_CACHE, CONF_ENTRY_OUTPUT_LIMITER, CONF_OUTPUT_CHANNELS, CONF_VOLUME_NORMALIZATION, CONF_VOLUME_NORMALIZATION_RADIO, CONF_VOLUME_NORMALIZATION_TARGET, CONF_VOLUME_NORMALIZATION_TRACKS, - DEFAULT_ALLOW_MEMORY_CACHE, + DEFAULT_ALLOW_AUDIO_CACHE, MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL, ) @@ -53,7 +53,7 @@ from .dsp import filter_to_ffmpeg_params from .ffmpeg import FFMpeg, get_ffmpeg_stream from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u from .process import AsyncProcess, communicate -from .util import detect_charset +from .util import detect_charset, has_enough_space if TYPE_CHECKING: from music_assistant_models.config_entries import CoreConfig, PlayerConfig @@ -70,6 +70,8 @@ LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio") HTTP_HEADERS = {"User-Agent": "Lavf/60.16.100.MusicAssistant"} HTTP_HEADERS_ICY = {**HTTP_HEADERS, "Icy-MetaData": "1"} +REQUIRED_FREE_CACHE_SPACE = 5 # 5 GB + async def remove_file(file_path: str) -> None: """Remove file path (if it exists).""" @@ -83,143 +85,182 @@ class StreamCache: """ StreamCache. - Basic class to handle (temporary) in-memory caching of audio streams. + Basic class to handle caching of audio streams to a (semi) temporary file. Useful in case of slow or unreliable network connections, faster seeking, or when the audio stream is slow itself. """ async def create(self) -> None: """Create the cache file (if needed).""" - self.mass.cancel_timer(f"clear_cache_{self.cache_id}") + if ( + self._cache_file is not None + and await asyncio.to_thread(os.path.exists, self._cache_file) + and self._first_part_received.is_set() + ): + # cache file already exists + return + # use cache controller to store the translation of uri-->cache file + if stored_cache_file := await self.mass.cache.get( + self.streamdetails.uri, base_key="audiocache" + ): + # cache file already exists in memory + self._cache_file = stored_cache_file + if await asyncio.to_thread(os.path.exists, self._cache_file): + # cache file already exists + return + else: + # create new cache file + cache_id = shortuuid.random(30) + self._cache_file = os.path.join(self.mass.streams.audio_cache_dir, cache_id) + await self.mass.cache.set( + self.streamdetails.uri, + self._cache_file, + base_key="audiocache", + ) + # start fetch task if its not already running if self._fetch_task is None: - self._fetch_task = self.mass.create_task(self._fill_cache()) + self._fetch_task = self.mass.create_task(self._create_cache_file()) # wait until the first part of the file is received await self._first_part_received.wait() - async def get_audio_stream(self) -> AsyncGenerator[bytes, None]: - """Stream audio from cachedata (while it might even still being written).""" - try: - self._subscribers += 1 - bytes_read = 0 - chunksize = 64000 - await self.create() - while True: - async with self._lock: - chunk = self._data[bytes_read : bytes_read + chunksize] - bytes_read += len(chunk) - if len(chunk) < chunksize and self._all_data_written.is_set(): - # reached EOF - break - elif not chunk: - # data is not yet available, wait a bit - await asyncio.sleep(0.05) - else: - yield chunk - del chunk - await asyncio.sleep(0) # yield to eventloop - finally: - self._subscribers -= 1 - if self._subscribers == 0: - # set a timer to remove the tempfile after 1 minute - # if the file is accessed again within this period, - # the timer will be cancelled - self.mass.call_later(60, self._clear, task_id=f"clear_cache_{self.cache_id}") - - async def _fill_cache(self) -> None: + async def get_audio_stream(self) -> str | AsyncGenerator[bytes, None]: + """ + Get the cached audio stream. + + Returns a string with the path of the cachefile if the file is ready. + If the file is not yet ready, it will return an async generator that will + stream the (intermediate) audio data from the cache file. + """ + + async def _stream_from_cache() -> AsyncGenerator[bytes, None]: + chunksize = get_chunksize(self.streamdetails.audio_format, 1) + async with aiofiles.open(self._cache_file, "rb") as file: + while True: + chunk = await file.read(chunksize) + if chunk: + yield chunk + await asyncio.sleep(0) # yield to eventloop + del chunk + elif self._all_data_written.is_set(): + # reached EOF + break + else: + # data is not yet available, wait a bit + await asyncio.sleep(0.05) + + if await asyncio.to_thread(os.path.exists, self._cache_file): + if self._fetch_task is None: + # a complete cache file already exists on disk from a previous run + return self._cache_file + if self._all_data_written.is_set(): + # cache file is ready + return self._cache_file + # cache file does not exist at all (or is still being written) + await self.create() + return _stream_from_cache() + + async def _create_cache_file(self) -> None: time_start = time.time() - self.logger.debug("Fetching audio stream for %s", self.streamdetails.uri) + self.logger.debug("Creating audio cache for %s", self.streamdetails.uri) + extra_input_args = self.org_extra_input_args or [] if self.org_stream_type == StreamType.CUSTOM: audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream( self.streamdetails, ) - elif self.org_stream_type in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.HLS): + elif self.org_stream_type == StreamType.ICY: + raise NotImplementedError("Caching of this streamtype is not supported!") + elif self.org_stream_type == StreamType.HLS: + if self.streamdetails.media_type == MediaType.RADIO: + raise NotImplementedError("Caching of this streamtype is not supported!") + substream = await get_hls_substream(self.mass, self.org_path) + audio_source = substream.path + elif self.org_stream_type == StreamType.ENCRYPTED_HTTP: audio_source = self.org_path + extra_input_args += ["-decryption_key", self.streamdetails.decryption_key] + elif self.org_stream_type == StreamType.MULTI_FILE: + audio_source = get_multi_file_stream(self.mass, self.streamdetails) else: - raise NotImplementedError("Caching of this streamtype is not supported") - - extra_input_args = self.org_extra_input_args or [] - if self.streamdetails.decryption_key: - extra_input_args += [ - "-decryption_key", - self.streamdetails.decryption_key, - ] + audio_source = self.org_path - # we always use an intermediate ffmpeg process to fetch the original audio source + # we always use ffmpeg to fetch the original audio source # this may feel a bit redundant, but it's the most reliable way to fetch the audio # because ffmpeg has all logic to handle different audio formats, codecs, etc. # and it also accounts for complicated cases such as encrypted streams or # m4a/mp4 streams with the moov atom at the end of the file. - # ffmpeg will produce a lossless copy of the original codec to stdout. + # ffmpeg will produce a lossless copy of the original codec. self._first_part_received.clear() self._all_data_written.clear() - self._data = b"" - async for chunk in get_ffmpeg_stream( - audio_input=audio_source, - input_format=self.streamdetails.audio_format, - output_format=self.streamdetails.audio_format, - chunk_size=64000, - # apply readrate limiting to avoid buffering too much data too fast - # so we only allow reading into the cache max 5 times the normal speed - extra_input_args=["-readrate", "5", *extra_input_args], - ): - async with self._lock: - self._data += chunk - del chunk - await asyncio.sleep(0) # yield to eventloop - if not self._first_part_received.is_set(): - self._first_part_received.set() - self.logger.debug( - "First part received for %s after %.2fs", - self.streamdetails.uri, - time.time() - time_start, - ) - self._all_data_written.set() - self.logger.debug( - "Writing all data for %s done in %.2fs", - self.streamdetails.uri, - time.time() - time_start, - ) + try: + ffmpeg_proc = FFMpeg( + audio_input=audio_source, + input_format=self.org_audio_format, + output_format=self.streamdetails.audio_format, + extra_input_args=extra_input_args, + audio_output=self._cache_file, + collect_log_history=True, + ) + await ffmpeg_proc.start() + # wait until the first data is written to the cache file + while ffmpeg_proc.returncode is None: + await asyncio.sleep(0.1) + if not await asyncio.to_thread(os.path.exists, self._cache_file): + continue + if await asyncio.to_thread(os.path.getsize, self._cache_file) > 64000: + break + + self._first_part_received.set() + self.logger.debug( + "First part received for %s after %.2fs", + self.streamdetails.uri, + time.time() - time_start, + ) + # wait until ffmpeg is done + await ffmpeg_proc.wait() + + if ffmpeg_proc.returncode != 0: + ffmpeg_proc.logger.warning("\n".join(ffmpeg_proc.log_history)) + raise AudioError(f"FFMpeg error {ffmpeg_proc.returncode}") + + self._all_data_written.set() + self.logger.debug( + "Writing all data for %s done in %.2fs", + self.streamdetails.uri, + time.time() - time_start, + ) + except Exception as err: + self.logger.error("Error while creating cache for %s: %s", self.streamdetails.uri, err) + # remove the cache file + await remove_file(self._cache_file) + finally: + await ffmpeg_proc.close() def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None: """Initialize the StreamCache.""" self.mass = mass self.streamdetails = streamdetails - self.cache_id = shortuuid.random(20) self.logger = LOGGER.getChild("cache") + self._cache_file: str | None = None self._fetch_task: asyncio.Task | None = None self._subscribers: int = 0 self._first_part_received = asyncio.Event() self._all_data_written = asyncio.Event() - self._data: bytes = b"" - self._lock: asyncio.Lock = asyncio.Lock() self.org_path: str | None = streamdetails.path self.org_stream_type: StreamType | None = streamdetails.stream_type self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args + self.org_audio_format = streamdetails.audio_format + streamdetails.audio_format = AudioFormat( + content_type=ContentType.NUT, + codec_type=streamdetails.audio_format.codec_type, + sample_rate=streamdetails.audio_format.sample_rate, + bit_depth=streamdetails.audio_format.bit_depth, + channels=streamdetails.audio_format.channels, + ) streamdetails.path = "-" streamdetails.stream_type = StreamType.CACHE streamdetails.can_seek = True streamdetails.allow_seek = True streamdetails.extra_input_args = [] - async def _clear(self) -> None: - """Clear the cache.""" - self.logger.debug("Cleaning up cache %s", self.streamdetails.uri) - if self._fetch_task and not self._fetch_task.done(): - self._fetch_task.cancel() - self._fetch_task = None - self._first_part_received.clear() - self._all_data_written.clear() - del self._data - self._data = b"" - - def __del__(self) -> None: - """Ensure the cache data gets cleaned up.""" - if self.mass.closing: - # edge case: MA is closing - return - self.mass.cancel_timer(f"remove_file_{self.cache_id}") - del self._data - async def crossfade_pcm_parts( fade_in_part: bytes, @@ -527,24 +568,9 @@ async def get_stream_details( int((time.time() - time_start) * 1000), ) - if streamdetails.decryption_key: - # using intermediate cache is mandatory for encrypted streams - streamdetails.enable_cache = True - # determine if we may use caching for the audio stream if streamdetails.enable_cache is None: - allow_cache = mass.config.get_raw_core_config_value( - "streams", CONF_ALLOW_MEMORY_CACHE, DEFAULT_ALLOW_MEMORY_CACHE - ) - streamdetails.enable_cache = ( - allow_cache - and streamdetails.duration is not None - and streamdetails.media_type - in (MediaType.TRACK, MediaType.AUDIOBOOK, MediaType.PODCAST_EPISODE) - and streamdetails.stream_type - in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.CUSTOM, StreamType.HLS) - and get_chunksize(streamdetails.audio_format, streamdetails.duration) < 100000000 - ) + streamdetails.enable_cache = await _is_cache_allowed(mass, streamdetails) # handle temporary cache support of audio stream if streamdetails.enable_cache: @@ -563,6 +589,51 @@ async def get_stream_details( return streamdetails +async def _is_cache_allowed(mass: MusicAssistant, streamdetails: StreamDetails) -> bool: + """Check if caching is allowed for the given streamdetails.""" + if streamdetails.media_type not in ( + MediaType.TRACK, + MediaType.AUDIOBOOK, + MediaType.PODCAST_EPISODE, + ): + return False + if streamdetails.stream_type in (StreamType.ICY, StreamType.LOCAL_FILE, StreamType.UNKNOWN): + return False + allow_cache = mass.config.get_raw_core_config_value( + "streams", CONF_ALLOW_AUDIO_CACHE, DEFAULT_ALLOW_AUDIO_CACHE + ) + if allow_cache == "disabled": + return False + if not await has_enough_space(mass.streams.audio_cache_dir, REQUIRED_FREE_CACHE_SPACE): + return False + if allow_cache == "always": + return True + # auto mode + if streamdetails.stream_type == StreamType.ENCRYPTED_HTTP: + # always prefer cache for encrypted streams + return True + if not streamdetails.duration: + # we can't determine filesize without duration so play it safe and dont allow cache + return False + if streamdetails.stream_type == StreamType.MULTI_FILE: + # prefer cache to speedup multi-file streams + # (if total filesize smaller than 5GB) + max_filesize = 5 * 1024 * 1024 * 1024 + return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize + if streamdetails.stream_type == StreamType.CUSTOM: + # prefer cache for custom streams (to speedup seeking) + # (if total filesize smaller than 500MB) + max_filesize = 500 * 1024 * 1024 + return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize + if streamdetails.stream_type == StreamType.HLS: + # prefer cache for HLS streams (to speedup seeking) + # (if total filesize smaller than 500MB) + max_filesize = 500 * 1024 * 1024 + return get_chunksize(streamdetails.audio_format, streamdetails.duration) < max_filesize + # deny for all other stream types + return False + + async def get_media_stream( mass: MusicAssistant, streamdetails: StreamDetails, @@ -583,7 +654,9 @@ async def get_media_stream( stream_type = streamdetails.stream_type if stream_type == StreamType.CACHE: cache = cast(StreamCache, streamdetails.cache) - audio_source = cache.get_audio_stream() + audio_source = await cache.get_audio_stream() + elif stream_type == StreamType.MULTI_FILE: + audio_source = get_multi_file_stream(mass, streamdetails) elif stream_type == StreamType.CUSTOM: audio_source = mass.get_provider(streamdetails.provider).get_audio_stream( streamdetails, @@ -599,6 +672,9 @@ async def get_media_stream( # with ffmpeg, where they just stop after some minutes, # so we tell ffmpeg to loop around in this case. extra_input_args += ["-stream_loop", "-1", "-re"] + elif stream_type == StreamType.ENCRYPTED_HTTP: + audio_source = streamdetails.path + extra_input_args += ["-decryption_key", streamdetails.decryption_key] else: audio_source = streamdetails.path @@ -715,6 +791,8 @@ async def get_media_stream( if bytes_sent == 0: # edge case: no audio data was sent raise AudioError("No audio was received") + elif ffmpeg_proc.returncode not in (0, None): + raise AudioError(f"FFMpeg exited with code {ffmpeg_proc.returncode}") finished = True except (Exception, GeneratorExit) as err: if isinstance(err, asyncio.CancelledError | GeneratorExit): @@ -722,27 +800,21 @@ async def get_media_stream( cancelled = True raise logger.error("Error while streaming %s: %s", streamdetails.uri, err) + # dump the last 10 lines of the log in case of an unclean exit + logger.warning("\n".join(list(ffmpeg_proc.log_history)[-10:])) streamdetails.stream_error = True finally: # always ensure close is called which also handles all cleanup await ffmpeg_proc.close() - # try to determine how many seconds we've streamed seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 - if not cancelled and ffmpeg_proc.returncode not in (0, 255): - # dump the last 5 lines of the log in case of an unclean exit - log_tail = "\n" + "\n".join(list(ffmpeg_proc.log_history)[-5:]) - else: - log_tail = "" logger.debug( - "stream %s (with code %s) for %s - seconds streamed: %s %s", + "stream %s (with code %s) for %s - seconds streamed: %s", "cancelled" if cancelled else "finished" if finished else "aborted", ffmpeg_proc.returncode, streamdetails.uri, seconds_streamed, - log_tail, ) - streamdetails.seconds_streamed = seconds_streamed # store accurate duration if finished and not streamdetails.seek_position and seconds_streamed: @@ -1103,6 +1175,35 @@ async def get_file_stream( yield data +async def get_multi_file_stream( + mass: MusicAssistant, # noqa: ARG001 + streamdetails: StreamDetails, +) -> AsyncGenerator[bytes, None]: + """Return audio stream for a concatenation of multiple files.""" + files_list: list[str] = streamdetails.data + # concat input files + temp_file = f"/tmp/{shortuuid.random(20)}.txt" # noqa: S108 + async with aiofiles.open(temp_file, "w") as f: + for path in files_list: + await f.write(f"file '{path}'\n") + + try: + async for chunk in get_ffmpeg_stream( + audio_input=temp_file, + input_format=streamdetails.audio_format, + output_format=AudioFormat( + content_type=ContentType.NUT, + sample_rate=streamdetails.audio_format.sample_rate, + bit_depth=streamdetails.audio_format.bit_depth, + channels=streamdetails.audio_format.channels, + ), + extra_input_args=["-safe", "0", "-f", "concat", "-i", temp_file], + ): + yield chunk + finally: + await remove_file(temp_file) + + async def get_preview_stream( mass: MusicAssistant, provider_instance_id_or_domain: str, @@ -1176,7 +1277,7 @@ def get_chunksize( return pcm_size if fmt.content_type in (ContentType.WAV, ContentType.AIFF, ContentType.DSF): return pcm_size - if fmt.bit_rate: + if fmt.bit_rate and fmt.bit_rate < 10000: return int(((fmt.bit_rate * 1000) / 8) * seconds) if fmt.content_type in (ContentType.FLAC, ContentType.WAVPACK, ContentType.ALAC): # assume 74.7% compression ratio (level 0) @@ -1347,7 +1448,9 @@ async def analyze_loudness( ] if streamdetails.stream_type == StreamType.CACHE: cache = cast(StreamCache, streamdetails.cache) - audio_source = cache.get_audio_stream() + audio_source = await cache.get_audio_stream() + elif streamdetails.stream_type == StreamType.MULTI_FILE: + audio_source = get_multi_file_stream(mass, streamdetails) elif streamdetails.stream_type == StreamType.CUSTOM: audio_source = mass.get_provider(streamdetails.provider).get_audio_stream( streamdetails, diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index bdf2fd96..006d298e 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -60,10 +60,15 @@ class FFMpeg(AsyncProcess): self._stdin_task: asyncio.Task | None = None self._logger_task: asyncio.Task | None = None self._input_codec_parsed = False + if audio_input == "-" or isinstance(audio_input, AsyncGenerator): + stdin = True + else: + stdin = audio_input if isinstance(audio_input, int) else False + stdout = audio_output if isinstance(audio_output, int) else bool(audio_output == "-") super().__init__( ffmpeg_args, - stdin=True if isinstance(audio_input, str | AsyncGenerator) else audio_input, - stdout=True if isinstance(audio_output, str) else audio_output, + stdin=stdin, + stdout=stdout, stderr=True, ) self.logger = LOGGER @@ -190,11 +195,16 @@ async def get_ffmpeg_stream( filter_params=filter_params, extra_args=extra_args, extra_input_args=extra_input_args, + collect_log_history=True, ) as ffmpeg_proc: # read final chunks from stdout iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any() async for chunk in iterator: yield chunk + if ffmpeg_proc.returncode not in (None, 0): + # dump the last 5 lines of the log in case of an unclean exit + log_tail = "\n" + "\n".join(list(ffmpeg_proc.log_history)[-5:]) + ffmpeg_proc.logger.error(log_tail) def get_ffmpeg_args( # noqa: PLR0915 @@ -223,7 +233,6 @@ def get_ffmpeg_args( # noqa: PLR0915 ] # collect input args input_args = [] - if extra_input_args: input_args += extra_input_args if input_path.startswith("http"): @@ -263,6 +272,9 @@ def get_ffmpeg_args( # noqa: PLR0915 ] elif input_format.codec_type != ContentType.UNKNOWN: input_args += ["-acodec", input_format.codec_type.name.lower(), "-i", input_path] + elif "-f" in extra_input_args: + # input format is already specified in the extra input args + pass else: # let ffmpeg auto detect the content type from the metadata/headers input_args += ["-i", input_path] @@ -288,19 +300,8 @@ def get_ffmpeg_args( # noqa: PLR0915 "-f", output_format.content_type.value, ] - elif input_format == output_format and not filter_params and not extra_args: - # passthrough-mode (e.g. for creating the cache) - if output_format.content_type in ( - ContentType.MP4, - ContentType.MP4A, - ContentType.M4A, - ContentType.M4B, - ): - fmt = "adts" - elif output_format.codec_type in (ContentType.UNKNOWN, ContentType.OGG): - fmt = "nut" # use special nut container - else: - fmt = output_format.content_type.name.lower() + elif output_format.content_type == ContentType.NUT: + # passthrough-mode (for creating the cache) using NUT container output_args = [ "-vn", "-dn", @@ -308,7 +309,7 @@ def get_ffmpeg_args( # noqa: PLR0915 "-acodec", "copy", "-f", - fmt, + "nut", ] elif output_format.content_type == ContentType.AAC: output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k"] @@ -337,7 +338,6 @@ def get_ffmpeg_args( # noqa: PLR0915 "-compression_level", "0", ] - else: raise RuntimeError("Invalid/unsupported output format specified") diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index 534c7a27..25606e40 100644 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -286,14 +286,36 @@ async def get_ip_pton(ip_string: str | None = None) -> bytes: return await asyncio.to_thread(socket.inet_pton, socket.AF_INET6, ip_string) -def get_folder_size(folderpath: str) -> float: +async def get_folder_size(folderpath: str) -> float: """Return folder size in gb.""" - total_size = 0 - for dirpath, _dirnames, filenames in os.walk(folderpath): - for _file in filenames: - _fp = os.path.join(dirpath, _file) - total_size += os.path.getsize(_fp) - return total_size / float(1 << 30) + + def _get_folder_size(folderpath: str) -> float: + total_size = 0 + for dirpath, _dirnames, filenames in os.walk(folderpath): + for _file in filenames: + _fp = os.path.join(dirpath, _file) + total_size += os.path.getsize(_fp) + return total_size / float(1 << 30) + + return await asyncio.to_thread(_get_folder_size, folderpath) + + +async def clean_old_files(folderpath: str, max_size: float) -> None: + """Clean old files in folder to make room for new files.""" + foldersize = await get_folder_size(folderpath) + if foldersize < max_size: + return + + def _clean_old_files(foldersize: float): + files: list[os.DirEntry] = [x for x in os.scandir(folderpath) if x.is_file()] + files.sort(key=lambda x: x.stat().st_mtime) + for _file in files: + foldersize -= _file.stat().st_size / float(1 << 30) + os.remove(_file.path) + if foldersize < max_size: + return + + await asyncio.to_thread(_clean_old_files, foldersize) def get_changed_keys( @@ -465,13 +487,23 @@ async def has_tmpfs_mount() -> bool: return False -async def get_tmp_free_space() -> int: - """Return free space on tmp.""" +async def get_tmp_free_space() -> float: + """Return free space on tmp in GB's.""" + return await get_free_space("/tmp") # noqa: S108 + + +async def get_free_space(folder: str) -> float: + """Return free space on given folderpath in GB.""" try: - if res := await asyncio.to_thread(shutil.disk_usage, "/tmp"): # noqa: S108 - return res.free + if res := await asyncio.to_thread(shutil.disk_usage, folder): + return res.free / float(1 << 30) except (FileNotFoundError, OSError, PermissionError): - return 0 + return 0.0 + + +async def has_enough_space(folder: str, size: int) -> bool: + """Check if folder has enough free space.""" + return await get_free_space(folder) > size def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]: diff --git a/music_assistant/providers/audiobookshelf/__init__.py b/music_assistant/providers/audiobookshelf/__init__.py index 9f7de87b..09d7c93b 100644 --- a/music_assistant/providers/audiobookshelf/__init__.py +++ b/music_assistant/providers/audiobookshelf/__init__.py @@ -41,7 +41,6 @@ from music_assistant_models.media_items import ( ) from music_assistant_models.streamdetails import StreamDetails -from music_assistant.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.models.music_provider import MusicProvider from music_assistant.providers.audiobookshelf.parsers import ( parse_audiobook, @@ -448,47 +447,35 @@ class Audiobookshelf(MusicProvider): base_url = str(self.config.get_value(CONF_URL)) if len(tracks) == 0: raise MediaNotFoundError("Stream not found") + + content_type = ContentType.UNKNOWN + if abs_audiobook.media.tracks[0].metadata is not None: + content_type = ContentType.try_parse(abs_audiobook.media.tracks[0].metadata.ext) + if len(tracks) > 1: self.logger.debug("Using playback for multiple file audiobook.") - multiple_files = [] + multiple_files: list[str] = [] for track in tracks: - media_url = track.content_url - stream_url = f"{base_url}{media_url}?token={token}" - content_type = ContentType.UNKNOWN - if track.metadata is not None: - content_type = ContentType.try_parse(track.metadata.ext) - multiple_files.append( - (AudioFormat(content_type=content_type), stream_url, track.duration) - ) + stream_url = f"{base_url}{track.content_url}?token={token}" + multiple_files.append(stream_url) return StreamDetails( provider=self.instance_id, item_id=abs_audiobook.id_, - # for the concatanated stream, we need to use a pcm stream format - audio_format=AudioFormat( - content_type=ContentType.PCM_S16LE, - sample_rate=44100, - bit_depth=16, - channels=2, - ), + audio_format=AudioFormat(content_type=content_type), media_type=MediaType.AUDIOBOOK, - stream_type=StreamType.CUSTOM, + stream_type=StreamType.MULTI_FILE, duration=int(abs_audiobook.media.duration), data=multiple_files, allow_seek=True, - can_seek=True, ) self.logger.debug( f'Using direct playback for audiobook "{abs_audiobook.media.metadata.title}".' ) - - track = abs_audiobook.media.tracks[0] - media_url = track.content_url + media_url = abs_audiobook.media.tracks[0].content_url stream_url = f"{base_url}{media_url}?token={token}" - content_type = ContentType.UNKNOWN - if track.metadata is not None: - content_type = ContentType.try_parse(track.metadata.ext) + return StreamDetails( provider=self.lookup_key, item_id=abs_audiobook.id_, @@ -534,34 +521,6 @@ class Audiobookshelf(MusicProvider): allow_seek=True, ) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """ - Return the (custom) audio stream for the provider item. - - Only used for multi-file audiobooks. - """ - stream_data: list[tuple[AudioFormat, str, float]] = streamdetails.data - total_duration = 0.0 - for audio_format, chapter_file, chapter_duration in stream_data: - total_duration += chapter_duration - if total_duration < seek_position: - continue - seek_position_netto = round( - max(0, seek_position - (total_duration - chapter_duration)), 2 - ) - self.logger.debug(chapter_file) - async for chunk in get_ffmpeg_stream( - chapter_file, - input_format=audio_format, - # output format is always pcm because we are sending - # the result of multiple files as one big stream - output_format=streamdetails.audio_format, - extra_input_args=["-ss", str(seek_position_netto)] if seek_position_netto else [], - ): - yield chunk - async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]: """Return finished:bool, position_ms: int.""" progress: None | MediaProgress = None diff --git a/music_assistant/providers/filesystem_local/__init__.py b/music_assistant/providers/filesystem_local/__init__.py index f8dad59e..d4194b1c 100644 --- a/music_assistant/providers/filesystem_local/__init__.py +++ b/music_assistant/providers/filesystem_local/__init__.py @@ -59,7 +59,6 @@ from music_assistant.constants import ( VARIOUS_ARTISTS_NAME, ) from music_assistant.helpers.compare import compare_strings, create_safe_string -from music_assistant.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.helpers.json import json_loads from music_assistant.helpers.playlists import parse_m3u, parse_pls from music_assistant.helpers.tags import AudioTags, async_parse_tags, parse_tags, split_items @@ -847,35 +846,6 @@ class LocalFileSystemProvider(MusicProvider): return await self._get_stream_details_for_podcast_episode(item_id) return await self._get_stream_details_for_track(item_id) - async def get_audio_stream( - self, streamdetails: StreamDetails, seek_position: int = 0 - ) -> AsyncGenerator[bytes, None]: - """ - Return the (custom) audio stream for the provider item. - - Will only be called when the stream_type is set to CUSTOM, - currently only for multi-part audiobooks. - """ - stream_data: tuple[AudioFormat, list[tuple[str, float]]] = streamdetails.data - format_org, file_based_chapters = stream_data - total_duration = 0.0 - for chapter_file, chapter_duration in file_based_chapters: - total_duration += chapter_duration - if total_duration < seek_position: - continue - seek_position_netto = round( - max(0, seek_position - (total_duration - chapter_duration)), 2 - ) - async for chunk in get_ffmpeg_stream( - self.get_absolute_path(chapter_file), - input_format=format_org, - # output format is always pcm because we are sending - # the result of multiple files as one big stream - output_format=streamdetails.audio_format, - extra_input_args=["-ss", str(seek_position_netto)] if seek_position_netto else [], - ): - yield chunk - async def resolve_image(self, path: str) -> str | bytes: """ Resolve an image from an image path. @@ -1646,29 +1616,32 @@ class LocalFileSystemProvider(MusicProvider): prov_mapping = next(x for x in library_item.provider_mappings if x.item_id == item_id) file_item = await self.resolve(item_id) - - file_based_chapters: list[tuple[str, float]] | None - if file_based_chapters := await self.cache.get( + duration = library_item.duration + chapters_cache_key = f"{self.lookup_key}.audiobook.chapters" + file_based_chapters: list[tuple[str, float]] | None = await self.cache.get( file_item.relative_path, - base_key=f"{self.lookup_key}.audiobook.chapters", - ): - # this is a multi-file audiobook, we have the chapter(files) stored in cache - # use custom stream to simply send the chapter files one by one + base_key=chapters_cache_key, + ) + if file_based_chapters is None: + # no cache available for this audiobook, we need to parse the chapters + tags = await async_parse_tags(file_item.absolute_path, file_item.file_size) + await self._parse_audiobook(file_item, tags) + file_based_chapters = await self.cache.get( + file_item.relative_path, + base_key=chapters_cache_key, + ) + + if file_based_chapters: + # this is a multi-file audiobook return StreamDetails( provider=self.instance_id, item_id=item_id, - # for the concatanated stream, we need to use a pcm stream format - audio_format=AudioFormat( - content_type=ContentType.from_bit_depth(prov_mapping.audio_format.bit_depth), - sample_rate=prov_mapping.audio_format.sample_rate, - channels=prov_mapping.audio_format.channels, - ), + audio_format=prov_mapping.audio_format, media_type=MediaType.AUDIOBOOK, - stream_type=StreamType.CUSTOM, - duration=library_item.duration, - data=(prov_mapping.audio_format, file_based_chapters), + stream_type=StreamType.MULTI_FILE, + duration=duration, + data=[self.get_absolute_path(x[0]) for x in file_based_chapters], allow_seek=True, - can_seek=True, ) # regular single-file streaming, simply let ffmpeg deal with the file directly @@ -1691,8 +1664,10 @@ class LocalFileSystemProvider(MusicProvider): ) -> tuple[int, list[MediaItemChapter]]: """Return the chapters for an audiobook.""" chapters: list[MediaItemChapter] = [] + all_chapter_files: list[tuple[str, float]] = [] + total_duration = 0.0 if tags.chapters: - # The chapters are embedded in the file + # The chapters are embedded in the file tags chapters = [ MediaItemChapter( position=chapter.chapter_id, @@ -1702,40 +1677,45 @@ class LocalFileSystemProvider(MusicProvider): ) for chapter in tags.chapters ] - return (try_parse_int(tags.duration) or 0, chapters) - # there could be multiple files for this audiobook in the same folder, - # where each file is a portion/chapter of the audiobook - # try to gather the chapters by traversing files in the same folder - chapter_file_tags: list[AudioTags] = [] - total_duration = 0.0 - abs_path = self.get_absolute_path(audiobook_file_item.parent_path) - for item in await asyncio.to_thread(sorted_scandir, self.base_path, abs_path, sort=True): - if "." not in item.relative_path or item.is_dir: - continue - if item.ext not in AUDIOBOOK_EXTENSIONS: - continue - item_tags = await async_parse_tags(item.absolute_path, item.file_size) - if not (tags.album == item_tags.album or (item_tags.tags.get("title") is None)): - continue - if item_tags.track is None: - continue - chapter_file_tags.append(item_tags) - chapter_file_tags.sort(key=lambda x: x.track or 0) - all_chapter_files: list[tuple[str, float]] = [] - for chapter_tags in chapter_file_tags: - assert chapter_tags.duration is not None - chapters.append( - MediaItemChapter( - position=chapter_tags.track or 0, - name=chapter_tags.title, - start=total_duration, - end=total_duration + chapter_tags.duration, + total_duration = try_parse_int(tags.duration) or 0 + else: + # there could be multiple files for this audiobook in the same folder, + # where each file is a portion/chapter of the audiobook + # try to gather the chapters by traversing files in the same folder + chapter_file_tags: list[AudioTags] = [] + abs_path = self.get_absolute_path(audiobook_file_item.parent_path) + for item in await asyncio.to_thread( + sorted_scandir, self.base_path, abs_path, sort=True + ): + if "." not in item.relative_path or item.is_dir: + continue + if item.ext not in AUDIOBOOK_EXTENSIONS: + continue + item_tags = await async_parse_tags(item.absolute_path, item.file_size) + if not (tags.album == item_tags.album or (item_tags.tags.get("title") is None)): + continue + if item_tags.track is None: + continue + chapter_file_tags.append(item_tags) + chapter_file_tags.sort(key=lambda x: x.track or 0) + for chapter_tags in chapter_file_tags: + assert chapter_tags.duration is not None + chapters.append( + MediaItemChapter( + position=chapter_tags.track or 0, + name=chapter_tags.title, + start=total_duration, + end=total_duration + chapter_tags.duration, + ) ) - ) - all_chapter_files.append( - (get_relative_path(self.base_path, chapter_tags.filename), chapter_tags.duration) - ) - total_duration += chapter_tags.duration + all_chapter_files.append( + ( + get_relative_path(self.base_path, chapter_tags.filename), + chapter_tags.duration, + ) + ) + total_duration += chapter_tags.duration + # store chapter files in cache # for easy access from streamdetails await self.cache.set( diff --git a/pyproject.toml b/pyproject.toml index 85554c95..b3852cc6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "ifaddr==0.2.0", "mashumaro==3.15", "music-assistant-frontend==2.13.1", - "music-assistant-models==1.1.35", + "music-assistant-models==1.1.36", "mutagen==1.47.0", "orjson==3.10.15", "pillow==11.1.0", diff --git a/requirements_all.txt b/requirements_all.txt index c7ed93c9..88659f33 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -27,7 +27,7 @@ ifaddr==0.2.0 liblistenbrainz==0.5.6 mashumaro==3.15 music-assistant-frontend==2.13.1 -music-assistant-models==1.1.35 +music-assistant-models==1.1.36 mutagen==1.47.0 orjson==3.10.15 pillow==11.1.0