From: Marcel van der Veldt Date: Tue, 4 Mar 2025 20:56:01 +0000 (+0100) Subject: More fixes to audio caching X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=60a0b7f2ff1b1b0087bf56ae036b2dbee70fd994;p=music-assistant-server.git More fixes to audio caching Fix: finalize audio stream caching - Automatically use cache when tmpfs available - Dont use cache for very large files - Force cache for encrypted audio - Force cache for Apple Music streams - Stream from intermediate file if cache is still being created - bump models to 1.1.33 --- diff --git a/music_assistant/controllers/player_queues.py b/music_assistant/controllers/player_queues.py index a9ef3015..df653b4d 100644 --- a/music_assistant/controllers/player_queues.py +++ b/music_assistant/controllers/player_queues.py @@ -1741,6 +1741,14 @@ class PlayerQueuesController(CoreController): ): task_id = f"fill_radio_tracks_{queue_id}" self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id) + # auto clean up streamdetails from previously played items + prev_item_id = prev_state["current_item_id"] + if ( + prev_item_id + and (prev_index := self.index_by_id(queue_id, prev_item_id)) + and (prev_prev_item := self.get_item(queue_id, prev_index - 1)) + ): + prev_prev_item.streamdetails = None def _get_flow_queue_stream_index( self, queue: PlayerQueue, player: Player diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index acc0384f..1c977587 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -54,8 +54,6 @@ from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.helpers.audio import ( crossfade_pcm_parts, get_chunksize, - get_hls_substream, - get_icy_radio_stream, get_media_stream, get_player_filter_params, get_silence, @@ -904,9 +902,7 @@ class StreamsController(CoreController): # collect all arguments for ffmpeg streamdetails = queue_item.streamdetails assert streamdetails - stream_type = streamdetails.stream_type filter_params = [] - extra_input_args = streamdetails.extra_input_args or [] # handle volume normalization gain_correct: float | None = None @@ -935,36 +931,6 @@ class StreamsController(CoreController): filter_params.append(f"volume={gain_correct}dB") streamdetails.volume_normalization_gain_correct = gain_correct - # work out audio source for these streamdetails - if stream_type == StreamType.CUSTOM: - audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream( - streamdetails, - seek_position=streamdetails.seek_position, - ) - elif stream_type == StreamType.ICY: - audio_source = get_icy_radio_stream(self.mass, streamdetails.path, streamdetails) - elif stream_type == StreamType.HLS: - substream = await get_hls_substream(self.mass, streamdetails.path) - audio_source = substream.path - if streamdetails.media_type == MediaType.RADIO: - # Especially the BBC streams struggle when they're played directly - # with ffmpeg, where they just stop after some minutes, - # so we tell ffmpeg to loop around in this case. - extra_input_args += ["-stream_loop", "-1", "-re"] - else: - audio_source = streamdetails.path - - # handle seek support - if ( - streamdetails.seek_position - and streamdetails.duration - and streamdetails.allow_seek - # allow seeking for custom streams, - # but only for custom streams that can't seek theirselves - and (stream_type != StreamType.CUSTOM or not streamdetails.can_seek) - ): - extra_input_args += ["-ss", str(int(streamdetails.seek_position))] - if streamdetails.media_type == MediaType.RADIO: # pad some silence before the radio stream starts to create some headroom # for radio stations that do not provide any look ahead buffer @@ -981,9 +947,7 @@ class StreamsController(CoreController): self.mass, streamdetails=streamdetails, pcm_format=pcm_format, - audio_source=audio_source, filter_params=filter_params, - extra_input_args=extra_input_args, ): yield chunk diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index d078f650..b184f596 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -31,7 +31,6 @@ from music_assistant_models.errors import ( MusicAssistantError, ProviderUnavailableError, ) -from music_assistant_models.helpers import get_global_cache_value, set_global_cache_values from music_assistant_models.streamdetails import AudioFormat from music_assistant.constants import ( @@ -47,11 +46,12 @@ from music_assistant.constants import ( from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads from music_assistant.helpers.util import clean_stream_title +from .datetime import utc from .dsp import filter_to_ffmpeg_params from .ffmpeg import FFMpeg, get_ffmpeg_stream from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u from .process import AsyncProcess, communicate -from .util import detect_charset, has_tmpfs_mount +from .util import detect_charset, get_tmp_free_space if TYPE_CHECKING: from music_assistant_models.config_entries import CoreConfig, PlayerConfig @@ -81,71 +81,75 @@ class StreamCache: """ StreamCache. - Basic class to handle temporary caching of audio streams. - For now, based on a (in-memory) tempfile and ffmpeg. + Basic class to handle (temporary) in-memory caching of audio streams. + Useful in case of slow or unreliable network connections, faster seeking, + or when the audio stream is slow itself. """ - def acquire(self) -> str: + @property + def data_complete(self) -> bool: + """Return if the cache is complete.""" + return self._fetch_task is not None and self._fetch_task.done() + + async def acquire(self) -> str | AsyncGenerator[bytes, None]: """Acquire the cache and return the cache file path.""" - # for the edge case where the cache file is not released, - # set a fallback timer to remove the file after 20 minutes - self.mass.call_later( - 20 * 60, remove_file, self._temp_path, task_id=f"remove_file_{self._temp_path}" - ) - return self._temp_path + self.mass.cancel_timer(f"clear_cache_{self._temp_path}") + if not self.data_complete and not self._first_part_received.is_set(): + # handle the situation where the cache + # file is not created yet or already removed + await self.create() + self._subscribers += 1 + if self._all_data_written.is_set(): + # cache is completely written, return the path + return self._temp_path + return self._stream_from_cache() def release(self) -> None: """Release the cache file.""" - # edge case: MA is closing, clean down the file immediately - if self.mass.closing: - os.remove(self._temp_path) - return - # set a timer to remove the file after 1 minute - # if the file is accessed again within this 1 minute, the timer will be cancelled - self.mass.call_later( - 60, remove_file, self._temp_path, task_id=f"remove_file_{self._temp_path}" - ) + self._subscribers -= 1 + if self._subscribers == 0: + # set a timer to remove the tempfile after 1 minute + # if the file is accessed again within this period, + # the timer will be cancelled + self.mass.call_later(60, self._clear, task_id=f"clear_cache_{self._temp_path}") def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None: """Initialize the StreamCache.""" self.mass = mass self.streamdetails = streamdetails - ext = streamdetails.audio_format.output_format_str - self._temp_path = f"/tmp/{shortuuid.random(20)}.{ext}" # noqa: S108 + self.logger = LOGGER.getChild("cache") + self._temp_path = f"/tmp/{shortuuid.random(20)}" # noqa: S108 self._fetch_task: asyncio.Task | None = None + self._subscribers: int = 0 + self._first_part_received = asyncio.Event() + self._all_data_written = asyncio.Event() self.org_path: str | None = streamdetails.path self.org_stream_type: StreamType | None = streamdetails.stream_type self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args streamdetails.path = self._temp_path - streamdetails.stream_type = StreamType.CACHE_FILE + streamdetails.stream_type = StreamType.CACHE + streamdetails.can_seek = True + streamdetails.allow_seek = True streamdetails.extra_input_args = [] async def create(self) -> None: """Create the cache file (if needed).""" + self.mass.cancel_timer(f"clear_cache_{self._temp_path}") if await asyncio.to_thread(os.path.exists, self._temp_path): return if self._fetch_task is not None and not self._fetch_task.done(): # fetch task is already busy return self._fetch_task = self.mass.create_task(self._create_cache_file()) - # for the edge case where the cache file is not consumed at all, - # set a fallback timer to remove the file after 1 hour - self.mass.call_later( - 3600, remove_file, self._temp_path, task_id=f"remove_file_{self._temp_path}" - ) - - async def wait(self) -> None: - """ - Wait until the cache is ready. - - Optionally wait until the full file is available (e.g. when seeking). - """ - await self._fetch_task + # wait until the first part of the file is received + await self._first_part_received.wait() async def _create_cache_file(self) -> None: time_start = time.time() - LOGGER.log(VERBOSE_LOG_LEVEL, "Fetching audio stream to cache file %s", self._temp_path) - + self.logger.debug( + "Fetching audio stream for %s", + self.streamdetails.uri, + ) if self.org_stream_type == StreamType.CUSTOM: audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream( self.streamdetails, @@ -157,28 +161,78 @@ class StreamCache: extra_input_args = self.org_extra_input_args or [] if self.streamdetails.decryption_key: - extra_input_args += ["-decryption_key", self.streamdetails.decryption_key] - - ffmpeg = FFMpeg( + extra_input_args += [ + "-decryption_key", + self.streamdetails.decryption_key, + ] + + # we always use an intermediate ffmpeg process to fetch the original audio source + # this may feel a bit redundant, but it's the most reliable way to fetch the audio + # because ffmpeg has all logic to handle different audio formats, codecs, etc. + # and it also accounts for complicated cases such as encrypted streams or + # m4a/mp4 streams with the moov atom at the end of the file. + # ffmpeg will produce a lossless copy of the original codec to stdout. + self._first_part_received.clear() + self._all_data_written.clear() + required_bytes = get_chunksize(self.streamdetails.audio_format, 2) + async with FFMpeg( audio_input=audio_source, input_format=self.streamdetails.audio_format, output_format=self.streamdetails.audio_format, - extra_input_args=["-y", *extra_input_args], + extra_input_args=extra_input_args, audio_output=self._temp_path, + ) as ffmpeg_proc: + # wait until the first part of the file is received + while ffmpeg_proc.returncode is None: + await asyncio.sleep(0.05) + if not await asyncio.to_thread(os.path.exists, self._temp_path): + continue + if await asyncio.to_thread(os.path.getsize, self._temp_path) >= required_bytes: + break + self._first_part_received.set() + self.logger.debug( + "First part received for %s after %.2fs", + self.streamdetails.uri, + time.time() - time_start, + ) + # wait until ffmpeg is done + await ffmpeg_proc.wait() + self._all_data_written.set() + + LOGGER.debug( + "Writing all data for %s done in %.2fs", + self.streamdetails.uri, + time.time() - time_start, ) - await ffmpeg.start() - await ffmpeg.wait() - process_time = int((time.time() - time_start) * 1000) - LOGGER.log( - VERBOSE_LOG_LEVEL, - "Writing cache file %s done in %s milliseconds", - self._temp_path, - process_time, - ) + + async def _stream_from_cache(self) -> AsyncGenerator[bytes, None]: + """Stream audio from cachefile (while its still being written).""" + async with aiofiles.open(self._temp_path, "rb", buffering=0) as _file: + while True: + chunk = await _file.read(64000) + if not chunk and self._all_data_written.is_set(): + break + elif not chunk: + await asyncio.sleep(0.05) + else: + yield chunk + + async def _clear(self) -> None: + """Clear the cache.""" + self._first_part_received.clear() + self._all_data_written.clear() + self._fetch_task = None + await remove_file(self._temp_path) def __del__(self) -> None: """Ensure the temp file gets cleaned up.""" + if self.mass.closing: + # edge case: MA is closing, clean down the file immediately + if os.path.isfile(self._temp_path): + os.remove(self._temp_path) + return self.mass.loop.call_soon_threadsafe(self.mass.create_task, remove_file(self._temp_path)) + self.mass.cancel_timer(f"remove_file_{self._temp_path}") async def crossfade_pcm_parts( @@ -392,7 +446,6 @@ async def get_stream_details( seek_position: int = 0, fade_in: bool = False, prefer_album_loudness: bool = False, - is_start: bool = True, ) -> StreamDetails: """ Get streamdetails for the given QueueItem. @@ -412,13 +465,12 @@ async def get_stream_details( raise MediaNotFoundError( f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})" ) - if queue_item.streamdetails and ( - not queue_item.streamdetails.seconds_streamed - or queue_item.streamdetails.stream_type == StreamType.CACHE_FILE - ): + if queue_item.streamdetails and (utc() - queue_item.streamdetails.created_at).seconds < 1800: # already got a fresh/unused (or cached) streamdetails + # we assume that the streamdetails are valid for max 30 minutes streamdetails = queue_item.streamdetails else: + # retrieve streamdetails from provider media_item = queue_item.media_item # sort by quality and check item's availability for prov_media in sorted( @@ -446,14 +498,15 @@ async def get_stream_details( f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})" ) - # work out how to handle radio stream - if ( - streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP) - and streamdetails.media_type == MediaType.RADIO - ): - resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path) - streamdetails.path = resolved_url - streamdetails.stream_type = stream_type + # work out how to handle radio stream + if ( + streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP) + and streamdetails.media_type == MediaType.RADIO + ): + resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path) + streamdetails.path = resolved_url + streamdetails.stream_type = stream_type + # set queue_id on the streamdetails so we know what is being streamed streamdetails.queue_id = queue_item.queue_id # handle skip/fade_in details @@ -482,11 +535,10 @@ async def get_stream_details( # attach the DSP details of all group members streamdetails.dsp = get_stream_dsp_details(mass, streamdetails.queue_id) - process_time = int((time.time() - time_start) * 1000) LOGGER.debug( "retrieved streamdetails for %s in %s milliseconds", queue_item.uri, - process_time, + int((time.time() - time_start) * 1000), ) if streamdetails.decryption_key: @@ -495,17 +547,15 @@ async def get_stream_details( # determine if we may use a temporary cache for the audio stream if streamdetails.enable_cache is None: - tmpfs_present = get_global_cache_value("tmpfs_present") - if tmpfs_present is None: - tmpfs_present = await has_tmpfs_mount() - await set_global_cache_values({"tmpfs_present": tmpfs_present}) streamdetails.enable_cache = ( - not is_start - and tmpfs_present - and streamdetails.duration is not None - and streamdetails.duration < 1800 + streamdetails.duration is not None + and streamdetails.media_type + in (MediaType.TRACK, MediaType.AUDIOBOOK, MediaType.PODCAST_EPISODE) and streamdetails.stream_type in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.CUSTOM, StreamType.HLS) + and streamdetails.audio_format.content_type != ContentType.UNKNOWN + and await get_tmp_free_space() > 512 * 1024 * 1024 + and get_chunksize(streamdetails.audio_format, streamdetails.duration) < 100000000 ) # handle temporary cache support of audio stream @@ -514,9 +564,13 @@ async def get_stream_details( streamdetails.cache = StreamCache(mass, streamdetails) else: streamdetails.cache = cast(StreamCache, streamdetails.cache) + # create cache (if needed) and wait until the cache is available await streamdetails.cache.create() - # wait until the cache file is available - await streamdetails.cache.wait() + LOGGER.debug( + "streamdetails cache ready for %s in %s milliseconds", + queue_item.uri, + int((time.time() - time_start) * 1000), + ) return streamdetails @@ -525,22 +579,51 @@ async def get_media_stream( mass: MusicAssistant, streamdetails: StreamDetails, pcm_format: AudioFormat, - audio_source: AsyncGenerator[bytes, None] | str, filter_params: list[str] | None = None, - extra_input_args: list[str] | None = None, ) -> AsyncGenerator[bytes, None]: """Get PCM audio stream for given media details.""" logger = LOGGER.getChild("media_stream") logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri) + extra_input_args = streamdetails.extra_input_args or [] strip_silence_begin = streamdetails.strip_silence_begin strip_silence_end = streamdetails.strip_silence_end if streamdetails.fade_in: filter_params.append("afade=type=in:start_time=0:duration=3") strip_silence_begin = False - if streamdetails.stream_type == StreamType.CACHE_FILE: + # work out audio source for these streamdetails + stream_type = streamdetails.stream_type + if stream_type == StreamType.CACHE: cache = cast(StreamCache, streamdetails.cache) - audio_source = cache.acquire() + audio_source = await cache.acquire() + elif stream_type == StreamType.CUSTOM: + audio_source = mass.get_provider(streamdetails.provider).get_audio_stream( + streamdetails, + seek_position=streamdetails.seek_position, + ) + elif stream_type == StreamType.ICY: + audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails) + elif stream_type == StreamType.HLS: + substream = await get_hls_substream(mass, streamdetails.path) + audio_source = substream.path + if streamdetails.media_type == MediaType.RADIO: + # Especially the BBC streams struggle when they're played directly + # with ffmpeg, where they just stop after some minutes, + # so we tell ffmpeg to loop around in this case. + extra_input_args += ["-stream_loop", "-1", "-re"] + else: + audio_source = streamdetails.path + + # handle seek support + if ( + streamdetails.seek_position + and streamdetails.duration + and streamdetails.allow_seek + # allow seeking for custom streams, + # but only for custom streams that can't seek theirselves + and (stream_type != StreamType.CUSTOM or not streamdetails.can_seek) + ): + extra_input_args += ["-ss", str(int(streamdetails.seek_position))] bytes_sent = 0 chunk_number = 0 @@ -591,14 +674,14 @@ async def get_media_stream( req_buffer_size = int(pcm_format.pcm_sample_size * 5) elif chunk_number > 240 and strip_silence_end: req_buffer_size = int(pcm_format.pcm_sample_size * 10) - elif chunk_number > 60 and strip_silence_end: + elif chunk_number > 120 and strip_silence_end: req_buffer_size = int(pcm_format.pcm_sample_size * 8) - elif chunk_number > 30: + elif chunk_number > 60: + req_buffer_size = int(pcm_format.pcm_sample_size * 6) + elif chunk_number > 20 and strip_silence_end: req_buffer_size = int(pcm_format.pcm_sample_size * 4) - elif chunk_number > 10 and strip_silence_end: - req_buffer_size = int(pcm_format.pcm_sample_size * 2) else: - req_buffer_size = pcm_format.pcm_sample_size + req_buffer_size = pcm_format.pcm_sample_size * 2 # always append to buffer buffer += chunk @@ -608,8 +691,9 @@ async def get_media_stream( # buffer is not full enough, move on continue - if chunk_number == 5 and strip_silence_begin: + if strip_silence_begin: # strip silence from begin of audio + strip_silence_begin = False chunk = await strip_silence( # noqa: PLW2901 mass, buffer, pcm_format=pcm_format ) @@ -722,8 +806,8 @@ async def get_media_stream( ): mass.create_task(music_prov.on_streamed(streamdetails)) - # schedule removal of cache file - if streamdetails.stream_type == StreamType.CACHE_FILE: + # release cache file + if streamdetails.stream_type == StreamType.CACHE: cache = cast(StreamCache, streamdetails.cache) cache.release() @@ -1278,7 +1362,10 @@ async def analyze_loudness( "-t", "600", ] - if streamdetails.stream_type == StreamType.CUSTOM: + if streamdetails.stream_type == StreamType.CACHE: + cache = cast(StreamCache, streamdetails.cache) + audio_source = await cache.acquire() + elif streamdetails.stream_type == StreamType.CUSTOM: audio_source = mass.get_provider(streamdetails.provider).get_audio_stream( streamdetails, ) @@ -1329,6 +1416,10 @@ async def analyze_loudness( streamdetails.uri, loudness, ) + # release cache file + if streamdetails.stream_type == StreamType.CACHE: + cache = cast(StreamCache, streamdetails.cache) + cache.release() def _get_normalization_mode( diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index 3f457107..717000d0 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -79,6 +79,8 @@ class FFMpeg(AsyncProcess): clean_args.append("") elif "/" in arg and "." in arg: clean_args.append("") + elif arg.startswith("data:application/"): + clean_args.append("") else: clean_args.append(arg) args_str = " ".join(clean_args) @@ -104,9 +106,9 @@ class FFMpeg(AsyncProcess): if self.collect_log_history: self.log_history.append(line) if "error" in line or "warning" in line: - self.logger.debug(line) - elif "critical" in line: self.logger.warning(line) + elif "critical" in line: + self.logger.error(line) else: self.logger.log(VERBOSE_LOG_LEVEL, line) @@ -142,15 +144,6 @@ class FFMpeg(AsyncProcess): try: start = time.time() self.logger.debug("Start reading audio data from source...") - # use TimedAsyncGenerator to catch we're stuck waiting on data forever - # don't set this timeout too low because in some cases it can indeed take a while - # for data to arrive (e.g. when there is X amount of seconds in the buffer) - # so this timeout is just to catch if the source is stuck and rpeort it and not - # to recover from it. - # async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300): - # if self.closed: - # return - # await self.write(chunk) async for chunk in self.audio_input: if self.closed: return @@ -182,6 +175,8 @@ async def get_ffmpeg_stream( extra_args: list[str] | None = None, chunk_size: int | None = None, extra_input_args: list[str] | None = None, + collect_log_history: bool = False, + loglevel: str = "info", ) -> AsyncGenerator[bytes, None]: """ Get the ffmpeg audio stream as async generator. @@ -196,6 +191,8 @@ async def get_ffmpeg_stream( filter_params=filter_params, extra_args=extra_args, extra_input_args=extra_input_args, + collect_log_history=collect_log_history, + loglevel=loglevel, ) as ffmpeg_proc: # read final chunks from stdout iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any() @@ -203,7 +200,7 @@ async def get_ffmpeg_stream( yield chunk -def get_ffmpeg_args( +def get_ffmpeg_args( # noqa: PLR0915 input_format: AudioFormat, output_format: AudioFormat, filter_params: list[str], @@ -226,8 +223,6 @@ def get_ffmpeg_args( "-ignore_unknown", "-protocol_whitelist", "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp,concat", - "-probesize", - "8192", ] # collect input args input_args = [] @@ -269,6 +264,8 @@ def get_ffmpeg_args( "-i", input_path, ] + elif input_format.codec_type != ContentType.UNKNOWN: + input_args += ["-acodec", input_format.codec_type.name.lower(), "-i", input_path] else: # let ffmpeg auto detect the content type from the metadata/headers input_args += ["-i", input_path] @@ -284,6 +281,38 @@ def get_ffmpeg_args( # devnull stream output_path = "-" output_args = ["-f", "null"] + elif output_format.content_type.is_pcm(): + # use explicit format identifier for pcm formats + output_args += [ + "-ar", + str(output_format.sample_rate), + "-acodec", + output_format.content_type.name.lower(), + "-f", + output_format.content_type.value, + ] + elif input_format == output_format: + # passthrough + if output_format.content_type in ( + ContentType.MP4, + ContentType.MP4A, + ContentType.M4A, + ContentType.M4B, + ): + fmt = "adts" + elif output_format.codec_type != ContentType.UNKNOWN: + fmt = output_format.codec_type.name.lower() + else: + fmt = output_format.content_type.name.lower() + output_args = [ + "-vn", + "-dn", + "-sn", + "-acodec", + "copy", + "-f", + fmt, + ] elif output_format.content_type == ContentType.AAC: output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k"] elif output_format.content_type == ContentType.MP3: @@ -311,19 +340,7 @@ def get_ffmpeg_args( "-compression_level", "0", ] - elif output_format.content_type.is_pcm(): - # use explicit format identifier for pcm formats - output_args += [ - "-ar", - str(output_format.sample_rate), - "-acodec", - output_format.content_type.name.lower(), - "-f", - output_format.content_type.value, - ] - elif input_format == output_format: - # passthrough - output_args = ["-c", "copy"] + else: raise RuntimeError("Invalid/unsupported output format specified") diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index a0fe11f5..534c7a27 100644 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -8,6 +8,7 @@ import importlib import logging import os import re +import shutil import socket import urllib.error import urllib.parse @@ -464,6 +465,15 @@ async def has_tmpfs_mount() -> bool: return False +async def get_tmp_free_space() -> int: + """Return free space on tmp.""" + try: + if res := await asyncio.to_thread(shutil.disk_usage, "/tmp"): # noqa: S108 + return res.free + except (FileNotFoundError, OSError, PermissionError): + return 0 + + def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]: """Chunk bytes data into smaller chunks.""" for i in range(0, len(data), chunk_size): diff --git a/music_assistant/mass.py b/music_assistant/mass.py index f51d9188..120ecabb 100644 --- a/music_assistant/mass.py +++ b/music_assistant/mass.py @@ -446,6 +446,16 @@ class MusicAssistant: msg = "Task does not exist" raise KeyError(msg) + def cancel_task(self, task_id: str) -> None: + """Cancel existing scheduled task.""" + if existing := self._tracked_tasks.pop(task_id, None): + existing.cancel() + + def cancel_timer(self, task_id: str) -> None: + """Cancel existing scheduled timer.""" + if existing := self._tracked_timers.pop(task_id, None): + existing.cancel() + def register_api_command( self, command: str, diff --git a/music_assistant/providers/apple_music/__init__.py b/music_assistant/providers/apple_music/__init__.py index df388b22..3785e802 100644 --- a/music_assistant/providers/apple_music/__init__.py +++ b/music_assistant/providers/apple_music/__init__.py @@ -364,13 +364,13 @@ class AppleMusicProvider(MusicProvider): return StreamDetails( item_id=item_id, provider=self.lookup_key, - audio_format=AudioFormat(content_type=ContentType.M4A, codec_type=ContentType.AAC), + audio_format=AudioFormat(content_type=ContentType.MP4, codec_type=ContentType.AAC), stream_type=StreamType.ENCRYPTED_HTTP, decryption_key=await self._get_decryption_key(license_url, key_id, uri, item_id), path=stream_url, can_seek=True, allow_seek=True, - # enforce caching because the apple streams are m4a files with moov atom at the end + # enforce caching because the apple streams are mp4 files with moov atom at the end enable_cache=True, ) diff --git a/music_assistant/providers/spotify/__init__.py b/music_assistant/providers/spotify/__init__.py index e8588120..3df36273 100644 --- a/music_assistant/providers/spotify/__init__.py +++ b/music_assistant/providers/spotify/__init__.py @@ -610,7 +610,7 @@ class SpotifyProvider(MusicProvider): # get first chunk with timeout, to catch the issue where librespot is not starting # which seems to happen from time to time (but rarely) try: - chunk = await asyncio.wait_for(librespot_proc.read(64000), timeout=3) + chunk = await asyncio.wait_for(librespot_proc.read(64000), timeout=5) yield chunk except TimeoutError: raise AudioError("No audio received from librespot within timeout") diff --git a/pyproject.toml b/pyproject.toml index 3f965525..ebbd1330 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -24,7 +24,7 @@ dependencies = [ "ifaddr==0.2.0", "mashumaro==3.15", "music-assistant-frontend==2.12.2", - "music-assistant-models==1.1.32", + "music-assistant-models==1.1.33", "mutagen==1.47.0", "orjson==3.10.12", "pillow==11.1.0", diff --git a/requirements_all.txt b/requirements_all.txt index 627d867d..e5d30c63 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -26,7 +26,7 @@ ibroadcastaio==0.4.0 ifaddr==0.2.0 mashumaro==3.15 music-assistant-frontend==2.12.2 -music-assistant-models==1.1.32 +music-assistant-models==1.1.33 mutagen==1.47.0 orjson==3.10.12 pillow==11.1.0