Add extra buffering to queue stream to create backpressure (#2544)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 25 Oct 2025 02:54:24 +0000 (04:54 +0200)
committerGitHub <noreply@github.com>
Sat, 25 Oct 2025 02:54:24 +0000 (04:54 +0200)
music_assistant/controllers/streams.py
music_assistant/helpers/audio.py
music_assistant/helpers/audio_buffer.py [new file with mode: 0644]
music_assistant/helpers/buffered_generator.py [new file with mode: 0644]
music_assistant/helpers/util.py
music_assistant/providers/sonos/player.py
music_assistant/providers/sonos/provider.py

index d99c622770b227e1fda82db0b711f64ba0e150ab..d4cbaebe001d3757e51d3ddb877370fc7cd101a5 100644 (file)
@@ -9,12 +9,13 @@ the upnp callbacks and json rpc api for slimproto clients.
 from __future__ import annotations
 
 import asyncio
+import gc
 import logging
 import os
 import urllib.parse
 from collections.abc import AsyncGenerator
 from dataclasses import dataclass
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Final
 
 from aiofiles.os import wrap
 from aiohttp import web
@@ -58,11 +59,13 @@ from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER
 from music_assistant.helpers.audio import (
     get_chunksize,
     get_media_stream,
+    get_media_stream_with_buffer,
     get_player_filter_params,
     get_silence,
     get_stream_details,
     resample_pcm_audio,
 )
+from music_assistant.helpers.buffered_generator import use_buffer
 from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
 from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
 from music_assistant.helpers.smart_fades import (
@@ -73,6 +76,7 @@ from music_assistant.helpers.smart_fades import (
 from music_assistant.helpers.util import get_ip_addresses, select_free_port
 from music_assistant.helpers.webserver import Webserver
 from music_assistant.models.core_controller import CoreController
+from music_assistant.models.music_provider import MusicProvider
 from music_assistant.models.plugin import PluginProvider
 
 if TYPE_CHECKING:
@@ -87,6 +91,8 @@ if TYPE_CHECKING:
 
 isfile = wrap(os.path.isfile)
 
+CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
+
 
 def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
     """Parse PCM info from a codec/content_type string."""
@@ -170,6 +176,23 @@ class StreamsController(CoreController):
                 "Make sure that this server can be reached "
                 "on the given IP and TCP port by players on the local network.",
             ),
+            ConfigEntry(
+                key=CONF_ALLOW_BUFFER,
+                type=ConfigEntryType.BOOLEAN,
+                default_value=False,
+                label="Allow (in-memory) buffering of (track) audio",
+                description="By default, Music Assistant tries to be as resource "
+                "efficient as possible when streaming audio, especially considering "
+                "low-end devices such as Raspberry Pi's. This means that audio "
+                "buffering is disabled by default to reduce memory usage. \n\n"
+                "Enabling this option allows for in-memory buffering of audio, "
+                "which (massively) improves playback (and seeking) performance but it comes "
+                "at the cost of increased memory usage. "
+                "If you run Music Assistant on a capable device with enough memory, "
+                "enabling this option is strongly recommended.",
+                required=False,
+                category="audio",
+            ),
             ConfigEntry(
                 key=CONF_VOLUME_NORMALIZATION_RADIO,
                 type=ConfigEntryType.STRING,
@@ -437,6 +460,7 @@ class StreamsController(CoreController):
             audio_input = self.get_queue_item_stream(
                 queue_item=queue_item,
                 pcm_format=pcm_format,
+                seek_position=queue_item.streamdetails.seek_position,
             )
         # stream the audio
         # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
@@ -452,7 +476,11 @@ class StreamsController(CoreController):
                 input_format=pcm_format,
                 output_format=output_format,
             ),
-            chunk_size=get_chunksize(output_format),
+            # we need to slowly feed the music to avoid the player stopping and later
+            # restarting (or completely failing) the audio stream by keeping the buffer short.
+            # this is reported to be an issue especially with Chromecast players.
+            # see for example: https://github.com/music-assistant/support/issues/3717
+            extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "2"],
         ):
             try:
                 await resp.write(chunk)
@@ -558,6 +586,11 @@ class StreamsController(CoreController):
             filter_params=get_player_filter_params(
                 self.mass, queue_player.player_id, flow_pcm_format, output_format
             ),
+            # we need to slowly feed the music to avoid the player stopping and later
+            # restarting (or completely failing) the audio stream by keeping the buffer short.
+            # this is reported to be an issue especially with Chromecast players.
+            # see for example: https://github.com/music-assistant/support/issues/3717
+            extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "5"],
             chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
         ):
             try:
@@ -718,7 +751,7 @@ class StreamsController(CoreController):
         )
         http_profile = str(http_profile_value) if http_profile_value is not None else "default"
         if http_profile == "forced_content_length":
-            # guess content length based on duration
+            # just set an insanely high content length to make sure the player keeps playing
             resp.content_length = get_chunksize(output_format, 12 * 3600)
         elif http_profile == "chunked":
             resp.enable_chunked_encoding()
@@ -761,6 +794,7 @@ class StreamsController(CoreController):
         # like https hosts and it also offers the pre-announce 'bell'
         return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
 
+    @use_buffer(30, 4)
     async def get_queue_flow_stream(
         self,
         queue: PlayerQueue,
@@ -841,6 +875,7 @@ class StreamsController(CoreController):
             async for chunk in self.get_queue_item_stream(
                 queue_track,
                 pcm_format=pcm_format,
+                seek_position=queue_track.streamdetails.seek_position,
             ):
                 # buffer size needs to be big enough to include the crossfade part
                 req_buffer_size = (
@@ -1037,6 +1072,7 @@ class StreamsController(CoreController):
         self,
         queue_item: QueueItem,
         pcm_format: AudioFormat,
+        seek_position: int = 0,
     ) -> AsyncGenerator[bytes, None]:
         """Get the (PCM) audio stream for a single queue item."""
         # collect all arguments for ffmpeg
@@ -1074,23 +1110,97 @@ class StreamsController(CoreController):
             filter_params.append(f"volume={gain_correct}dB")
         streamdetails.volume_normalization_gain_correct = gain_correct
 
-        first_chunk_received = False
-        async for chunk in get_media_stream(
-            self.mass,
-            streamdetails=streamdetails,
-            pcm_format=pcm_format,
-            filter_params=filter_params,
-        ):
-            if not first_chunk_received:
-                first_chunk_received = True
-                # inform the queue that the track is now loaded in the buffer
-                # so for example the next track can be enqueued
-                self.mass.player_queues.track_loaded_in_buffer(
-                    queue_item.queue_id, queue_item.queue_item_id
-                )
-            yield chunk
-            del chunk
+        allow_buffer = bool(
+            self.mass.config.get_raw_core_config_value(self.domain, CONF_ALLOW_BUFFER, False)
+            and streamdetails.duration
+        )
+
+        self.logger.debug(
+            "Starting queue item stream for %s (%s)"
+            " - using buffer: %s"
+            " - using fade-in: %s"
+            " - volume normalization: %s",
+            queue_item.name,
+            queue_item.uri,
+            allow_buffer,
+            streamdetails.fade_in,
+            streamdetails.volume_normalization_mode,
+        )
+        if allow_buffer:
+            media_stream_gen = get_media_stream_with_buffer(
+                self.mass,
+                streamdetails=streamdetails,
+                pcm_format=pcm_format,
+                seek_position=int(seek_position),
+                filter_params=filter_params,
+            )
+        else:
+            media_stream_gen = get_media_stream(
+                self.mass,
+                streamdetails=streamdetails,
+                pcm_format=pcm_format,
+                seek_position=int(seek_position),
+                filter_params=filter_params,
+            )
 
+        first_chunk_received = False
+        fade_in_buffer = b""
+        bytes_received = 0
+        aborted = False
+        try:
+            async for chunk in media_stream_gen:
+                bytes_received += len(chunk)
+                if not first_chunk_received:
+                    first_chunk_received = True
+                    # inform the queue that the track is now loaded in the buffer
+                    # so for example the next track can be enqueued
+                    self.mass.player_queues.track_loaded_in_buffer(
+                        queue_item.queue_id, queue_item.queue_item_id
+                    )
+                # handle optional fade-in
+                if streamdetails.fade_in:
+                    if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
+                        fade_in_buffer += chunk
+                    elif fade_in_buffer:
+                        async for fade_chunk in get_ffmpeg_stream(
+                            audio_input=fade_in_buffer + chunk,
+                            input_format=pcm_format,
+                            output_format=pcm_format,
+                            filter_params=["afade=type=in:start_time=0:duration=3"],
+                        ):
+                            yield fade_chunk
+                    fade_in_buffer = b""
+                    streamdetails.fade_in = False
+                else:
+                    yield chunk
+                # help garbage collection by explicitly deleting chunk
+                del chunk
+        except (Exception, GeneratorExit):
+            aborted = True
+            raise
+        finally:
+            # determine how many seconds we've streamed
+            # for pcm output we can calculate this easily
+            seconds_streamed = bytes_received / pcm_format.pcm_sample_size
+            streamdetails.seconds_streamed = seconds_streamed
+            self.logger.debug(
+                "stream %s for %s - seconds streamed: %s",
+                "aborted" if aborted else "finished",
+                streamdetails.uri,
+                seconds_streamed,
+            )
+            # report stream to provider
+            if (not aborted and seconds_streamed >= 30) and (
+                music_prov := self.mass.get_provider(streamdetails.provider)
+            ):
+                if TYPE_CHECKING:  # avoid circular import
+                    assert isinstance(music_prov, MusicProvider)
+                self.mass.create_task(music_prov.on_streamed(streamdetails))
+            # Run garbage collection in executor to reclaim memory from large buffers
+            loop = asyncio.get_running_loop()
+            await loop.run_in_executor(None, gc.collect)
+
+    @use_buffer(30, 4)
     async def get_queue_item_stream_with_smartfade(
         self,
         queue_item: QueueItem,
@@ -1128,7 +1238,17 @@ class StreamsController(CoreController):
             crossfade_size = int(pcm_format.pcm_sample_size * standard_crossfade_duration + 4)
         fade_out_data: bytes | None = None
 
-        async for chunk in self.get_queue_item_stream(queue_item, pcm_format):
+        if crossfade_data:
+            discard_seconds = int(crossfade_data.fade_in_size / pcm_format.pcm_sample_size) - 1
+            discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
+            discard_leftover = int(crossfade_data.fade_in_size - discard_bytes)
+        else:
+            discard_seconds = streamdetails.seek_position
+            discard_leftover = 0
+
+        async for chunk in self.get_queue_item_stream(
+            queue_item, pcm_format, seek_position=discard_seconds
+        ):
             # ALWAYS APPEND CHUNK TO BUFFER
             buffer += chunk
             del chunk
@@ -1138,8 +1258,8 @@ class StreamsController(CoreController):
 
             ####  HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
             if crossfade_data:
-                # discard the fade_in_part from the crossfade data
-                buffer = buffer[crossfade_data.fade_in_size :]
+                # discard the fade_in_part from the crossfade data (minus what we already seeked)
+                buffer = buffer[discard_leftover:]
                 # send the (second half of the) crossfade data
                 if crossfade_data.pcm_format != pcm_format:
                     # pcm format mismatch, we need to resample the crossfade data
index 9c1446c7bf6ef8fd9962c56d0f341d3734d023dc..dd5ce2ae079973121830d3240c71970e0bed8add 100644 (file)
@@ -49,6 +49,7 @@ from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads
 from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
 from music_assistant.helpers.util import clean_stream_title, remove_file
 
+from .audio_buffer import AudioBuffer
 from .datetime import utc
 from .dsp import filter_to_ffmpeg_params
 from .ffmpeg import FFMpeg, get_ffmpeg_stream
@@ -333,9 +334,9 @@ async def get_stream_details(
         raise MediaNotFoundError(
             f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
         )
-    if (
-        queue_item.streamdetails
-        and (utc() - queue_item.streamdetails.created_at).seconds < STREAMDETAILS_EXPIRATION
+    if queue_item.streamdetails and (
+        (utc() - queue_item.streamdetails.created_at).seconds < STREAMDETAILS_EXPIRATION
+        or queue_item.streamdetails.buffer
     ):
         # already got a fresh/unused (or cached) streamdetails
         # we assume that the streamdetails are valid for max STREAMDETAILS_EXPIRATION seconds
@@ -383,6 +384,14 @@ async def get_stream_details(
             resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
             streamdetails.path = resolved_url
             streamdetails.stream_type = stream_type
+        # handle volume normalization details
+        if result := await mass.music.get_loudness(
+            streamdetails.item_id,
+            streamdetails.provider,
+            media_type=queue_item.media_type,
+        ):
+            streamdetails.loudness = result[0]
+            streamdetails.loudness_album = result[1]
 
     # set queue_id on the streamdetails so we know what is being streamed
     streamdetails.queue_id = queue_item.queue_id
@@ -391,15 +400,6 @@ async def get_stream_details(
     streamdetails.fade_in = fade_in
     if not streamdetails.duration:
         streamdetails.duration = queue_item.duration
-
-    # handle volume normalization details
-    if result := await mass.music.get_loudness(
-        streamdetails.item_id,
-        streamdetails.provider,
-        media_type=queue_item.media_type,
-    ):
-        streamdetails.loudness = result[0]
-        streamdetails.loudness_album = result[1]
     streamdetails.prefer_album_loudness = prefer_album_loudness
     player_settings = await mass.config.get_player_config(streamdetails.queue_id)
     core_config = await mass.config.get_core_config("streams")
@@ -421,22 +421,139 @@ async def get_stream_details(
     return streamdetails
 
 
+async def get_media_stream_with_buffer(
+    mass: MusicAssistant,
+    streamdetails: StreamDetails,
+    pcm_format: AudioFormat,
+    seek_position: int = 0,
+    filter_params: list[str] | None = None,
+) -> AsyncGenerator[bytes, None]:
+    """Get audio stream for given media details as raw PCM with buffering."""
+    LOGGER.debug(
+        "get_media_stream_with_buffer: Starting for %s (seek: %s)",
+        streamdetails.uri,
+        seek_position,
+    )
+
+    # checksum based on pcm_format and filter_params
+    checksum = f"{pcm_format}-{filter_params}"
+
+    async def fill_buffer_task() -> None:
+        """Background task to fill the audio buffer."""
+        chunk_count = 0
+        try:
+            async for chunk in get_media_stream(
+                mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params
+            ):
+                chunk_count += 1
+                await audio_buffer.put(chunk)
+        except asyncio.CancelledError:
+            LOGGER.log(
+                VERBOSE_LOG_LEVEL,
+                "fill_buffer_task: Cancelled after %s chunks for %s",
+                chunk_count,
+                streamdetails.uri,
+            )
+            raise
+        except Exception as err:
+            LOGGER.error(
+                "fill audio buffer task: Error after %s chunks for %s: %s",
+                chunk_count,
+                streamdetails.uri,
+                err,
+            )
+        finally:
+            await audio_buffer.set_eof()
+            LOGGER.log(
+                VERBOSE_LOG_LEVEL,
+                "fill_buffer_task: Completed (%s chunks) for %s",
+                chunk_count,
+                streamdetails.uri,
+            )
+
+    # check for existing buffer and reuse if possible
+    existing_buffer: AudioBuffer | None = streamdetails.buffer
+    if existing_buffer is not None:
+        if not existing_buffer.is_valid(checksum, seek_position):
+            LOGGER.log(
+                VERBOSE_LOG_LEVEL,
+                "get_media_stream_with_buffer: Existing buffer invalid for %s "
+                "(seek: %s, discarded: %s)",
+                streamdetails.uri,
+                seek_position,
+                existing_buffer._discarded_chunks,
+            )
+            await existing_buffer.clear()
+            streamdetails.buffer = None
+            existing_buffer = None
+        else:
+            LOGGER.debug(
+                "get_media_stream_with_buffer: Reusing existing buffer for %s - "
+                "available: %ss, seek: %s, discarded: %s",
+                streamdetails.uri,
+                existing_buffer.seconds_available,
+                seek_position,
+                existing_buffer._discarded_chunks,
+            )
+            audio_buffer = existing_buffer
+
+    if not existing_buffer and seek_position > 60:
+        # If seeking into the track and no valid buffer exists,
+        # just start a normal stream without buffering,
+        # otherwise we would need to fill the buffer up to the seek position first
+        # which is not efficient.
+        LOGGER.debug(
+            "get_media_stream_with_buffer: No existing buffer and seek >60s for %s, "
+            "starting normal stream",
+            streamdetails.uri,
+        )
+        async for chunk in get_media_stream(
+            mass,
+            streamdetails,
+            pcm_format,
+            seek_position=seek_position,
+            filter_params=filter_params,
+        ):
+            yield chunk
+        return
+
+    if not existing_buffer:
+        # create new audio buffer and start fill task
+        LOGGER.debug(
+            "get_media_stream_with_buffer: Creating new buffer for %s",
+            streamdetails.uri,
+        )
+        audio_buffer = AudioBuffer(pcm_format, checksum)
+        streamdetails.buffer = audio_buffer
+        task = mass.loop.create_task(fill_buffer_task())
+        audio_buffer.attach_fill_task(task)
+
+    # yield data from the buffer
+    chunk_count = 0
+    try:
+        async for chunk in audio_buffer.iter(seek_position=seek_position):
+            chunk_count += 1
+            yield chunk
+    finally:
+        LOGGER.log(
+            VERBOSE_LOG_LEVEL,
+            "get_media_stream_with_buffer: Completed, yielded %s chunks",
+            chunk_count,
+        )
+
+
 async def get_media_stream(
     mass: MusicAssistant,
     streamdetails: StreamDetails,
     pcm_format: AudioFormat,
+    seek_position: int = 0,
     filter_params: list[str] | None = None,
 ) -> AsyncGenerator[bytes, None]:
     """Get audio stream for given media details as raw PCM."""
     logger = LOGGER.getChild("media_stream")
     logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
     extra_input_args = streamdetails.extra_input_args or []
-    if filter_params is None:
-        filter_params = []
-    if streamdetails.fade_in:
-        filter_params.append("afade=type=in:start_time=0:duration=3")
 
-    seek_position = streamdetails.seek_position
     # work out audio source for these streamdetails
     audio_source: str | AsyncGenerator[bytes, None]
     stream_type = streamdetails.stream_type
@@ -498,14 +615,9 @@ async def get_media_stream(
         await ffmpeg_proc.start()
         assert ffmpeg_proc.proc is not None  # for type checking
         logger.debug(
-            "Started media stream for %s"
-            " - using streamtype: %s"
-            " - volume normalization: %s"
-            " - output format: %s"
-            " - ffmpeg PID: %s",
+            "Started media stream for %s - using streamtype: %s - pcm format: %s - ffmpeg PID: %s",
             streamdetails.uri,
             streamdetails.stream_type,
-            streamdetails.volume_normalization_mode,
             pcm_format.content_type.value,
             ffmpeg_proc.proc.pid,
         )
@@ -519,8 +631,9 @@ async def get_media_stream(
                 first_chunk_received = True
                 streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type
                 logger.debug(
-                    "First chunk received after %s seconds",
+                    "First chunk received after %s seconds (codec detected: %s)",
                     mass.loop.time() - stream_start,
+                    ffmpeg_proc.input_format.codec_type,
                 )
             yield chunk
             bytes_sent += len(chunk)
@@ -547,27 +660,26 @@ async def get_media_stream(
     finally:
         # always ensure close is called which also handles all cleanup
         await ffmpeg_proc.close()
-        # determine how many seconds we've streamed
+        # determine how many seconds we've received
         # for pcm output we can calculate this easily
-        seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
-        streamdetails.seconds_streamed = seconds_streamed
+        seconds_received = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
         # store accurate duration
-        if finished and not streamdetails.seek_position and seconds_streamed:
-            streamdetails.duration = int(seconds_streamed)
+        if finished and not seek_position and seconds_received:
+            streamdetails.duration = int(seconds_received)
 
-        logger.debug(
-            "stream %s (with code %s) for %s - seconds streamed: %s",
+        logger.log(
+            VERBOSE_LOG_LEVEL,
+            "stream %s (with code %s) for %s",
             "cancelled" if cancelled else "finished" if finished else "aborted",
             ffmpeg_proc.returncode,
             streamdetails.uri,
-            seconds_streamed,
         )
 
         # parse loudnorm data if we have that collected (and enabled)
         if (
             (streamdetails.loudness is None or finished)
             and streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC
-            and (finished or (seconds_streamed >= 300))
+            and (finished or (seconds_received >= 300))
         ):
             # if dynamic volume normalization is enabled and the entire track is streamed
             # the loudnorm filter will output the measurement in the log,
@@ -579,7 +691,6 @@ async def get_media_stream(
                     streamdetails.uri,
                     loudness_details,
                 )
-                streamdetails.loudness = loudness_details
                 mass.create_task(
                     mass.music.set_loudness(
                         streamdetails.item_id,
@@ -596,21 +707,13 @@ async def get_media_stream(
                 VolumeNormalizationMode.DISABLED,
                 VolumeNormalizationMode.FIXED_GAIN,
             )
-            and (finished or (seconds_streamed >= 300))
+            and (finished or (seconds_received >= 300))
         ):
             # dynamic mode not allowed and no measurement known, we need to analyze the audio
             # add background task to start analyzing the audio
             task_id = f"analyze_loudness_{streamdetails.uri}"
             mass.call_later(5, analyze_loudness, mass, streamdetails, task_id=task_id)
 
-        # report stream to provider
-        if (finished or seconds_streamed >= 30) and (
-            music_prov := mass.get_provider(streamdetails.provider)
-        ):
-            if TYPE_CHECKING:  # avoid circular import
-                assert isinstance(music_prov, MusicProvider)
-            mass.create_task(music_prov.on_streamed(streamdetails))
-
 
 def create_wave_header(
     samplerate: int = 44100, channels: int = 2, bitspersample: int = 16, duration: int | None = None
@@ -1288,14 +1391,12 @@ async def analyze_loudness(
     streamdetails: StreamDetails,
 ) -> None:
     """Analyze media item's audio, to calculate EBU R128 loudness."""
-    if result := await mass.music.get_loudness(
+    if await mass.music.get_loudness(
         streamdetails.item_id,
         streamdetails.provider,
         media_type=streamdetails.media_type,
     ):
         # only when needed we do the analyze job
-        streamdetails.loudness = result[0]
-        streamdetails.loudness_album = result[1]
         return
 
     logger = LOGGER.getChild("analyze_loudness")
@@ -1360,7 +1461,6 @@ async def analyze_loudness(
                 log_lines_str or "received empty value",
             )
         else:
-            streamdetails.loudness = loudness
             await mass.music.set_loudness(
                 streamdetails.item_id,
                 streamdetails.provider,
diff --git a/music_assistant/helpers/audio_buffer.py b/music_assistant/helpers/audio_buffer.py
new file mode 100644 (file)
index 0000000..24ff7d0
--- /dev/null
@@ -0,0 +1,314 @@
+"""Audio buffer implementation for PCM audio streaming."""
+
+from __future__ import annotations
+
+import asyncio
+import gc
+import logging
+import time
+from collections import deque
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+from typing import TYPE_CHECKING, Any
+
+from music_assistant_models.errors import AudioError
+
+from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
+
+if TYPE_CHECKING:
+    from music_assistant_models.media_items import AudioFormat
+
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio_buffer")
+
+DEFAULT_MAX_BUFFER_SIZE_SECONDS: int = 60 * 5  # 5 minutes
+
+
+class AudioBuffer:
+    """Simple buffer to hold (PCM) audio chunks with seek capability.
+
+    Each chunk represents exactly 1 second of audio.
+    Chunks are stored in a deque for efficient O(1) append and popleft operations.
+    """
+
+    def __init__(
+        self,
+        pcm_format: AudioFormat,
+        checksum: str,
+        max_size_seconds: int = DEFAULT_MAX_BUFFER_SIZE_SECONDS,
+    ) -> None:
+        """
+        Initialize AudioBuffer.
+
+        Args:
+            pcm_format: The PCM audio format specification
+            checksum: The checksum for the audio data (for validation purposes)
+            max_size_seconds: Maximum buffer size in seconds
+        """
+        self.pcm_format = pcm_format
+        self.checksum = checksum
+        self.max_size_seconds = max_size_seconds
+        # Store chunks in a deque for O(1) append and popleft operations
+        self._chunks: deque[bytes] = deque()
+        # Track how many chunks have been discarded from the start
+        self._discarded_chunks = 0
+        self._lock = asyncio.Lock()
+        self._data_available = asyncio.Condition(self._lock)
+        self._space_available = asyncio.Condition(self._lock)
+        self._eof_received = False
+        self._buffer_fill_task: asyncio.Task[None] | None = None
+        self._last_access_time: float = time.time()
+        self._inactivity_task: asyncio.Task[None] | None = None
+        self._cancelled = False  # Set to True when buffer is cleared/cancelled
+
+    @property
+    def cancelled(self) -> bool:
+        """Return whether the buffer has been cancelled or cleared."""
+        if self._cancelled:
+            return True
+        return self._buffer_fill_task is not None and self._buffer_fill_task.cancelled()
+
+    @property
+    def chunk_size_bytes(self) -> int:
+        """Return the size in bytes of one second of PCM audio."""
+        return self.pcm_format.pcm_sample_size
+
+    @property
+    def size_seconds(self) -> int:
+        """Return current size of the buffer in seconds."""
+        return len(self._chunks)
+
+    @property
+    def seconds_available(self) -> int:
+        """Return number of seconds of audio currently available in the buffer."""
+        return len(self._chunks)
+
+    def is_valid(self, checksum: str, seek_position: int = 0) -> bool:
+        """
+        Validate the buffer's checksum and check if seek position is available.
+
+        Args:
+            checksum: The checksum to validate against
+            seek_position: The position we want to seek to (0-based)
+
+        Returns:
+            True if buffer is valid and seek position is available
+        """
+        if self.cancelled:
+            return False
+
+        if self.checksum != checksum:
+            return False
+
+        # Check if buffer is close to inactivity timeout (within 30 seconds)
+        # to prevent race condition where buffer gets cleared right after validation
+        time_since_access = time.time() - self._last_access_time
+        inactivity_timeout = 60 * 5  # 5 minutes
+        if time_since_access > (inactivity_timeout - 30):
+            # Buffer is close to being cleared, don't reuse it
+            return False
+
+        # Check if the seek position has already been discarded
+        return seek_position >= self._discarded_chunks
+
+    async def put(self, chunk: bytes) -> None:
+        """
+        Put a chunk of data into the buffer.
+
+        Each chunk represents exactly 1 second of PCM audio.
+        Waits if buffer is full.
+
+        Args:
+            chunk: Bytes representing 1 second of PCM audio
+        """
+        async with self._space_available:
+            # Wait until there's space in the buffer
+            while len(self._chunks) >= self.max_size_seconds and not self._eof_received:
+                if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
+                    LOGGER.log(
+                        VERBOSE_LOG_LEVEL,
+                        "AudioBuffer.put: Buffer full (%s/%s), waiting for space...",
+                        len(self._chunks),
+                        self.max_size_seconds,
+                    )
+                await self._space_available.wait()
+
+            if self._eof_received:
+                # Don't accept new data after EOF
+                LOGGER.log(
+                    VERBOSE_LOG_LEVEL, "AudioBuffer.put: EOF already received, rejecting chunk"
+                )
+                return
+
+            # Add chunk to the list (index = second position)
+            self._chunks.append(chunk)
+            if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
+                LOGGER.log(
+                    VERBOSE_LOG_LEVEL,
+                    "AudioBuffer.put: Added chunk at position %s (size: %s bytes, buffer size: %s)",
+                    self._discarded_chunks + len(self._chunks) - 1,
+                    len(chunk),
+                    len(self._chunks),
+                )
+
+            # Notify waiting consumers
+            self._data_available.notify_all()
+
+    async def get(self, chunk_number: int = 0) -> bytes:
+        """
+        Get one second of data from the buffer at the specified chunk number.
+
+        Waits until requested chunk is available.
+        Discards old chunks if buffer is full.
+
+        Args:
+            chunk_number: The chunk index to retrieve (0-based, absolute position).
+
+        Returns:
+            Bytes containing one second of audio data
+
+        Raises:
+            AudioError: If EOF is reached before chunk is available or
+                       if chunk has been discarded
+        """
+        # Update last access time
+        self._last_access_time = time.time()
+
+        async with self._data_available:
+            # Check if the chunk was already discarded
+            if chunk_number < self._discarded_chunks:
+                msg = (
+                    f"Chunk {chunk_number} has been discarded "
+                    f"(buffer starts at {self._discarded_chunks})"
+                )
+                raise AudioError(msg)
+
+            # Wait until the requested chunk is available or EOF
+            buffer_index = chunk_number - self._discarded_chunks
+            while buffer_index >= len(self._chunks):
+                if self._eof_received:
+                    raise AudioError("EOF")
+                await self._data_available.wait()
+                buffer_index = chunk_number - self._discarded_chunks
+
+            # If buffer is at max size, discard the oldest chunk to make room
+            if len(self._chunks) >= self.max_size_seconds:
+                discarded = self._chunks.popleft()  # O(1) operation with deque
+                self._discarded_chunks += 1
+                if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
+                    LOGGER.log(
+                        VERBOSE_LOG_LEVEL,
+                        "AudioBuffer.get: Discarded chunk %s (size: %s bytes) to free space",
+                        self._discarded_chunks - 1,
+                        len(discarded),
+                    )
+                # Notify producers waiting for space
+                self._space_available.notify_all()
+                # Recalculate buffer index after discard
+                buffer_index = chunk_number - self._discarded_chunks
+
+            # Return the chunk at the requested index
+            return self._chunks[buffer_index]
+
+    async def iter(self, seek_position: int = 0) -> AsyncGenerator[bytes, None]:
+        """
+        Iterate over seconds of audio data until EOF.
+
+        Args:
+            seek_position: Optional starting position in seconds (default: 0).
+
+        Yields:
+            Bytes containing one second of audio data
+        """
+        chunk_number = seek_position
+        while True:
+            try:
+                yield await self.get(chunk_number=chunk_number)
+                chunk_number += 1
+            except AudioError:
+                break  # EOF reached
+
+    async def clear(self) -> None:
+        """Reset the buffer completely, clearing all data."""
+        chunk_count = len(self._chunks)
+        LOGGER.log(
+            VERBOSE_LOG_LEVEL,
+            "AudioBuffer.clear: Resetting buffer (had %s chunks, has fill task: %s)",
+            chunk_count,
+            self._buffer_fill_task is not None,
+        )
+        if self._buffer_fill_task:
+            self._buffer_fill_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._buffer_fill_task
+        if self._inactivity_task:
+            self._inactivity_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._inactivity_task
+        async with self._lock:
+            self._chunks.clear()
+            self._discarded_chunks = 0
+            self._eof_received = False
+            self._cancelled = True  # Mark buffer as cancelled
+            # Notify all waiting tasks
+            self._data_available.notify_all()
+            self._space_available.notify_all()
+
+        # Run garbage collection in executor to reclaim memory from large buffers
+        loop = asyncio.get_running_loop()
+        await loop.run_in_executor(None, gc.collect)
+
+    async def set_eof(self) -> None:
+        """Signal that no more data will be added to the buffer."""
+        async with self._lock:
+            LOGGER.log(
+                VERBOSE_LOG_LEVEL,
+                "AudioBuffer.set_eof: Marking EOF (buffer has %s chunks)",
+                len(self._chunks),
+            )
+            self._eof_received = True
+            # Wake up all waiting consumers and producers
+            self._data_available.notify_all()
+            self._space_available.notify_all()
+
+    async def _monitor_inactivity(self) -> None:
+        """Monitor buffer for inactivity and clear if inactive for 5 minutes."""
+        inactivity_timeout = 60 * 5  # 5 minutes
+        check_interval = 30  # Check every 30 seconds
+        while True:
+            await asyncio.sleep(check_interval)
+
+            # Check if buffer has been inactive (no data and no activity)
+            time_since_access = time.time() - self._last_access_time
+
+            # If buffer is empty and hasn't been accessed for timeout period,
+            # it likely means the producer failed or stream was abandoned
+            if len(self._chunks) == 0 and time_since_access > inactivity_timeout:
+                LOGGER.log(
+                    VERBOSE_LOG_LEVEL,
+                    "AudioBuffer: Empty buffer with no activity for %.1f seconds, "
+                    "clearing (likely abandoned stream)",
+                    time_since_access,
+                )
+                await self.clear()
+                break  # Stop monitoring after clearing
+
+            # If buffer has data but hasn't been consumed, clear it
+            if len(self._chunks) > 0 and time_since_access > inactivity_timeout:
+                LOGGER.log(
+                    VERBOSE_LOG_LEVEL,
+                    "AudioBuffer: No activity for %.1f seconds, clearing buffer (had %s chunks)",
+                    time_since_access,
+                    len(self._chunks),
+                )
+                await self.clear()
+                break  # Stop monitoring after clearing
+
+    def attach_fill_task(self, task: asyncio.Task[Any]) -> None:
+        """Attach a background task that fills the buffer."""
+        self._buffer_fill_task = task
+
+        # Start inactivity monitor if not already running
+        if self._inactivity_task is None or self._inactivity_task.done():
+            self._last_access_time = time.time()  # Initialize access time
+            loop = asyncio.get_running_loop()
+            self._inactivity_task = loop.create_task(self._monitor_inactivity())
diff --git a/music_assistant/helpers/buffered_generator.py b/music_assistant/helpers/buffered_generator.py
new file mode 100644 (file)
index 0000000..53b35ad
--- /dev/null
@@ -0,0 +1,148 @@
+"""Helper for adding buffering to async generators."""
+
+from __future__ import annotations
+
+import asyncio
+import contextlib
+from collections.abc import AsyncGenerator, Callable
+from functools import wraps
+from typing import Final, ParamSpec
+
+# Type variables for the buffered decorator
+_P = ParamSpec("_P")
+
+DEFAULT_BUFFER_SIZE: Final = 30
+DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5
+
+
+async def buffered(
+    generator: AsyncGenerator[bytes, None],
+    buffer_size: int = DEFAULT_BUFFER_SIZE,
+    min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
+) -> AsyncGenerator[bytes, None]:
+    """
+    Add buffering to an async generator that yields bytes.
+
+    This function uses an asyncio.Queue to decouple the producer (reading from the stream)
+    from the consumer (yielding to the client). The producer runs in a separate task and
+    fills the buffer, while the consumer yields from the buffer.
+
+    Args:
+        generator: The async generator to buffer
+        buffer_size: Maximum number of chunks to buffer (default: 30)
+        min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
+
+    Example:
+        async for chunk in buffered(my_generator(), buffer_size=100):
+            process(chunk)
+    """
+    buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
+    producer_error: Exception | None = None
+    threshold_reached = asyncio.Event()
+
+    if buffer_size <= 1:
+        # No buffering needed, yield directly
+        async for chunk in generator:
+            yield chunk
+        return
+
+    async def producer() -> None:
+        """Read from the original generator and fill the buffer."""
+        nonlocal producer_error
+        try:
+            async for chunk in generator:
+                await buffer.put(chunk)
+                if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
+                    threshold_reached.set()
+        except asyncio.CancelledError:
+            # Task was cancelled, clean up the generator
+            with contextlib.suppress(RuntimeError, asyncio.CancelledError):
+                await generator.aclose()
+            raise
+        except Exception as err:
+            producer_error = err
+            # Consumer probably stopped consuming, close the original generator
+            with contextlib.suppress(RuntimeError, asyncio.CancelledError):
+                await generator.aclose()
+        finally:
+            threshold_reached.set()
+            # Signal end of stream by putting None
+            with contextlib.suppress(asyncio.QueueFull):
+                await buffer.put(None)
+
+    # Start the producer task
+    loop = asyncio.get_running_loop()
+    producer_task = loop.create_task(producer())
+
+    # Keep a strong reference to prevent garbage collection issues
+    # The event loop only keeps weak references to tasks
+    _active_tasks = getattr(loop, "_buffered_generator_tasks", None)
+    if _active_tasks is None:
+        _active_tasks = set()
+        loop._buffered_generator_tasks = _active_tasks  # type: ignore[attr-defined]
+    _active_tasks.add(producer_task)
+
+    # Remove from set when done
+    producer_task.add_done_callback(_active_tasks.discard)
+
+    try:
+        # Wait for initial buffer to fill
+        await threshold_reached.wait()
+
+        # Consume from buffer and yield
+        while True:
+            data = await buffer.get()
+            if data is None:
+                # End of stream
+                if producer_error:
+                    raise producer_error
+                break
+            yield data
+
+    finally:
+        # Ensure the producer task is cleaned up
+        if not producer_task.done():
+            producer_task.cancel()
+        with contextlib.suppress(asyncio.CancelledError, RuntimeError):
+            await producer_task
+
+
+def use_buffer(
+    buffer_size: int = DEFAULT_BUFFER_SIZE,
+    min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
+) -> Callable[
+    [Callable[_P, AsyncGenerator[bytes, None]]],
+    Callable[_P, AsyncGenerator[bytes, None]],
+]:
+    """
+    Add buffering to async generator functions that yield bytes (decorator).
+
+    This decorator uses an asyncio.Queue to decouple the producer (reading from the stream)
+    from the consumer (yielding to the client). The producer runs in a separate task and
+    fills the buffer, while the consumer yields from the buffer.
+
+    Args:
+        buffer_size: Maximum number of chunks to buffer (default: 30)
+        min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
+
+    Example:
+        @use_buffer(buffer_size=100)
+        async def my_stream() -> AsyncGenerator[bytes, None]:
+            ...
+    """
+
+    def decorator(
+        func: Callable[_P, AsyncGenerator[bytes, None]],
+    ) -> Callable[_P, AsyncGenerator[bytes, None]]:
+        @wraps(func)
+        async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> AsyncGenerator[bytes, None]:
+            async for chunk in buffered(
+                func(*args, **kwargs),
+                buffer_size=buffer_size,
+                min_buffer_before_yield=min_buffer_before_yield,
+            ):
+                yield chunk
+
+        return wrapper
+
+    return decorator
index 2a373e35388c16e6ca861ce69a93a0d17e6957bd..212fe78e4606c0ae4a0c23824c52ac983fb19c46 100644 (file)
@@ -796,7 +796,12 @@ def guard_single_request[ProviderT: "Provider | CoreController", **P, R](
             cache_key_parts.append(f"{key}{kwargs[key]}")
         task_id = ".".join(map(str, cache_key_parts))
         task: asyncio.Task[R] = mass.create_task(
-            func, self, *args, **kwargs, task_id=task_id, abort_existing=False
+            func,
+            self,
+            *args,
+            task_id=task_id,
+            abort_existing=False,
+            **kwargs,
         )
         return await task
 
index 3250205317f884941855e7a8e42c4006139dda34..1e252148d7e48a2c245a4916daacb26d473beffd 100644 (file)
@@ -427,11 +427,9 @@ class SonosPlayer(Player):
             # Regular Queue item playback
             # create a sonos cloud queue and load it
             cloud_queue_url = f"{self.mass.streams.base_url}/sonos_queue/v2.3/"
-            track_data = self.provider._parse_sonos_queue_item(media)
             await self.client.player.group.play_cloud_queue(
                 cloud_queue_url,
                 item_id=media.queue_item_id,
-                track_metadata=track_data["track"],
             )
             return
 
index 0626f17cf994dd004e2801239e96a3dc8012ec63..8b28afc71b9a4e18871ffe763780c80a7992b24a 100644 (file)
@@ -292,7 +292,7 @@ class SonosPlayerProvider(PlayerProvider):
                 "service": {"name": "Music Assistant", "id": "mass"},
                 "name": media.title,
                 "imageUrl": media.image_url,
-                "durationMillis": media.duration * 1000 if media.duration else 0,
+                "durationMillis": int(media.duration * 1000) if media.duration else 0,
                 "artist": {
                     "name": media.artist,
                 }