More fixes to audio caching
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 4 Mar 2025 20:56:01 +0000 (21:56 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 5 Mar 2025 22:40:03 +0000 (23:40 +0100)
Fix: finalize audio stream caching

- Automatically use cache when tmpfs available
- Don't use cache for very large files
- Force cache for encrypted audio
- Force cache for Apple Music streams
- Stream from intermediate file if cache is still being created
- bump models to 1.1.33

music_assistant/controllers/player_queues.py
music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/ffmpeg.py
music_assistant/helpers/util.py
music_assistant/mass.py
music_assistant/providers/apple_music/__init__.py
music_assistant/providers/spotify/__init__.py
pyproject.toml
requirements_all.txt

index a9ef30157c34ce833d569484840405b5414b5bec..df653b4d1ec84ad5303fc26dd06594a9d3ec284a 100644 (file)
@@ -1741,6 +1741,14 @@ class PlayerQueuesController(CoreController):
             ):
                 task_id = f"fill_radio_tracks_{queue_id}"
                 self.mass.call_later(5, self._fill_radio_tracks, queue_id, task_id=task_id)
+            # auto clean up streamdetails from previously played items
+            prev_item_id = prev_state["current_item_id"]
+            if (
+                prev_item_id
+                and (prev_index := self.index_by_id(queue_id, prev_item_id))
+                and (prev_prev_item := self.get_item(queue_id, prev_index - 1))
+            ):
+                prev_prev_item.streamdetails = None
 
     def _get_flow_queue_stream_index(
         self, queue: PlayerQueue, player: Player
index acc0384fc905ced47f109f5b2e8a6024538f202b..1c9775876d42914538ea303218e25eeb5462ea24 100644 (file)
@@ -54,8 +54,6 @@ from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.helpers.audio import (
     crossfade_pcm_parts,
     get_chunksize,
-    get_hls_substream,
-    get_icy_radio_stream,
     get_media_stream,
     get_player_filter_params,
     get_silence,
@@ -904,9 +902,7 @@ class StreamsController(CoreController):
         # collect all arguments for ffmpeg
         streamdetails = queue_item.streamdetails
         assert streamdetails
-        stream_type = streamdetails.stream_type
         filter_params = []
-        extra_input_args = streamdetails.extra_input_args or []
 
         # handle volume normalization
         gain_correct: float | None = None
@@ -935,36 +931,6 @@ class StreamsController(CoreController):
             filter_params.append(f"volume={gain_correct}dB")
         streamdetails.volume_normalization_gain_correct = gain_correct
 
-        # work out audio source for these streamdetails
-        if stream_type == StreamType.CUSTOM:
-            audio_source = self.mass.get_provider(streamdetails.provider).get_audio_stream(
-                streamdetails,
-                seek_position=streamdetails.seek_position,
-            )
-        elif stream_type == StreamType.ICY:
-            audio_source = get_icy_radio_stream(self.mass, streamdetails.path, streamdetails)
-        elif stream_type == StreamType.HLS:
-            substream = await get_hls_substream(self.mass, streamdetails.path)
-            audio_source = substream.path
-            if streamdetails.media_type == MediaType.RADIO:
-                # Especially the BBC streams struggle when they're played directly
-                # with ffmpeg, where they just stop after some minutes,
-                # so we tell ffmpeg to loop around in this case.
-                extra_input_args += ["-stream_loop", "-1", "-re"]
-        else:
-            audio_source = streamdetails.path
-
-        # handle seek support
-        if (
-            streamdetails.seek_position
-            and streamdetails.duration
-            and streamdetails.allow_seek
-            # allow seeking for custom streams,
-            # but only for custom streams that can't seek theirselves
-            and (stream_type != StreamType.CUSTOM or not streamdetails.can_seek)
-        ):
-            extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
-
         if streamdetails.media_type == MediaType.RADIO:
             # pad some silence before the radio stream starts to create some headroom
             # for radio stations that do not provide any look ahead buffer
@@ -981,9 +947,7 @@ class StreamsController(CoreController):
             self.mass,
             streamdetails=streamdetails,
             pcm_format=pcm_format,
-            audio_source=audio_source,
             filter_params=filter_params,
-            extra_input_args=extra_input_args,
         ):
             yield chunk
 
index d078f6501e2fb3ad4b3ef70a5a2f9b9fe461679e..b184f59696f1c2bf51d0ed666411b706500ec707 100644 (file)
@@ -31,7 +31,6 @@ from music_assistant_models.errors import (
     MusicAssistantError,
     ProviderUnavailableError,
 )
-from music_assistant_models.helpers import get_global_cache_value, set_global_cache_values
 from music_assistant_models.streamdetails import AudioFormat
 
 from music_assistant.constants import (
@@ -47,11 +46,12 @@ from music_assistant.constants import (
 from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
 from music_assistant.helpers.util import clean_stream_title
 
+from .datetime import utc
 from .dsp import filter_to_ffmpeg_params
 from .ffmpeg import FFMpeg, get_ffmpeg_stream
 from .playlists import IsHLSPlaylist, PlaylistItem, fetch_playlist, parse_m3u
 from .process import AsyncProcess, communicate
-from .util import detect_charset, has_tmpfs_mount
+from .util import detect_charset, get_tmp_free_space
 
 if TYPE_CHECKING:
     from music_assistant_models.config_entries import CoreConfig, PlayerConfig
@@ -81,71 +81,75 @@ class StreamCache:
     """
     StreamCache.
 
-    Basic class to handle temporary caching of audio streams.
-    For now, based on a (in-memory) tempfile and ffmpeg.
+    Basic class to handle (temporary) in-memory caching of audio streams.
+    Useful in case of slow or unreliable network connections, faster seeking,
+    or when the audio stream is slow itself.
     """
 
-    def acquire(self) -> str:
+    @property
+    def data_complete(self) -> bool:
+        """Return if the cache is complete."""
+        return self._fetch_task is not None and self._fetch_task.done()
+
+    async def acquire(self) -> str | AsyncGenerator[bytes, None]:
         """Acquire the cache and return the cache file path."""
-        # for the edge case where the cache file is not released,
-        # set a fallback timer to remove the file after 20 minutes
-        self.mass.call_later(
-            20 * 60, remove_file, self._temp_path, task_id=f"remove_file_{self._temp_path}"
-        )
-        return self._temp_path
+        self.mass.cancel_timer(f"clear_cache_{self._temp_path}")
+        if not self.data_complete and not self._first_part_received.is_set():
+            # handle the situation where the cache
+            # file is not created yet or already removed
+            await self.create()
+        self._subscribers += 1
+        if self._all_data_written.is_set():
+            # cache is completely written, return the path
+            return self._temp_path
+        return self._stream_from_cache()
 
     def release(self) -> None:
         """Release the cache file."""
-        # edge case: MA is closing, clean down the file immediately
-        if self.mass.closing:
-            os.remove(self._temp_path)
-            return
-        # set a timer to remove the file after 1 minute
-        # if the file is accessed again within this 1 minute, the timer will be cancelled
-        self.mass.call_later(
-            60, remove_file, self._temp_path, task_id=f"remove_file_{self._temp_path}"
-        )
+        self._subscribers -= 1
+        if self._subscribers == 0:
+            # set a timer to remove the tempfile after 1 minute
+            # if the file is accessed again within this period,
+            # the timer will be cancelled
+            self.mass.call_later(60, self._clear, task_id=f"clear_cache_{self._temp_path}")
 
     def __init__(self, mass: MusicAssistant, streamdetails: StreamDetails) -> None:
         """Initialize the StreamCache."""
         self.mass = mass
         self.streamdetails = streamdetails
-        ext = streamdetails.audio_format.output_format_str
-        self._temp_path = f"/tmp/{shortuuid.random(20)}.{ext}"  # noqa: S108
+        self.logger = LOGGER.getChild("cache")
+        self._temp_path = f"/tmp/{shortuuid.random(20)}"  # noqa: S108
         self._fetch_task: asyncio.Task | None = None
+        self._subscribers: int = 0
+        self._first_part_received = asyncio.Event()
+        self._all_data_written = asyncio.Event()
         self.org_path: str | None = streamdetails.path
         self.org_stream_type: StreamType | None = streamdetails.stream_type
         self.org_extra_input_args: list[str] | None = streamdetails.extra_input_args
         streamdetails.path = self._temp_path
-        streamdetails.stream_type = StreamType.CACHE_FILE
+        streamdetails.stream_type = StreamType.CACHE
+        streamdetails.can_seek = True
+        streamdetails.allow_seek = True
         streamdetails.extra_input_args = []
 
     async def create(self) -> None:
         """Create the cache file (if needed)."""
+        self.mass.cancel_timer(f"clear_cache_{self._temp_path}")
         if await asyncio.to_thread(os.path.exists, self._temp_path):
             return
         if self._fetch_task is not None and not self._fetch_task.done():
             # fetch task is already busy
             return
         self._fetch_task = self.mass.create_task(self._create_cache_file())
-        # for the edge case where the cache file is not consumed at all,
-        # set a fallback timer to remove the file after 1 hour
-        self.mass.call_later(
-            3600, remove_file, self._temp_path, task_id=f"remove_file_{self._temp_path}"
-        )
-
-    async def wait(self) -> None:
-        """
-        Wait until the cache is ready.
-
-        Optionally wait until the full file is available (e.g. when seeking).
-        """
-        await self._fetch_task
+        # wait until the first part of the file is received
+        await self._first_part_received.wait()
 
     async def _create_cache_file(self) -> None:
         time_start = time.time()
-        LOGGER.log(VERBOSE_LOG_LEVEL, "Fetching audio stream to cache file %s", self._temp_path)
-
+        self.logger.debug(
+            "Fetching audio stream for %s",
+            self.streamdetails.uri,
+        )
         if self.org_stream_type == StreamType.CUSTOM:
             audio_source = self.mass.get_provider(self.streamdetails.provider).get_audio_stream(
                 self.streamdetails,
@@ -157,28 +161,78 @@ class StreamCache:
 
         extra_input_args = self.org_extra_input_args or []
         if self.streamdetails.decryption_key:
-            extra_input_args += ["-decryption_key", self.streamdetails.decryption_key]
-
-        ffmpeg = FFMpeg(
+            extra_input_args += [
+                "-decryption_key",
+                self.streamdetails.decryption_key,
+            ]
+
+        # we always use an intermediate ffmpeg process to fetch the original audio source
+        # this may feel a bit redundant, but it's the most reliable way to fetch the audio
+        # because ffmpeg has all logic to handle different audio formats, codecs, etc.
+        # and it also accounts for complicated cases such as encrypted streams or
+        # m4a/mp4 streams with the moov atom at the end of the file.
+        # ffmpeg will produce a lossless copy of the original codec to stdout.
+        self._first_part_received.clear()
+        self._all_data_written.clear()
+        required_bytes = get_chunksize(self.streamdetails.audio_format, 2)
+        async with FFMpeg(
             audio_input=audio_source,
             input_format=self.streamdetails.audio_format,
             output_format=self.streamdetails.audio_format,
-            extra_input_args=["-y", *extra_input_args],
+            extra_input_args=extra_input_args,
             audio_output=self._temp_path,
+        ) as ffmpeg_proc:
+            # wait until the first part of the file is received
+            while ffmpeg_proc.returncode is None:
+                await asyncio.sleep(0.05)
+                if not await asyncio.to_thread(os.path.exists, self._temp_path):
+                    continue
+                if await asyncio.to_thread(os.path.getsize, self._temp_path) >= required_bytes:
+                    break
+            self._first_part_received.set()
+            self.logger.debug(
+                "First part received for %s after %.2fs",
+                self.streamdetails.uri,
+                time.time() - time_start,
+            )
+            # wait until ffmpeg is done
+            await ffmpeg_proc.wait()
+            self._all_data_written.set()
+
+        LOGGER.debug(
+            "Writing all data for %s done in %.2fs",
+            self.streamdetails.uri,
+            time.time() - time_start,
         )
-        await ffmpeg.start()
-        await ffmpeg.wait()
-        process_time = int((time.time() - time_start) * 1000)
-        LOGGER.log(
-            VERBOSE_LOG_LEVEL,
-            "Writing cache file %s done in %s milliseconds",
-            self._temp_path,
-            process_time,
-        )
+
+    async def _stream_from_cache(self) -> AsyncGenerator[bytes, None]:
+        """Stream audio from cachefile (while its still being written)."""
+        async with aiofiles.open(self._temp_path, "rb", buffering=0) as _file:
+            while True:
+                chunk = await _file.read(64000)
+                if not chunk and self._all_data_written.is_set():
+                    break
+                elif not chunk:
+                    await asyncio.sleep(0.05)
+                else:
+                    yield chunk
+
+    async def _clear(self) -> None:
+        """Clear the cache."""
+        self._first_part_received.clear()
+        self._all_data_written.clear()
+        self._fetch_task = None
+        await remove_file(self._temp_path)
 
     def __del__(self) -> None:
         """Ensure the temp file gets cleaned up."""
+        if self.mass.closing:
+            # edge case: MA is closing, clean up the file immediately
+            if os.path.isfile(self._temp_path):
+                os.remove(self._temp_path)
+            return
         self.mass.loop.call_soon_threadsafe(self.mass.create_task, remove_file(self._temp_path))
+        self.mass.cancel_timer(f"remove_file_{self._temp_path}")
 
 
 async def crossfade_pcm_parts(
@@ -392,7 +446,6 @@ async def get_stream_details(
     seek_position: int = 0,
     fade_in: bool = False,
     prefer_album_loudness: bool = False,
-    is_start: bool = True,
 ) -> StreamDetails:
     """
     Get streamdetails for the given QueueItem.
@@ -412,13 +465,12 @@ async def get_stream_details(
         raise MediaNotFoundError(
             f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
         )
-    if queue_item.streamdetails and (
-        not queue_item.streamdetails.seconds_streamed
-        or queue_item.streamdetails.stream_type == StreamType.CACHE_FILE
-    ):
+    if queue_item.streamdetails and (utc() - queue_item.streamdetails.created_at).seconds < 1800:
         # already got a fresh/unused (or cached) streamdetails
+        # we assume that the streamdetails are valid for max 30 minutes
         streamdetails = queue_item.streamdetails
     else:
+        # retrieve streamdetails from provider
         media_item = queue_item.media_item
         # sort by quality and check item's availability
         for prov_media in sorted(
@@ -446,14 +498,15 @@ async def get_stream_details(
                 f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
             )
 
-    # work out how to handle radio stream
-    if (
-        streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
-        and streamdetails.media_type == MediaType.RADIO
-    ):
-        resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
-        streamdetails.path = resolved_url
-        streamdetails.stream_type = stream_type
+        # work out how to handle radio stream
+        if (
+            streamdetails.stream_type in (StreamType.ICY, StreamType.HLS, StreamType.HTTP)
+            and streamdetails.media_type == MediaType.RADIO
+        ):
+            resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
+            streamdetails.path = resolved_url
+            streamdetails.stream_type = stream_type
+
     # set queue_id on the streamdetails so we know what is being streamed
     streamdetails.queue_id = queue_item.queue_id
     # handle skip/fade_in details
@@ -482,11 +535,10 @@ async def get_stream_details(
     # attach the DSP details of all group members
     streamdetails.dsp = get_stream_dsp_details(mass, streamdetails.queue_id)
 
-    process_time = int((time.time() - time_start) * 1000)
     LOGGER.debug(
         "retrieved streamdetails for %s in %s milliseconds",
         queue_item.uri,
-        process_time,
+        int((time.time() - time_start) * 1000),
     )
 
     if streamdetails.decryption_key:
@@ -495,17 +547,15 @@ async def get_stream_details(
 
     # determine if we may use a temporary cache for the audio stream
     if streamdetails.enable_cache is None:
-        tmpfs_present = get_global_cache_value("tmpfs_present")
-        if tmpfs_present is None:
-            tmpfs_present = await has_tmpfs_mount()
-            await set_global_cache_values({"tmpfs_present": tmpfs_present})
         streamdetails.enable_cache = (
-            not is_start
-            and tmpfs_present
-            and streamdetails.duration is not None
-            and streamdetails.duration < 1800
+            streamdetails.duration is not None
+            and streamdetails.media_type
+            in (MediaType.TRACK, MediaType.AUDIOBOOK, MediaType.PODCAST_EPISODE)
             and streamdetails.stream_type
             in (StreamType.HTTP, StreamType.ENCRYPTED_HTTP, StreamType.CUSTOM, StreamType.HLS)
+            and streamdetails.audio_format.content_type != ContentType.UNKNOWN
+            and await get_tmp_free_space() > 512 * 1024 * 1024
+            and get_chunksize(streamdetails.audio_format, streamdetails.duration) < 100000000
         )
 
     # handle temporary cache support of audio stream
@@ -514,9 +564,13 @@ async def get_stream_details(
             streamdetails.cache = StreamCache(mass, streamdetails)
         else:
             streamdetails.cache = cast(StreamCache, streamdetails.cache)
+        # create cache (if needed) and wait until the cache is available
         await streamdetails.cache.create()
-        # wait until the cache file is available
-        await streamdetails.cache.wait()
+        LOGGER.debug(
+            "streamdetails cache ready for %s in %s milliseconds",
+            queue_item.uri,
+            int((time.time() - time_start) * 1000),
+        )
 
     return streamdetails
 
@@ -525,22 +579,51 @@ async def get_media_stream(
     mass: MusicAssistant,
     streamdetails: StreamDetails,
     pcm_format: AudioFormat,
-    audio_source: AsyncGenerator[bytes, None] | str,
     filter_params: list[str] | None = None,
-    extra_input_args: list[str] | None = None,
 ) -> AsyncGenerator[bytes, None]:
     """Get PCM audio stream for given media details."""
     logger = LOGGER.getChild("media_stream")
     logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
+    extra_input_args = streamdetails.extra_input_args or []
     strip_silence_begin = streamdetails.strip_silence_begin
     strip_silence_end = streamdetails.strip_silence_end
     if streamdetails.fade_in:
         filter_params.append("afade=type=in:start_time=0:duration=3")
         strip_silence_begin = False
 
-    if streamdetails.stream_type == StreamType.CACHE_FILE:
+    # work out audio source for these streamdetails
+    stream_type = streamdetails.stream_type
+    if stream_type == StreamType.CACHE:
         cache = cast(StreamCache, streamdetails.cache)
-        audio_source = cache.acquire()
+        audio_source = await cache.acquire()
+    elif stream_type == StreamType.CUSTOM:
+        audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
+            streamdetails,
+            seek_position=streamdetails.seek_position,
+        )
+    elif stream_type == StreamType.ICY:
+        audio_source = get_icy_radio_stream(mass, streamdetails.path, streamdetails)
+    elif stream_type == StreamType.HLS:
+        substream = await get_hls_substream(mass, streamdetails.path)
+        audio_source = substream.path
+        if streamdetails.media_type == MediaType.RADIO:
+            # Especially the BBC streams struggle when they're played directly
+            # with ffmpeg, where they just stop after some minutes,
+            # so we tell ffmpeg to loop around in this case.
+            extra_input_args += ["-stream_loop", "-1", "-re"]
+    else:
+        audio_source = streamdetails.path
+
+    # handle seek support
+    if (
+        streamdetails.seek_position
+        and streamdetails.duration
+        and streamdetails.allow_seek
+        # allow seeking for custom streams,
+        # but only for custom streams that can't seek themselves
+        and (stream_type != StreamType.CUSTOM or not streamdetails.can_seek)
+    ):
+        extra_input_args += ["-ss", str(int(streamdetails.seek_position))]
 
     bytes_sent = 0
     chunk_number = 0
@@ -591,14 +674,14 @@ async def get_media_stream(
                 req_buffer_size = int(pcm_format.pcm_sample_size * 5)
             elif chunk_number > 240 and strip_silence_end:
                 req_buffer_size = int(pcm_format.pcm_sample_size * 10)
-            elif chunk_number > 60 and strip_silence_end:
+            elif chunk_number > 120 and strip_silence_end:
                 req_buffer_size = int(pcm_format.pcm_sample_size * 8)
-            elif chunk_number > 30:
+            elif chunk_number > 60:
+                req_buffer_size = int(pcm_format.pcm_sample_size * 6)
+            elif chunk_number > 20 and strip_silence_end:
                 req_buffer_size = int(pcm_format.pcm_sample_size * 4)
-            elif chunk_number > 10 and strip_silence_end:
-                req_buffer_size = int(pcm_format.pcm_sample_size * 2)
             else:
-                req_buffer_size = pcm_format.pcm_sample_size
+                req_buffer_size = pcm_format.pcm_sample_size * 2
 
             # always append to buffer
             buffer += chunk
@@ -608,8 +691,9 @@ async def get_media_stream(
                 # buffer is not full enough, move on
                 continue
 
-            if chunk_number == 5 and strip_silence_begin:
+            if strip_silence_begin:
                 # strip silence from begin of audio
+                strip_silence_begin = False
                 chunk = await strip_silence(  # noqa: PLW2901
                     mass, buffer, pcm_format=pcm_format
                 )
@@ -722,8 +806,8 @@ async def get_media_stream(
         ):
             mass.create_task(music_prov.on_streamed(streamdetails))
 
-        # schedule removal of cache file
-        if streamdetails.stream_type == StreamType.CACHE_FILE:
+        # release cache file
+        if streamdetails.stream_type == StreamType.CACHE:
             cache = cast(StreamCache, streamdetails.cache)
             cache.release()
 
@@ -1278,7 +1362,10 @@ async def analyze_loudness(
         "-t",
         "600",
     ]
-    if streamdetails.stream_type == StreamType.CUSTOM:
+    if streamdetails.stream_type == StreamType.CACHE:
+        cache = cast(StreamCache, streamdetails.cache)
+        audio_source = await cache.acquire()
+    elif streamdetails.stream_type == StreamType.CUSTOM:
         audio_source = mass.get_provider(streamdetails.provider).get_audio_stream(
             streamdetails,
         )
@@ -1329,6 +1416,10 @@ async def analyze_loudness(
                 streamdetails.uri,
                 loudness,
             )
+    # release cache file
+    if streamdetails.stream_type == StreamType.CACHE:
+        cache = cast(StreamCache, streamdetails.cache)
+        cache.release()
 
 
 def _get_normalization_mode(
index 3f4571077b408a19c4b27b11287dd0af509e388a..717000d0280b94c13cba7b3071316bfc2e8f8e40 100644 (file)
@@ -79,6 +79,8 @@ class FFMpeg(AsyncProcess):
                 clean_args.append("<URL>")
             elif "/" in arg and "." in arg:
                 clean_args.append("<FILE>")
+            elif arg.startswith("data:application/"):
+                clean_args.append("<DATA>")
             else:
                 clean_args.append(arg)
         args_str = " ".join(clean_args)
@@ -104,9 +106,9 @@ class FFMpeg(AsyncProcess):
             if self.collect_log_history:
                 self.log_history.append(line)
             if "error" in line or "warning" in line:
-                self.logger.debug(line)
-            elif "critical" in line:
                 self.logger.warning(line)
+            elif "critical" in line:
+                self.logger.error(line)
             else:
                 self.logger.log(VERBOSE_LOG_LEVEL, line)
 
@@ -142,15 +144,6 @@ class FFMpeg(AsyncProcess):
         try:
             start = time.time()
             self.logger.debug("Start reading audio data from source...")
-            # use TimedAsyncGenerator to catch we're stuck waiting on data forever
-            # don't set this timeout too low because in some cases it can indeed take a while
-            # for data to arrive (e.g. when there is X amount of seconds in the buffer)
-            # so this timeout is just to catch if the source is stuck and rpeort it and not
-            # to recover from it.
-            # async for chunk in TimedAsyncGenerator(self.audio_input, timeout=300):
-            #     if self.closed:
-            #         return
-            #     await self.write(chunk)
             async for chunk in self.audio_input:
                 if self.closed:
                     return
@@ -182,6 +175,8 @@ async def get_ffmpeg_stream(
     extra_args: list[str] | None = None,
     chunk_size: int | None = None,
     extra_input_args: list[str] | None = None,
+    collect_log_history: bool = False,
+    loglevel: str = "info",
 ) -> AsyncGenerator[bytes, None]:
     """
     Get the ffmpeg audio stream as async generator.
@@ -196,6 +191,8 @@ async def get_ffmpeg_stream(
         filter_params=filter_params,
         extra_args=extra_args,
         extra_input_args=extra_input_args,
+        collect_log_history=collect_log_history,
+        loglevel=loglevel,
     ) as ffmpeg_proc:
         # read final chunks from stdout
         iterator = ffmpeg_proc.iter_chunked(chunk_size) if chunk_size else ffmpeg_proc.iter_any()
@@ -203,7 +200,7 @@ async def get_ffmpeg_stream(
             yield chunk
 
 
-def get_ffmpeg_args(
+def get_ffmpeg_args(  # noqa: PLR0915
     input_format: AudioFormat,
     output_format: AudioFormat,
     filter_params: list[str],
@@ -226,8 +223,6 @@ def get_ffmpeg_args(
         "-ignore_unknown",
         "-protocol_whitelist",
         "file,hls,http,https,tcp,tls,crypto,pipe,data,fd,rtp,udp,concat",
-        "-probesize",
-        "8192",
     ]
     # collect input args
     input_args = []
@@ -269,6 +264,8 @@ def get_ffmpeg_args(
             "-i",
             input_path,
         ]
+    elif input_format.codec_type != ContentType.UNKNOWN:
+        input_args += ["-acodec", input_format.codec_type.name.lower(), "-i", input_path]
     else:
         # let ffmpeg auto detect the content type from the metadata/headers
         input_args += ["-i", input_path]
@@ -284,6 +281,38 @@ def get_ffmpeg_args(
         # devnull stream
         output_path = "-"
         output_args = ["-f", "null"]
+    elif output_format.content_type.is_pcm():
+        # use explicit format identifier for pcm formats
+        output_args += [
+            "-ar",
+            str(output_format.sample_rate),
+            "-acodec",
+            output_format.content_type.name.lower(),
+            "-f",
+            output_format.content_type.value,
+        ]
+    elif input_format == output_format:
+        # passthrough
+        if output_format.content_type in (
+            ContentType.MP4,
+            ContentType.MP4A,
+            ContentType.M4A,
+            ContentType.M4B,
+        ):
+            fmt = "adts"
+        elif output_format.codec_type != ContentType.UNKNOWN:
+            fmt = output_format.codec_type.name.lower()
+        else:
+            fmt = output_format.content_type.name.lower()
+        output_args = [
+            "-vn",
+            "-dn",
+            "-sn",
+            "-acodec",
+            "copy",
+            "-f",
+            fmt,
+        ]
     elif output_format.content_type == ContentType.AAC:
         output_args = ["-f", "adts", "-c:a", "aac", "-b:a", "256k"]
     elif output_format.content_type == ContentType.MP3:
@@ -311,19 +340,7 @@ def get_ffmpeg_args(
             "-compression_level",
             "0",
         ]
-    elif output_format.content_type.is_pcm():
-        # use explicit format identifier for pcm formats
-        output_args += [
-            "-ar",
-            str(output_format.sample_rate),
-            "-acodec",
-            output_format.content_type.name.lower(),
-            "-f",
-            output_format.content_type.value,
-        ]
-    elif input_format == output_format:
-        # passthrough
-        output_args = ["-c", "copy"]
+
     else:
         raise RuntimeError("Invalid/unsupported output format specified")
 
index a0fe11f5bb4d5b9fd4b05c5dc6f6c9a0cda24efc..534c7a27772cd6ec75083efd1e753a7c55cf8ff2 100644 (file)
@@ -8,6 +8,7 @@ import importlib
 import logging
 import os
 import re
+import shutil
 import socket
 import urllib.error
 import urllib.parse
@@ -464,6 +465,15 @@ async def has_tmpfs_mount() -> bool:
     return False
 
 
+async def get_tmp_free_space() -> int:
+    """Return free space on tmp."""
+    try:
+        if res := await asyncio.to_thread(shutil.disk_usage, "/tmp"):  # noqa: S108
+            return res.free
+    except (FileNotFoundError, OSError, PermissionError):
+        return 0
+
+
 def divide_chunks(data: bytes, chunk_size: int) -> Iterator[bytes]:
     """Chunk bytes data into smaller chunks."""
     for i in range(0, len(data), chunk_size):
index f51d9188b256222e17e73a25e76024b9e7f357d7..120ecabb0fd07686468af667bbdeb34b83d9260e 100644 (file)
@@ -446,6 +446,16 @@ class MusicAssistant:
         msg = "Task does not exist"
         raise KeyError(msg)
 
+    def cancel_task(self, task_id: str) -> None:
+        """Cancel existing scheduled task."""
+        if existing := self._tracked_tasks.pop(task_id, None):
+            existing.cancel()
+
+    def cancel_timer(self, task_id: str) -> None:
+        """Cancel existing scheduled timer."""
+        if existing := self._tracked_timers.pop(task_id, None):
+            existing.cancel()
+
     def register_api_command(
         self,
         command: str,
index df388b22470005ae9435490604926515560a2b28..3785e8023df8c00e9c7df50a56541dd3a2affbf5 100644 (file)
@@ -364,13 +364,13 @@ class AppleMusicProvider(MusicProvider):
         return StreamDetails(
             item_id=item_id,
             provider=self.lookup_key,
-            audio_format=AudioFormat(content_type=ContentType.M4A, codec_type=ContentType.AAC),
+            audio_format=AudioFormat(content_type=ContentType.MP4, codec_type=ContentType.AAC),
             stream_type=StreamType.ENCRYPTED_HTTP,
             decryption_key=await self._get_decryption_key(license_url, key_id, uri, item_id),
             path=stream_url,
             can_seek=True,
             allow_seek=True,
-            # enforce caching because the apple streams are m4a files with moov atom at the end
+            # enforce caching because the apple streams are mp4 files with moov atom at the end
             enable_cache=True,
         )
 
index e8588120a2afe31e3164d77a96b6e8f13429391c..3df36273f7ac40949b65b9e92612f507c3f72fa0 100644 (file)
@@ -610,7 +610,7 @@ class SpotifyProvider(MusicProvider):
                 # get first chunk with timeout, to catch the issue where librespot is not starting
                 # which seems to happen from time to time (but rarely)
                 try:
-                    chunk = await asyncio.wait_for(librespot_proc.read(64000), timeout=3)
+                    chunk = await asyncio.wait_for(librespot_proc.read(64000), timeout=5)
                     yield chunk
                 except TimeoutError:
                     raise AudioError("No audio received from librespot within timeout")
index 3f965525a53e993f433b2b460e24ed18027af7c3..ebbd1330a04dc859e7702a3586b220165be38729 100644 (file)
@@ -24,7 +24,7 @@ dependencies = [
   "ifaddr==0.2.0",
   "mashumaro==3.15",
   "music-assistant-frontend==2.12.2",
-  "music-assistant-models==1.1.32",
+  "music-assistant-models==1.1.33",
   "mutagen==1.47.0",
   "orjson==3.10.12",
   "pillow==11.1.0",
index 627d867de5c161de7e036cf820cee6749648eafd..e5d30c63e14f6eb3bda47c2a88d3788de79f76af 100644 (file)
@@ -26,7 +26,7 @@ ibroadcastaio==0.4.0
 ifaddr==0.2.0
 mashumaro==3.15
 music-assistant-frontend==2.12.2
-music-assistant-models==1.1.32
+music-assistant-models==1.1.33
 mutagen==1.47.0
 orjson==3.10.12
 pillow==11.1.0