From: Marcel van der Veldt Date: Sat, 25 Oct 2025 02:54:24 +0000 (+0200) Subject: Add extra buffering to queue stream to create backpressure (#2544) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=8f05d7127170015d2caa99b5fad2a09de6654fc8;p=music-assistant-server.git Add extra buffering to queue stream to create backpressure (#2544) --- diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index d99c6227..d4cbaebe 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -9,12 +9,13 @@ the upnp callbacks and json rpc api for slimproto clients. from __future__ import annotations import asyncio +import gc import logging import os import urllib.parse from collections.abc import AsyncGenerator from dataclasses import dataclass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Final from aiofiles.os import wrap from aiohttp import web @@ -58,11 +59,13 @@ from music_assistant.helpers.audio import LOGGER as AUDIO_LOGGER from music_assistant.helpers.audio import ( get_chunksize, get_media_stream, + get_media_stream_with_buffer, get_player_filter_params, get_silence, get_stream_details, resample_pcm_audio, ) +from music_assistant.helpers.buffered_generator import use_buffer from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream from music_assistant.helpers.smart_fades import ( @@ -73,6 +76,7 @@ from music_assistant.helpers.smart_fades import ( from music_assistant.helpers.util import get_ip_addresses, select_free_port from music_assistant.helpers.webserver import Webserver from music_assistant.models.core_controller import CoreController +from music_assistant.models.music_provider import MusicProvider from music_assistant.models.plugin import PluginProvider if TYPE_CHECKING: @@ -87,6 +91,8 @@ if TYPE_CHECKING: isfile = wrap(os.path.isfile) +CONF_ALLOW_BUFFER: Final[str] = "allow_buffering" + def parse_pcm_info(content_type: str) -> tuple[int, int, int]: """Parse PCM info from a codec/content_type string.""" @@ -170,6 +176,23 @@ class StreamsController(CoreController): "Make sure that this server can be reached " "on the given IP and TCP port by players on the local network.", ), + ConfigEntry( + key=CONF_ALLOW_BUFFER, + type=ConfigEntryType.BOOLEAN, + default_value=False, + label="Allow (in-memory) buffering of (track) audio", + description="By default, Music Assistant tries to be as resource " + "efficient as possible when streaming audio, especially considering " + "low-end devices such as Raspberry Pi's. This means that audio " + "buffering is disabled by default to reduce memory usage. \n\n" + "Enabling this option allows for in-memory buffering of audio, " + "which (massively) improves playback (and seeking) performance but it comes " + "at the cost of increased memory usage. " + "If you run Music Assistant on a capable device with enough memory, " + "enabling this option is strongly recommended.", + required=False, + category="audio", + ), ConfigEntry( key=CONF_VOLUME_NORMALIZATION_RADIO, type=ConfigEntryType.STRING, @@ -437,6 +460,7 @@ class StreamsController(CoreController): audio_input = self.get_queue_item_stream( queue_item=queue_item, pcm_format=pcm_format, + seek_position=queue_item.streamdetails.seek_position, ) # stream the audio # this final ffmpeg process in the chain will convert the raw, lossless PCM audio into @@ -452,7 +476,11 @@ class StreamsController(CoreController): input_format=pcm_format, output_format=output_format, ), - chunk_size=get_chunksize(output_format), + # we need to slowly feed the music to avoid the player stopping and later + # restarting (or completely failing) the audio stream by keeping the buffer short. + # this is reported to be an issue especially with Chromecast players. + # see for example: https://github.com/music-assistant/support/issues/3717 + extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "2"], ): try: await resp.write(chunk) @@ -558,6 +586,11 @@ class StreamsController(CoreController): filter_params=get_player_filter_params( self.mass, queue_player.player_id, flow_pcm_format, output_format ), + # we need to slowly feed the music to avoid the player stopping and later + # restarting (or completely failing) the audio stream by keeping the buffer short. + # this is reported to be an issue especially with Chromecast players. + # see for example: https://github.com/music-assistant/support/issues/3717 + extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "5"], chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format), ): try: @@ -718,7 +751,7 @@ class StreamsController(CoreController): ) http_profile = str(http_profile_value) if http_profile_value is not None else "default" if http_profile == "forced_content_length": - # guess content length based on duration + # just set an insanely high content length to make sure the player keeps playing resp.content_length = get_chunksize(output_format, 12 * 3600) elif http_profile == "chunked": resp.enable_chunked_encoding() @@ -761,6 +794,7 @@ class StreamsController(CoreController): # like https hosts and it also offers the pre-announce 'bell' return f"{self.base_url}/announcement/{player_id}.{content_type.value}" + @use_buffer(30, 4) async def get_queue_flow_stream( self, queue: PlayerQueue, @@ -841,6 +875,7 @@ class StreamsController(CoreController): async for chunk in self.get_queue_item_stream( queue_track, pcm_format=pcm_format, + seek_position=queue_track.streamdetails.seek_position, ): # buffer size needs to be big enough to include the crossfade part req_buffer_size = ( @@ -1037,6 +1072,7 @@ class StreamsController(CoreController): self, queue_item: QueueItem, pcm_format: AudioFormat, + seek_position: int = 0, ) -> AsyncGenerator[bytes, None]: """Get the (PCM) audio stream for a single queue item.""" # collect all arguments for ffmpeg @@ -1074,23 +1110,97 @@ class StreamsController(CoreController): filter_params.append(f"volume={gain_correct}dB") streamdetails.volume_normalization_gain_correct = gain_correct - first_chunk_received = False - async for chunk in get_media_stream( - self.mass, - streamdetails=streamdetails, - pcm_format=pcm_format, - filter_params=filter_params, - ): - if not first_chunk_received: - first_chunk_received = True - # inform the queue that the track is now loaded in the buffer - # so for example the next track can be enqueued - self.mass.player_queues.track_loaded_in_buffer( - queue_item.queue_id, queue_item.queue_item_id - ) - yield chunk - del chunk + allow_buffer = bool( + self.mass.config.get_raw_core_config_value(self.domain, CONF_ALLOW_BUFFER, False) + and streamdetails.duration + ) + + self.logger.debug( + "Starting queue item stream for %s (%s)" + " - using buffer: %s" + " - using fade-in: %s" + " - volume normalization: %s", + queue_item.name, + queue_item.uri, + allow_buffer, + streamdetails.fade_in, + streamdetails.volume_normalization_mode, + ) + if allow_buffer: + media_stream_gen = get_media_stream_with_buffer( + self.mass, + streamdetails=streamdetails, + pcm_format=pcm_format, + seek_position=int(seek_position), + filter_params=filter_params, + ) + else: + media_stream_gen = get_media_stream( + self.mass, + streamdetails=streamdetails, + pcm_format=pcm_format, + seek_position=int(seek_position), + filter_params=filter_params, + ) + first_chunk_received = False + fade_in_buffer = b"" + bytes_received = 0 + aborted = False + try: + async for chunk in media_stream_gen: + bytes_received += len(chunk) + if not first_chunk_received: + first_chunk_received = True + # inform the queue that the track is now loaded in the buffer + # so for example the next track can be enqueued + self.mass.player_queues.track_loaded_in_buffer( + queue_item.queue_id, queue_item.queue_item_id + ) + # handle optional fade-in + if streamdetails.fade_in: + if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4: + fade_in_buffer += chunk + elif fade_in_buffer: + async for fade_chunk in get_ffmpeg_stream( + audio_input=fade_in_buffer + chunk, + input_format=pcm_format, + output_format=pcm_format, + filter_params=["afade=type=in:start_time=0:duration=3"], + ): + yield fade_chunk + fade_in_buffer = b"" + streamdetails.fade_in = False + else: + yield chunk + # help garbage collection by explicitly deleting chunk + del chunk + except (Exception, GeneratorExit): + aborted = True + raise + finally: + # determine how many seconds we've streamed + # for pcm output we can calculate this easily + seconds_streamed = bytes_received / pcm_format.pcm_sample_size + streamdetails.seconds_streamed = seconds_streamed + self.logger.debug( + "stream %s for %s - seconds streamed: %s", + "aborted" if aborted else "finished", + streamdetails.uri, + seconds_streamed, + ) + # report stream to provider + if (not aborted and seconds_streamed >= 30) and ( + music_prov := self.mass.get_provider(streamdetails.provider) + ): + if TYPE_CHECKING: # avoid circular import + assert isinstance(music_prov, MusicProvider) + self.mass.create_task(music_prov.on_streamed(streamdetails)) + # Run garbage collection in executor to reclaim memory from large buffers + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, gc.collect) + + @use_buffer(30, 4) async def get_queue_item_stream_with_smartfade( self, queue_item: QueueItem, @@ -1128,7 +1238,17 @@ class StreamsController(CoreController): crossfade_size = int(pcm_format.pcm_sample_size * standard_crossfade_duration + 4) fade_out_data: bytes | None = None - async for chunk in self.get_queue_item_stream(queue_item, pcm_format): + if crossfade_data: + discard_seconds = int(crossfade_data.fade_in_size / pcm_format.pcm_sample_size) - 1 + discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size) + discard_leftover = int(crossfade_data.fade_in_size - discard_bytes) + else: + discard_seconds = streamdetails.seek_position + discard_leftover = 0 + + async for chunk in self.get_queue_item_stream( + queue_item, pcm_format, seek_position=discard_seconds + ): # ALWAYS APPEND CHUNK TO BUFFER buffer += chunk del chunk @@ -1138,8 +1258,8 @@ class StreamsController(CoreController): #### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK if crossfade_data: - # discard the fade_in_part from the crossfade data - buffer = buffer[crossfade_data.fade_in_size :] + # discard the fade_in_part from the crossfade data (minus what we already seeked) + buffer = buffer[discard_leftover:] # send the (second half of the) crossfade data if crossfade_data.pcm_format != pcm_format: # pcm format mismatch, we need to resample the crossfade data diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index 9c1446c7..dd5ce2ae 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -49,6 +49,7 @@ from music_assistant.helpers.json import JSON_DECODE_EXCEPTIONS, json_loads from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER from music_assistant.helpers.util import clean_stream_title, remove_file +from .audio_buffer import AudioBuffer from .datetime import utc from .dsp import filter_to_ffmpeg_params from .ffmpeg import FFMpeg, get_ffmpeg_stream @@ -333,9 +334,9 @@ async def get_stream_details( raise MediaNotFoundError( f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})" ) - if ( - queue_item.streamdetails - and (utc() - queue_item.streamdetails.created_at).seconds < STREAMDETAILS_EXPIRATION + if queue_item.streamdetails and ( + (utc() - queue_item.streamdetails.created_at).seconds < STREAMDETAILS_EXPIRATION + or queue_item.streamdetails.buffer ): # already got a fresh/unused (or cached) streamdetails # we assume that the streamdetails are valid for max STREAMDETAILS_EXPIRATION seconds @@ -383,6 +384,14 @@ async def get_stream_details( resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path) streamdetails.path = resolved_url streamdetails.stream_type = stream_type + # handle volume normalization details + if result := await mass.music.get_loudness( + streamdetails.item_id, + streamdetails.provider, + media_type=queue_item.media_type, + ): + streamdetails.loudness = result[0] + streamdetails.loudness_album = result[1] # set queue_id on the streamdetails so we know what is being streamed streamdetails.queue_id = queue_item.queue_id @@ -391,15 +400,6 @@ async def get_stream_details( streamdetails.fade_in = fade_in if not streamdetails.duration: streamdetails.duration = queue_item.duration - - # handle volume normalization details - if result := await mass.music.get_loudness( - streamdetails.item_id, - streamdetails.provider, - media_type=queue_item.media_type, - ): - streamdetails.loudness = result[0] - streamdetails.loudness_album = result[1] streamdetails.prefer_album_loudness = prefer_album_loudness player_settings = await mass.config.get_player_config(streamdetails.queue_id) core_config = await mass.config.get_core_config("streams") @@ -421,22 +421,139 @@ async def get_stream_details( return streamdetails +async def get_media_stream_with_buffer( + mass: MusicAssistant, + streamdetails: StreamDetails, + pcm_format: AudioFormat, + seek_position: int = 0, + filter_params: list[str] | None = None, +) -> AsyncGenerator[bytes, None]: + """Get audio stream for given media details as raw PCM with buffering.""" + LOGGER.debug( + "get_media_stream_with_buffer: Starting for %s (seek: %s)", + streamdetails.uri, + seek_position, + ) + + # checksum based on pcm_format and filter_params + checksum = f"{pcm_format}-{filter_params}" + + async def fill_buffer_task() -> None: + """Background task to fill the audio buffer.""" + chunk_count = 0 + try: + async for chunk in get_media_stream( + mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params + ): + chunk_count += 1 + await audio_buffer.put(chunk) + except asyncio.CancelledError: + LOGGER.log( + VERBOSE_LOG_LEVEL, + "fill_buffer_task: Cancelled after %s chunks for %s", + chunk_count, + streamdetails.uri, + ) + raise + except Exception as err: + LOGGER.error( + "fill audio buffer task: Error after %s chunks for %s: %s", + chunk_count, + streamdetails.uri, + err, + ) + finally: + await audio_buffer.set_eof() + LOGGER.log( + VERBOSE_LOG_LEVEL, + "fill_buffer_task: Completed (%s chunks) for %s", + chunk_count, + streamdetails.uri, + ) + + # check for existing buffer and reuse if possible + existing_buffer: AudioBuffer | None = streamdetails.buffer + if existing_buffer is not None: + if not existing_buffer.is_valid(checksum, seek_position): + LOGGER.log( + VERBOSE_LOG_LEVEL, + "get_media_stream_with_buffer: Existing buffer invalid for %s " + "(seek: %s, discarded: %s)", + streamdetails.uri, + seek_position, + existing_buffer._discarded_chunks, + ) + await existing_buffer.clear() + streamdetails.buffer = None + existing_buffer = None + else: + LOGGER.debug( + "get_media_stream_with_buffer: Reusing existing buffer for %s - " + "available: %ss, seek: %s, discarded: %s", + streamdetails.uri, + existing_buffer.seconds_available, + seek_position, + existing_buffer._discarded_chunks, + ) + audio_buffer = existing_buffer + + if not existing_buffer and seek_position > 60: + # If seeking into the track and no valid buffer exists, + # just start a normal stream without buffering, + # otherwise we would need to fill the buffer up to the seek position first + # which is not efficient. + LOGGER.debug( + "get_media_stream_with_buffer: No existing buffer and seek >60s for %s, " + "starting normal stream", + streamdetails.uri, + ) + async for chunk in get_media_stream( + mass, + streamdetails, + pcm_format, + seek_position=seek_position, + filter_params=filter_params, + ): + yield chunk + return + + if not existing_buffer: + # create new audio buffer and start fill task + LOGGER.debug( + "get_media_stream_with_buffer: Creating new buffer for %s", + streamdetails.uri, + ) + audio_buffer = AudioBuffer(pcm_format, checksum) + streamdetails.buffer = audio_buffer + task = mass.loop.create_task(fill_buffer_task()) + audio_buffer.attach_fill_task(task) + + # yield data from the buffer + chunk_count = 0 + try: + async for chunk in audio_buffer.iter(seek_position=seek_position): + chunk_count += 1 + yield chunk + finally: + LOGGER.log( + VERBOSE_LOG_LEVEL, + "get_media_stream_with_buffer: Completed, yielded %s chunks", + chunk_count, + ) + + async def get_media_stream( mass: MusicAssistant, streamdetails: StreamDetails, pcm_format: AudioFormat, + seek_position: int = 0, filter_params: list[str] | None = None, ) -> AsyncGenerator[bytes, None]: """Get audio stream for given media details as raw PCM.""" logger = LOGGER.getChild("media_stream") logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri) extra_input_args = streamdetails.extra_input_args or [] - if filter_params is None: - filter_params = [] - if streamdetails.fade_in: - filter_params.append("afade=type=in:start_time=0:duration=3") - seek_position = streamdetails.seek_position # work out audio source for these streamdetails audio_source: str | AsyncGenerator[bytes, None] stream_type = streamdetails.stream_type @@ -498,14 +615,9 @@ async def get_media_stream( await ffmpeg_proc.start() assert ffmpeg_proc.proc is not None # for type checking logger.debug( - "Started media stream for %s" - " - using streamtype: %s" - " - volume normalization: %s" - " - output format: %s" - " - ffmpeg PID: %s", + "Started media stream for %s - using streamtype: %s - pcm format: %s - ffmpeg PID: %s", streamdetails.uri, streamdetails.stream_type, - streamdetails.volume_normalization_mode, pcm_format.content_type.value, ffmpeg_proc.proc.pid, ) @@ -519,8 +631,9 @@ async def get_media_stream( first_chunk_received = True streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type logger.debug( - "First chunk received after %s seconds", + "First chunk received after %s seconds (codec detected: %s)", mass.loop.time() - stream_start, + ffmpeg_proc.input_format.codec_type, ) yield chunk bytes_sent += len(chunk) @@ -547,27 +660,26 @@ async def get_media_stream( finally: # always ensure close is called which also handles all cleanup await ffmpeg_proc.close() - # determine how many seconds we've streamed + # determine how many seconds we've received # for pcm output we can calculate this easily - seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 - streamdetails.seconds_streamed = seconds_streamed + seconds_received = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0 # store accurate duration - if finished and not streamdetails.seek_position and seconds_streamed: - streamdetails.duration = int(seconds_streamed) + if finished and not seek_position and seconds_received: + streamdetails.duration = int(seconds_received) - logger.debug( - "stream %s (with code %s) for %s - seconds streamed: %s", + logger.log( + VERBOSE_LOG_LEVEL, + "stream %s (with code %s) for %s", "cancelled" if cancelled else "finished" if finished else "aborted", ffmpeg_proc.returncode, streamdetails.uri, - seconds_streamed, ) # parse loudnorm data if we have that collected (and enabled) if ( (streamdetails.loudness is None or finished) and streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC - and (finished or (seconds_streamed >= 300)) + and (finished or (seconds_received >= 300)) ): # if dynamic volume normalization is enabled and the entire track is streamed # the loudnorm filter will output the measurement in the log, @@ -579,7 +691,6 @@ async def get_media_stream( streamdetails.uri, loudness_details, ) - streamdetails.loudness = loudness_details mass.create_task( mass.music.set_loudness( streamdetails.item_id, @@ -596,21 +707,13 @@ async def get_media_stream( VolumeNormalizationMode.DISABLED, VolumeNormalizationMode.FIXED_GAIN, ) - and (finished or (seconds_streamed >= 300)) + and (finished or (seconds_received >= 300)) ): # dynamic mode not allowed and no measurement known, we need to analyze the audio # add background task to start analyzing the audio task_id = f"analyze_loudness_{streamdetails.uri}" mass.call_later(5, analyze_loudness, mass, streamdetails, task_id=task_id) - # report stream to provider - if (finished or seconds_streamed >= 30) and ( - music_prov := mass.get_provider(streamdetails.provider) - ): - if TYPE_CHECKING: # avoid circular import - assert isinstance(music_prov, MusicProvider) - mass.create_task(music_prov.on_streamed(streamdetails)) - def create_wave_header( samplerate: int = 44100, channels: int = 2, bitspersample: int = 16, duration: int | None = None @@ -1288,14 +1391,12 @@ async def analyze_loudness( streamdetails: StreamDetails, ) -> None: """Analyze media item's audio, to calculate EBU R128 loudness.""" - if result := await mass.music.get_loudness( + if await mass.music.get_loudness( streamdetails.item_id, streamdetails.provider, media_type=streamdetails.media_type, ): # only when needed we do the analyze job - streamdetails.loudness = result[0] - streamdetails.loudness_album = result[1] return logger = LOGGER.getChild("analyze_loudness") @@ -1360,7 +1461,6 @@ async def analyze_loudness( log_lines_str or "received empty value", ) else: - streamdetails.loudness = loudness await mass.music.set_loudness( streamdetails.item_id, streamdetails.provider, diff --git a/music_assistant/helpers/audio_buffer.py b/music_assistant/helpers/audio_buffer.py new file mode 100644 index 00000000..24ff7d01 --- /dev/null +++ b/music_assistant/helpers/audio_buffer.py @@ -0,0 +1,314 @@ +"""Audio buffer implementation for PCM audio streaming.""" + +from __future__ import annotations + +import asyncio +import gc +import logging +import time +from collections import deque +from collections.abc import AsyncGenerator +from contextlib import suppress +from typing import TYPE_CHECKING, Any + +from music_assistant_models.errors import AudioError + +from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL + +if TYPE_CHECKING: + from music_assistant_models.media_items import AudioFormat + +LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio_buffer") + +DEFAULT_MAX_BUFFER_SIZE_SECONDS: int = 60 * 5 # 5 minutes + + +class AudioBuffer: + """Simple buffer to hold (PCM) audio chunks with seek capability. + + Each chunk represents exactly 1 second of audio. + Chunks are stored in a deque for efficient O(1) append and popleft operations. + """ + + def __init__( + self, + pcm_format: AudioFormat, + checksum: str, + max_size_seconds: int = DEFAULT_MAX_BUFFER_SIZE_SECONDS, + ) -> None: + """ + Initialize AudioBuffer. + + Args: + pcm_format: The PCM audio format specification + checksum: The checksum for the audio data (for validation purposes) + max_size_seconds: Maximum buffer size in seconds + """ + self.pcm_format = pcm_format + self.checksum = checksum + self.max_size_seconds = max_size_seconds + # Store chunks in a deque for O(1) append and popleft operations + self._chunks: deque[bytes] = deque() + # Track how many chunks have been discarded from the start + self._discarded_chunks = 0 + self._lock = asyncio.Lock() + self._data_available = asyncio.Condition(self._lock) + self._space_available = asyncio.Condition(self._lock) + self._eof_received = False + self._buffer_fill_task: asyncio.Task[None] | None = None + self._last_access_time: float = time.time() + self._inactivity_task: asyncio.Task[None] | None = None + self._cancelled = False # Set to True when buffer is cleared/cancelled + + @property + def cancelled(self) -> bool: + """Return whether the buffer has been cancelled or cleared.""" + if self._cancelled: + return True + return self._buffer_fill_task is not None and self._buffer_fill_task.cancelled() + + @property + def chunk_size_bytes(self) -> int: + """Return the size in bytes of one second of PCM audio.""" + return self.pcm_format.pcm_sample_size + + @property + def size_seconds(self) -> int: + """Return current size of the buffer in seconds.""" + return len(self._chunks) + + @property + def seconds_available(self) -> int: + """Return number of seconds of audio currently available in the buffer.""" + return len(self._chunks) + + def is_valid(self, checksum: str, seek_position: int = 0) -> bool: + """ + Validate the buffer's checksum and check if seek position is available. + + Args: + checksum: The checksum to validate against + seek_position: The position we want to seek to (0-based) + + Returns: + True if buffer is valid and seek position is available + """ + if self.cancelled: + return False + + if self.checksum != checksum: + return False + + # Check if buffer is close to inactivity timeout (within 30 seconds) + # to prevent race condition where buffer gets cleared right after validation + time_since_access = time.time() - self._last_access_time + inactivity_timeout = 60 * 5 # 5 minutes + if time_since_access > (inactivity_timeout - 30): + # Buffer is close to being cleared, don't reuse it + return False + + # Check if the seek position has already been discarded + return seek_position >= self._discarded_chunks + + async def put(self, chunk: bytes) -> None: + """ + Put a chunk of data into the buffer. + + Each chunk represents exactly 1 second of PCM audio. + Waits if buffer is full. + + Args: + chunk: Bytes representing 1 second of PCM audio + """ + async with self._space_available: + # Wait until there's space in the buffer + while len(self._chunks) >= self.max_size_seconds and not self._eof_received: + if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL): + LOGGER.log( + VERBOSE_LOG_LEVEL, + "AudioBuffer.put: Buffer full (%s/%s), waiting for space...", + len(self._chunks), + self.max_size_seconds, + ) + await self._space_available.wait() + + if self._eof_received: + # Don't accept new data after EOF + LOGGER.log( + VERBOSE_LOG_LEVEL, "AudioBuffer.put: EOF already received, rejecting chunk" + ) + return + + # Add chunk to the list (index = second position) + self._chunks.append(chunk) + if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL): + LOGGER.log( + VERBOSE_LOG_LEVEL, + "AudioBuffer.put: Added chunk at position %s (size: %s bytes, buffer size: %s)", + self._discarded_chunks + len(self._chunks) - 1, + len(chunk), + len(self._chunks), + ) + + # Notify waiting consumers + self._data_available.notify_all() + + async def get(self, chunk_number: int = 0) -> bytes: + """ + Get one second of data from the buffer at the specified chunk number. + + Waits until requested chunk is available. + Discards old chunks if buffer is full. + + Args: + chunk_number: The chunk index to retrieve (0-based, absolute position). + + Returns: + Bytes containing one second of audio data + + Raises: + AudioError: If EOF is reached before chunk is available or + if chunk has been discarded + """ + # Update last access time + self._last_access_time = time.time() + + async with self._data_available: + # Check if the chunk was already discarded + if chunk_number < self._discarded_chunks: + msg = ( + f"Chunk {chunk_number} has been discarded " + f"(buffer starts at {self._discarded_chunks})" + ) + raise AudioError(msg) + + # Wait until the requested chunk is available or EOF + buffer_index = chunk_number - self._discarded_chunks + while buffer_index >= len(self._chunks): + if self._eof_received: + raise AudioError("EOF") + await self._data_available.wait() + buffer_index = chunk_number - self._discarded_chunks + + # If buffer is at max size, discard the oldest chunk to make room + if len(self._chunks) >= self.max_size_seconds: + discarded = self._chunks.popleft() # O(1) operation with deque + self._discarded_chunks += 1 + if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL): + LOGGER.log( + VERBOSE_LOG_LEVEL, + "AudioBuffer.get: Discarded chunk %s (size: %s bytes) to free space", + self._discarded_chunks - 1, + len(discarded), + ) + # Notify producers waiting for space + self._space_available.notify_all() + # Recalculate buffer index after discard + buffer_index = chunk_number - self._discarded_chunks + + # Return the chunk at the requested index + return self._chunks[buffer_index] + + async def iter(self, seek_position: int = 0) -> AsyncGenerator[bytes, None]: + """ + Iterate over seconds of audio data until EOF. + + Args: + seek_position: Optional starting position in seconds (default: 0). + + Yields: + Bytes containing one second of audio data + """ + chunk_number = seek_position + while True: + try: + yield await self.get(chunk_number=chunk_number) + chunk_number += 1 + except AudioError: + break # EOF reached + + async def clear(self) -> None: + """Reset the buffer completely, clearing all data.""" + chunk_count = len(self._chunks) + LOGGER.log( + VERBOSE_LOG_LEVEL, + "AudioBuffer.clear: Resetting buffer (had %s chunks, has fill task: %s)", + chunk_count, + self._buffer_fill_task is not None, + ) + if self._buffer_fill_task: + self._buffer_fill_task.cancel() + with suppress(asyncio.CancelledError): + await self._buffer_fill_task + if self._inactivity_task: + self._inactivity_task.cancel() + with suppress(asyncio.CancelledError): + await self._inactivity_task + async with self._lock: + self._chunks.clear() + self._discarded_chunks = 0 + self._eof_received = False + self._cancelled = True # Mark buffer as cancelled + # Notify all waiting tasks + self._data_available.notify_all() + self._space_available.notify_all() + + # Run garbage collection in executor to reclaim memory from large buffers + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, gc.collect) + + async def set_eof(self) -> None: + """Signal that no more data will be added to the buffer.""" + async with self._lock: + LOGGER.log( + VERBOSE_LOG_LEVEL, + "AudioBuffer.set_eof: Marking EOF (buffer has %s chunks)", + len(self._chunks), + ) + self._eof_received = True + # Wake up all waiting consumers and producers + self._data_available.notify_all() + self._space_available.notify_all() + + async def _monitor_inactivity(self) -> None: + """Monitor buffer for inactivity and clear if inactive for 5 minutes.""" + inactivity_timeout = 60 * 5 # 5 minutes + check_interval = 30 # Check every 30 seconds + while True: + await asyncio.sleep(check_interval) + + # Check if buffer has been inactive (no data and no activity) + time_since_access = time.time() - self._last_access_time + + # If buffer is empty and hasn't been accessed for timeout period, + # it likely means the producer failed or stream was abandoned + if len(self._chunks) == 0 and time_since_access > inactivity_timeout: + LOGGER.log( + VERBOSE_LOG_LEVEL, + "AudioBuffer: Empty buffer with no activity for %.1f seconds, " + "clearing (likely abandoned stream)", + time_since_access, + ) + await self.clear() + break # Stop monitoring after clearing + + # If buffer has data but hasn't been consumed, clear it + if len(self._chunks) > 0 and time_since_access > inactivity_timeout: + LOGGER.log( + VERBOSE_LOG_LEVEL, + "AudioBuffer: No activity for %.1f seconds, clearing buffer (had %s chunks)", + time_since_access, + len(self._chunks), + ) + await self.clear() + break # Stop monitoring after clearing + + def attach_fill_task(self, task: asyncio.Task[Any]) -> None: + """Attach a background task that fills the buffer.""" + self._buffer_fill_task = task + + # Start inactivity monitor if not already running + if self._inactivity_task is None or self._inactivity_task.done(): + self._last_access_time = time.time() # Initialize access time + loop = asyncio.get_running_loop() + self._inactivity_task = loop.create_task(self._monitor_inactivity()) diff --git a/music_assistant/helpers/buffered_generator.py b/music_assistant/helpers/buffered_generator.py new file mode 100644 index 00000000..53b35ad3 --- /dev/null +++ b/music_assistant/helpers/buffered_generator.py @@ -0,0 +1,148 @@ +"""Helper for adding buffering to async generators.""" + +from __future__ import annotations + +import asyncio +import contextlib +from collections.abc import AsyncGenerator, Callable +from functools import wraps +from typing import Final, ParamSpec + +# Type variables for the buffered decorator +_P = ParamSpec("_P") + +DEFAULT_BUFFER_SIZE: Final = 30 +DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5 + + +async def buffered( + generator: AsyncGenerator[bytes, None], + buffer_size: int = DEFAULT_BUFFER_SIZE, + min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD, +) -> AsyncGenerator[bytes, None]: + """ + Add buffering to an async generator that yields bytes. + + This function uses an asyncio.Queue to decouple the producer (reading from the stream) + from the consumer (yielding to the client). The producer runs in a separate task and + fills the buffer, while the consumer yields from the buffer. + + Args: + generator: The async generator to buffer + buffer_size: Maximum number of chunks to buffer (default: 30) + min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5) + + Example: + async for chunk in buffered(my_generator(), buffer_size=100): + process(chunk) + """ + buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size) + producer_error: Exception | None = None + threshold_reached = asyncio.Event() + + if buffer_size <= 1: + # No buffering needed, yield directly + async for chunk in generator: + yield chunk + return + + async def producer() -> None: + """Read from the original generator and fill the buffer.""" + nonlocal producer_error + try: + async for chunk in generator: + await buffer.put(chunk) + if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield: + threshold_reached.set() + except asyncio.CancelledError: + # Task was cancelled, clean up the generator + with contextlib.suppress(RuntimeError, asyncio.CancelledError): + await generator.aclose() + raise + except Exception as err: + producer_error = err + # Consumer probably stopped consuming, close the original generator + with contextlib.suppress(RuntimeError, asyncio.CancelledError): + await generator.aclose() + finally: + threshold_reached.set() + # Signal end of stream by putting None + with contextlib.suppress(asyncio.QueueFull): + await buffer.put(None) + + # Start the producer task + loop = asyncio.get_running_loop() + producer_task = loop.create_task(producer()) + + # Keep a strong reference to prevent garbage collection issues + # The event loop only keeps weak references to tasks + _active_tasks = getattr(loop, "_buffered_generator_tasks", None) + if _active_tasks is None: + _active_tasks = set() + loop._buffered_generator_tasks = _active_tasks # type: ignore[attr-defined] + _active_tasks.add(producer_task) + + # Remove from set when done + producer_task.add_done_callback(_active_tasks.discard) + + try: + # Wait for initial buffer to fill + await threshold_reached.wait() + + # Consume from buffer and yield + while True: + data = await buffer.get() + if data is None: + # End of stream + if producer_error: + raise producer_error + break + yield data + + finally: + # Ensure the producer task is cleaned up + if not producer_task.done(): + producer_task.cancel() + with contextlib.suppress(asyncio.CancelledError, RuntimeError): + await producer_task + + +def use_buffer( + buffer_size: int = DEFAULT_BUFFER_SIZE, + min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD, +) -> Callable[ + [Callable[_P, AsyncGenerator[bytes, None]]], + Callable[_P, AsyncGenerator[bytes, None]], +]: + """ + Add buffering to async generator functions that yield bytes (decorator). + + This decorator uses an asyncio.Queue to decouple the producer (reading from the stream) + from the consumer (yielding to the client). The producer runs in a separate task and + fills the buffer, while the consumer yields from the buffer. + + Args: + buffer_size: Maximum number of chunks to buffer (default: 30) + min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5) + + Example: + @use_buffer(buffer_size=100) + async def my_stream() -> AsyncGenerator[bytes, None]: + ... + """ + + def decorator( + func: Callable[_P, AsyncGenerator[bytes, None]], + ) -> Callable[_P, AsyncGenerator[bytes, None]]: + @wraps(func) + async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> AsyncGenerator[bytes, None]: + async for chunk in buffered( + func(*args, **kwargs), + buffer_size=buffer_size, + min_buffer_before_yield=min_buffer_before_yield, + ): + yield chunk + + return wrapper + + return decorator diff --git a/music_assistant/helpers/util.py b/music_assistant/helpers/util.py index 2a373e35..212fe78e 100644 --- a/music_assistant/helpers/util.py +++ b/music_assistant/helpers/util.py @@ -796,7 +796,12 @@ def guard_single_request[ProviderT: "Provider | CoreController", **P, R]( cache_key_parts.append(f"{key}{kwargs[key]}") task_id = ".".join(map(str, cache_key_parts)) task: asyncio.Task[R] = mass.create_task( - func, self, *args, **kwargs, task_id=task_id, abort_existing=False + func, + self, + *args, + task_id=task_id, + abort_existing=False, + **kwargs, ) return await task diff --git a/music_assistant/providers/sonos/player.py b/music_assistant/providers/sonos/player.py index 32502053..1e252148 100644 --- a/music_assistant/providers/sonos/player.py +++ b/music_assistant/providers/sonos/player.py @@ -427,11 +427,9 @@ class SonosPlayer(Player): # Regular Queue item playback # create a sonos cloud queue and load it cloud_queue_url = f"{self.mass.streams.base_url}/sonos_queue/v2.3/" - track_data = self.provider._parse_sonos_queue_item(media) await self.client.player.group.play_cloud_queue( cloud_queue_url, item_id=media.queue_item_id, - track_metadata=track_data["track"], ) return diff --git a/music_assistant/providers/sonos/provider.py b/music_assistant/providers/sonos/provider.py index 0626f17c..8b28afc7 100644 --- a/music_assistant/providers/sonos/provider.py +++ b/music_assistant/providers/sonos/provider.py @@ -292,7 +292,7 @@ class SonosPlayerProvider(PlayerProvider): "service": {"name": "Music Assistant", "id": "mass"}, "name": media.title, "imageUrl": media.image_url, - "durationMillis": media.duration * 1000 if media.duration else 0, + "durationMillis": int(media.duration * 1000) if media.duration else 0, "artist": { "name": media.artist, }