from __future__ import annotations
import asyncio
+import gc
import logging
import os
import urllib.parse
from collections.abc import AsyncGenerator
from dataclasses import dataclass
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Final
from aiofiles.os import wrap
from aiohttp import web
from music_assistant.helpers.audio import (
get_chunksize,
get_media_stream,
+ get_media_stream_with_buffer,
get_player_filter_params,
get_silence,
get_stream_details,
resample_pcm_audio,
)
+from music_assistant.helpers.buffered_generator import use_buffer
from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
from music_assistant.helpers.smart_fades import (
from music_assistant.helpers.util import get_ip_addresses, select_free_port
from music_assistant.helpers.webserver import Webserver
from music_assistant.models.core_controller import CoreController
+from music_assistant.models.music_provider import MusicProvider
from music_assistant.models.plugin import PluginProvider
if TYPE_CHECKING:
isfile = wrap(os.path.isfile)
+CONF_ALLOW_BUFFER: Final[str] = "allow_buffering"
+
def parse_pcm_info(content_type: str) -> tuple[int, int, int]:
"""Parse PCM info from a codec/content_type string."""
"Make sure that this server can be reached "
"on the given IP and TCP port by players on the local network.",
),
+ ConfigEntry(
+ key=CONF_ALLOW_BUFFER,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=False,
+ label="Allow (in-memory) buffering of (track) audio",
+ description="By default, Music Assistant tries to be as resource "
+ "efficient as possible when streaming audio, especially considering "
+ "low-end devices such as Raspberry Pi's. This means that audio "
+ "buffering is disabled by default to reduce memory usage. \n\n"
+ "Enabling this option allows for in-memory buffering of audio, "
+ "which (massively) improves playback (and seeking) performance but it comes "
+ "at the cost of increased memory usage. "
+ "If you run Music Assistant on a capable device with enough memory, "
+ "enabling this option is strongly recommended.",
+ required=False,
+ category="audio",
+ ),
ConfigEntry(
key=CONF_VOLUME_NORMALIZATION_RADIO,
type=ConfigEntryType.STRING,
audio_input = self.get_queue_item_stream(
queue_item=queue_item,
pcm_format=pcm_format,
+ seek_position=queue_item.streamdetails.seek_position,
)
# stream the audio
# this final ffmpeg process in the chain will convert the raw, lossless PCM audio into
input_format=pcm_format,
output_format=output_format,
),
- chunk_size=get_chunksize(output_format),
+ # we need to slowly feed the music to avoid the player stopping and later
+ # restarting (or completely failing) the audio stream by keeping the buffer short.
+ # this is reported to be an issue especially with Chromecast players.
+ # see for example: https://github.com/music-assistant/support/issues/3717
+ extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "2"],
):
try:
await resp.write(chunk)
filter_params=get_player_filter_params(
self.mass, queue_player.player_id, flow_pcm_format, output_format
),
+ # we need to slowly feed the music to avoid the player stopping and later
+ # restarting (or completely failing) the audio stream by keeping the buffer short.
+ # this is reported to be an issue especially with Chromecast players.
+ # see for example: https://github.com/music-assistant/support/issues/3717
+ extra_input_args=["-readrate", "1.0", "-readrate_initial_burst", "5"],
chunk_size=icy_meta_interval if enable_icy else get_chunksize(output_format),
):
try:
)
http_profile = str(http_profile_value) if http_profile_value is not None else "default"
if http_profile == "forced_content_length":
- # guess content length based on duration
+ # just set an insanely high content length to make sure the player keeps playing
resp.content_length = get_chunksize(output_format, 12 * 3600)
elif http_profile == "chunked":
resp.enable_chunked_encoding()
# like https hosts and it also offers the pre-announce 'bell'
return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
+ @use_buffer(30, 4)
async def get_queue_flow_stream(
self,
queue: PlayerQueue,
async for chunk in self.get_queue_item_stream(
queue_track,
pcm_format=pcm_format,
+ seek_position=queue_track.streamdetails.seek_position,
):
# buffer size needs to be big enough to include the crossfade part
req_buffer_size = (
self,
queue_item: QueueItem,
pcm_format: AudioFormat,
+ seek_position: int = 0,
) -> AsyncGenerator[bytes, None]:
"""Get the (PCM) audio stream for a single queue item."""
# collect all arguments for ffmpeg
filter_params.append(f"volume={gain_correct}dB")
streamdetails.volume_normalization_gain_correct = gain_correct
- first_chunk_received = False
- async for chunk in get_media_stream(
- self.mass,
- streamdetails=streamdetails,
- pcm_format=pcm_format,
- filter_params=filter_params,
- ):
- if not first_chunk_received:
- first_chunk_received = True
- # inform the queue that the track is now loaded in the buffer
- # so for example the next track can be enqueued
- self.mass.player_queues.track_loaded_in_buffer(
- queue_item.queue_id, queue_item.queue_item_id
- )
- yield chunk
- del chunk
+ allow_buffer = bool(
+ self.mass.config.get_raw_core_config_value(self.domain, CONF_ALLOW_BUFFER, False)
+ and streamdetails.duration
+ )
+
+ self.logger.debug(
+ "Starting queue item stream for %s (%s)"
+ " - using buffer: %s"
+ " - using fade-in: %s"
+ " - volume normalization: %s",
+ queue_item.name,
+ queue_item.uri,
+ allow_buffer,
+ streamdetails.fade_in,
+ streamdetails.volume_normalization_mode,
+ )
+ if allow_buffer:
+ media_stream_gen = get_media_stream_with_buffer(
+ self.mass,
+ streamdetails=streamdetails,
+ pcm_format=pcm_format,
+ seek_position=int(seek_position),
+ filter_params=filter_params,
+ )
+ else:
+ media_stream_gen = get_media_stream(
+ self.mass,
+ streamdetails=streamdetails,
+ pcm_format=pcm_format,
+ seek_position=int(seek_position),
+ filter_params=filter_params,
+ )
+ first_chunk_received = False
+ fade_in_buffer = b""
+ bytes_received = 0
+ aborted = False
+ try:
+ async for chunk in media_stream_gen:
+ bytes_received += len(chunk)
+ if not first_chunk_received:
+ first_chunk_received = True
+ # inform the queue that the track is now loaded in the buffer
+ # so for example the next track can be enqueued
+ self.mass.player_queues.track_loaded_in_buffer(
+ queue_item.queue_id, queue_item.queue_item_id
+ )
+ # handle optional fade-in
+ if streamdetails.fade_in:
+ if len(fade_in_buffer) < pcm_format.pcm_sample_size * 4:
+ fade_in_buffer += chunk
+ elif fade_in_buffer:
+ async for fade_chunk in get_ffmpeg_stream(
+ audio_input=fade_in_buffer + chunk,
+ input_format=pcm_format,
+ output_format=pcm_format,
+ filter_params=["afade=type=in:start_time=0:duration=3"],
+ ):
+ yield fade_chunk
+ fade_in_buffer = b""
+ streamdetails.fade_in = False
+ else:
+ yield chunk
+ # help garbage collection by explicitly deleting chunk
+ del chunk
+ except (Exception, GeneratorExit):
+ aborted = True
+ raise
+ finally:
+ # determine how many seconds we've streamed
+ # for pcm output we can calculate this easily
+ seconds_streamed = bytes_received / pcm_format.pcm_sample_size
+ streamdetails.seconds_streamed = seconds_streamed
+ self.logger.debug(
+ "stream %s for %s - seconds streamed: %s",
+ "aborted" if aborted else "finished",
+ streamdetails.uri,
+ seconds_streamed,
+ )
+ # report stream to provider
+ if (not aborted and seconds_streamed >= 30) and (
+ music_prov := self.mass.get_provider(streamdetails.provider)
+ ):
+ if TYPE_CHECKING: # avoid circular import
+ assert isinstance(music_prov, MusicProvider)
+ self.mass.create_task(music_prov.on_streamed(streamdetails))
+ # Run garbage collection in executor to reclaim memory from large buffers
+ loop = asyncio.get_running_loop()
+ await loop.run_in_executor(None, gc.collect)
+
+ @use_buffer(30, 4)
async def get_queue_item_stream_with_smartfade(
self,
queue_item: QueueItem,
crossfade_size = int(pcm_format.pcm_sample_size * standard_crossfade_duration + 4)
fade_out_data: bytes | None = None
- async for chunk in self.get_queue_item_stream(queue_item, pcm_format):
+ if crossfade_data:
+ discard_seconds = int(crossfade_data.fade_in_size / pcm_format.pcm_sample_size) - 1
+ discard_bytes = int(discard_seconds * pcm_format.pcm_sample_size)
+ discard_leftover = int(crossfade_data.fade_in_size - discard_bytes)
+ else:
+ discard_seconds = streamdetails.seek_position
+ discard_leftover = 0
+
+ async for chunk in self.get_queue_item_stream(
+ queue_item, pcm_format, seek_position=discard_seconds
+ ):
# ALWAYS APPEND CHUNK TO BUFFER
buffer += chunk
del chunk
#### HANDLE CROSSFADE DATA FROM PREVIOUS TRACK
if crossfade_data:
- # discard the fade_in_part from the crossfade data
- buffer = buffer[crossfade_data.fade_in_size :]
+ # discard the fade_in_part from the crossfade data (minus what we already seeked)
+ buffer = buffer[discard_leftover:]
# send the (second half of the) crossfade data
if crossfade_data.pcm_format != pcm_format:
# pcm format mismatch, we need to resample the crossfade data
from music_assistant.helpers.throttle_retry import BYPASS_THROTTLER
from music_assistant.helpers.util import clean_stream_title, remove_file
+from .audio_buffer import AudioBuffer
from .datetime import utc
from .dsp import filter_to_ffmpeg_params
from .ffmpeg import FFMpeg, get_ffmpeg_stream
raise MediaNotFoundError(
f"Unable to retrieve streamdetails for {queue_item.name} ({queue_item.uri})"
)
- if (
- queue_item.streamdetails
- and (utc() - queue_item.streamdetails.created_at).seconds < STREAMDETAILS_EXPIRATION
+ if queue_item.streamdetails and (
+ (utc() - queue_item.streamdetails.created_at).seconds < STREAMDETAILS_EXPIRATION
+ or queue_item.streamdetails.buffer
):
# already got a fresh/unused (or cached) streamdetails
# we assume that the streamdetails are valid for max STREAMDETAILS_EXPIRATION seconds
resolved_url, stream_type = await resolve_radio_stream(mass, streamdetails.path)
streamdetails.path = resolved_url
streamdetails.stream_type = stream_type
+ # handle volume normalization details
+ if result := await mass.music.get_loudness(
+ streamdetails.item_id,
+ streamdetails.provider,
+ media_type=queue_item.media_type,
+ ):
+ streamdetails.loudness = result[0]
+ streamdetails.loudness_album = result[1]
# set queue_id on the streamdetails so we know what is being streamed
streamdetails.queue_id = queue_item.queue_id
streamdetails.fade_in = fade_in
if not streamdetails.duration:
streamdetails.duration = queue_item.duration
-
- # handle volume normalization details
- if result := await mass.music.get_loudness(
- streamdetails.item_id,
- streamdetails.provider,
- media_type=queue_item.media_type,
- ):
- streamdetails.loudness = result[0]
- streamdetails.loudness_album = result[1]
streamdetails.prefer_album_loudness = prefer_album_loudness
player_settings = await mass.config.get_player_config(streamdetails.queue_id)
core_config = await mass.config.get_core_config("streams")
return streamdetails
+async def get_media_stream_with_buffer(
+ mass: MusicAssistant,
+ streamdetails: StreamDetails,
+ pcm_format: AudioFormat,
+ seek_position: int = 0,
+ filter_params: list[str] | None = None,
+) -> AsyncGenerator[bytes, None]:
+ """Get audio stream for given media details as raw PCM with buffering."""
+ LOGGER.debug(
+ "get_media_stream_with_buffer: Starting for %s (seek: %s)",
+ streamdetails.uri,
+ seek_position,
+ )
+
+ # checksum based on pcm_format and filter_params
+ checksum = f"{pcm_format}-{filter_params}"
+
+ async def fill_buffer_task() -> None:
+ """Background task to fill the audio buffer."""
+ chunk_count = 0
+ try:
+ async for chunk in get_media_stream(
+ mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params
+ ):
+ chunk_count += 1
+ await audio_buffer.put(chunk)
+ except asyncio.CancelledError:
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "fill_buffer_task: Cancelled after %s chunks for %s",
+ chunk_count,
+ streamdetails.uri,
+ )
+ raise
+ except Exception as err:
+ LOGGER.error(
+ "fill audio buffer task: Error after %s chunks for %s: %s",
+ chunk_count,
+ streamdetails.uri,
+ err,
+ )
+ finally:
+ await audio_buffer.set_eof()
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "fill_buffer_task: Completed (%s chunks) for %s",
+ chunk_count,
+ streamdetails.uri,
+ )
+
+ # check for existing buffer and reuse if possible
+ existing_buffer: AudioBuffer | None = streamdetails.buffer
+ if existing_buffer is not None:
+ if not existing_buffer.is_valid(checksum, seek_position):
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "get_media_stream_with_buffer: Existing buffer invalid for %s "
+ "(seek: %s, discarded: %s)",
+ streamdetails.uri,
+ seek_position,
+ existing_buffer._discarded_chunks,
+ )
+ await existing_buffer.clear()
+ streamdetails.buffer = None
+ existing_buffer = None
+ else:
+ LOGGER.debug(
+ "get_media_stream_with_buffer: Reusing existing buffer for %s - "
+ "available: %ss, seek: %s, discarded: %s",
+ streamdetails.uri,
+ existing_buffer.seconds_available,
+ seek_position,
+ existing_buffer._discarded_chunks,
+ )
+ audio_buffer = existing_buffer
+
+ if not existing_buffer and seek_position > 60:
+ # If seeking into the track and no valid buffer exists,
+ # just start a normal stream without buffering,
+ # otherwise we would need to fill the buffer up to the seek position first
+ # which is not efficient.
+ LOGGER.debug(
+ "get_media_stream_with_buffer: No existing buffer and seek >60s for %s, "
+ "starting normal stream",
+ streamdetails.uri,
+ )
+ async for chunk in get_media_stream(
+ mass,
+ streamdetails,
+ pcm_format,
+ seek_position=seek_position,
+ filter_params=filter_params,
+ ):
+ yield chunk
+ return
+
+ if not existing_buffer:
+ # create new audio buffer and start fill task
+ LOGGER.debug(
+ "get_media_stream_with_buffer: Creating new buffer for %s",
+ streamdetails.uri,
+ )
+ audio_buffer = AudioBuffer(pcm_format, checksum)
+ streamdetails.buffer = audio_buffer
+ task = mass.loop.create_task(fill_buffer_task())
+ audio_buffer.attach_fill_task(task)
+
+ # yield data from the buffer
+ chunk_count = 0
+ try:
+ async for chunk in audio_buffer.iter(seek_position=seek_position):
+ chunk_count += 1
+ yield chunk
+ finally:
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "get_media_stream_with_buffer: Completed, yielded %s chunks",
+ chunk_count,
+ )
+
+
async def get_media_stream(
mass: MusicAssistant,
streamdetails: StreamDetails,
pcm_format: AudioFormat,
+ seek_position: int = 0,
filter_params: list[str] | None = None,
) -> AsyncGenerator[bytes, None]:
"""Get audio stream for given media details as raw PCM."""
logger = LOGGER.getChild("media_stream")
logger.log(VERBOSE_LOG_LEVEL, "Starting media stream for %s", streamdetails.uri)
extra_input_args = streamdetails.extra_input_args or []
- if filter_params is None:
- filter_params = []
- if streamdetails.fade_in:
- filter_params.append("afade=type=in:start_time=0:duration=3")
- seek_position = streamdetails.seek_position
# work out audio source for these streamdetails
audio_source: str | AsyncGenerator[bytes, None]
stream_type = streamdetails.stream_type
await ffmpeg_proc.start()
assert ffmpeg_proc.proc is not None # for type checking
logger.debug(
- "Started media stream for %s"
- " - using streamtype: %s"
- " - volume normalization: %s"
- " - output format: %s"
- " - ffmpeg PID: %s",
+ "Started media stream for %s - using streamtype: %s - pcm format: %s - ffmpeg PID: %s",
streamdetails.uri,
streamdetails.stream_type,
- streamdetails.volume_normalization_mode,
pcm_format.content_type.value,
ffmpeg_proc.proc.pid,
)
first_chunk_received = True
streamdetails.audio_format.codec_type = ffmpeg_proc.input_format.codec_type
logger.debug(
- "First chunk received after %s seconds",
+ "First chunk received after %s seconds (codec detected: %s)",
mass.loop.time() - stream_start,
+ ffmpeg_proc.input_format.codec_type,
)
yield chunk
bytes_sent += len(chunk)
finally:
# always ensure close is called which also handles all cleanup
await ffmpeg_proc.close()
- # determine how many seconds we've streamed
+ # determine how many seconds we've received
# for pcm output we can calculate this easily
- seconds_streamed = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
- streamdetails.seconds_streamed = seconds_streamed
+ seconds_received = bytes_sent / pcm_format.pcm_sample_size if bytes_sent else 0
# store accurate duration
- if finished and not streamdetails.seek_position and seconds_streamed:
- streamdetails.duration = int(seconds_streamed)
+ if finished and not seek_position and seconds_received:
+ streamdetails.duration = int(seconds_received)
- logger.debug(
- "stream %s (with code %s) for %s - seconds streamed: %s",
+ logger.log(
+ VERBOSE_LOG_LEVEL,
+ "stream %s (with code %s) for %s",
"cancelled" if cancelled else "finished" if finished else "aborted",
ffmpeg_proc.returncode,
streamdetails.uri,
- seconds_streamed,
)
# parse loudnorm data if we have that collected (and enabled)
if (
(streamdetails.loudness is None or finished)
and streamdetails.volume_normalization_mode == VolumeNormalizationMode.DYNAMIC
- and (finished or (seconds_streamed >= 300))
+ and (finished or (seconds_received >= 300))
):
# if dynamic volume normalization is enabled and the entire track is streamed
# the loudnorm filter will output the measurement in the log,
streamdetails.uri,
loudness_details,
)
- streamdetails.loudness = loudness_details
mass.create_task(
mass.music.set_loudness(
streamdetails.item_id,
VolumeNormalizationMode.DISABLED,
VolumeNormalizationMode.FIXED_GAIN,
)
- and (finished or (seconds_streamed >= 300))
+ and (finished or (seconds_received >= 300))
):
# dynamic mode not allowed and no measurement known, we need to analyze the audio
# add background task to start analyzing the audio
task_id = f"analyze_loudness_{streamdetails.uri}"
mass.call_later(5, analyze_loudness, mass, streamdetails, task_id=task_id)
- # report stream to provider
- if (finished or seconds_streamed >= 30) and (
- music_prov := mass.get_provider(streamdetails.provider)
- ):
- if TYPE_CHECKING: # avoid circular import
- assert isinstance(music_prov, MusicProvider)
- mass.create_task(music_prov.on_streamed(streamdetails))
-
def create_wave_header(
samplerate: int = 44100, channels: int = 2, bitspersample: int = 16, duration: int | None = None
streamdetails: StreamDetails,
) -> None:
"""Analyze media item's audio, to calculate EBU R128 loudness."""
- if result := await mass.music.get_loudness(
+ if await mass.music.get_loudness(
streamdetails.item_id,
streamdetails.provider,
media_type=streamdetails.media_type,
):
# only when needed we do the analyze job
- streamdetails.loudness = result[0]
- streamdetails.loudness_album = result[1]
return
logger = LOGGER.getChild("analyze_loudness")
log_lines_str or "received empty value",
)
else:
- streamdetails.loudness = loudness
await mass.music.set_loudness(
streamdetails.item_id,
streamdetails.provider,
--- /dev/null
+"""Audio buffer implementation for PCM audio streaming."""
+
+from __future__ import annotations
+
+import asyncio
+import gc
+import logging
+import time
+from collections import deque
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+from typing import TYPE_CHECKING, Any
+
+from music_assistant_models.errors import AudioError
+
+from music_assistant.constants import MASS_LOGGER_NAME, VERBOSE_LOG_LEVEL
+
+if TYPE_CHECKING:
+ from music_assistant_models.media_items import AudioFormat
+
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio_buffer")
+
+DEFAULT_MAX_BUFFER_SIZE_SECONDS: int = 60 * 5 # 5 minutes
+
+
+class AudioBuffer:
+ """Simple buffer to hold (PCM) audio chunks with seek capability.
+
+ Each chunk represents exactly 1 second of audio.
+ Chunks are stored in a deque for efficient O(1) append and popleft operations.
+ """
+
+ def __init__(
+ self,
+ pcm_format: AudioFormat,
+ checksum: str,
+ max_size_seconds: int = DEFAULT_MAX_BUFFER_SIZE_SECONDS,
+ ) -> None:
+ """
+ Initialize AudioBuffer.
+
+ Args:
+ pcm_format: The PCM audio format specification
+ checksum: The checksum for the audio data (for validation purposes)
+ max_size_seconds: Maximum buffer size in seconds
+ """
+ self.pcm_format = pcm_format
+ self.checksum = checksum
+ self.max_size_seconds = max_size_seconds
+ # Store chunks in a deque for O(1) append and popleft operations
+ self._chunks: deque[bytes] = deque()
+ # Track how many chunks have been discarded from the start
+ self._discarded_chunks = 0
+ self._lock = asyncio.Lock()
+ self._data_available = asyncio.Condition(self._lock)
+ self._space_available = asyncio.Condition(self._lock)
+ self._eof_received = False
+ self._buffer_fill_task: asyncio.Task[None] | None = None
+ self._last_access_time: float = time.time()
+ self._inactivity_task: asyncio.Task[None] | None = None
+ self._cancelled = False # Set to True when buffer is cleared/cancelled
+
+ @property
+ def cancelled(self) -> bool:
+ """Return whether the buffer has been cancelled or cleared."""
+ if self._cancelled:
+ return True
+ return self._buffer_fill_task is not None and self._buffer_fill_task.cancelled()
+
+ @property
+ def chunk_size_bytes(self) -> int:
+ """Return the size in bytes of one second of PCM audio."""
+ return self.pcm_format.pcm_sample_size
+
+ @property
+ def size_seconds(self) -> int:
+ """Return current size of the buffer in seconds."""
+ return len(self._chunks)
+
+ @property
+ def seconds_available(self) -> int:
+ """Return number of seconds of audio currently available in the buffer."""
+ return len(self._chunks)
+
+ def is_valid(self, checksum: str, seek_position: int = 0) -> bool:
+ """
+ Validate the buffer's checksum and check if seek position is available.
+
+ Args:
+ checksum: The checksum to validate against
+ seek_position: The position we want to seek to (0-based)
+
+ Returns:
+ True if buffer is valid and seek position is available
+ """
+ if self.cancelled:
+ return False
+
+ if self.checksum != checksum:
+ return False
+
+ # Check if buffer is close to inactivity timeout (within 30 seconds)
+ # to prevent race condition where buffer gets cleared right after validation
+ time_since_access = time.time() - self._last_access_time
+ inactivity_timeout = 60 * 5 # 5 minutes
+ if time_since_access > (inactivity_timeout - 30):
+ # Buffer is close to being cleared, don't reuse it
+ return False
+
+ # Check if the seek position has already been discarded
+ return seek_position >= self._discarded_chunks
+
+ async def put(self, chunk: bytes) -> None:
+ """
+ Put a chunk of data into the buffer.
+
+ Each chunk represents exactly 1 second of PCM audio.
+ Waits if buffer is full.
+
+ Args:
+ chunk: Bytes representing 1 second of PCM audio
+ """
+ async with self._space_available:
+ # Wait until there's space in the buffer
+ while len(self._chunks) >= self.max_size_seconds and not self._eof_received:
+ if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "AudioBuffer.put: Buffer full (%s/%s), waiting for space...",
+ len(self._chunks),
+ self.max_size_seconds,
+ )
+ await self._space_available.wait()
+
+ if self._eof_received:
+ # Don't accept new data after EOF
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL, "AudioBuffer.put: EOF already received, rejecting chunk"
+ )
+ return
+
+ # Add chunk to the list (index = second position)
+ self._chunks.append(chunk)
+ if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "AudioBuffer.put: Added chunk at position %s (size: %s bytes, buffer size: %s)",
+ self._discarded_chunks + len(self._chunks) - 1,
+ len(chunk),
+ len(self._chunks),
+ )
+
+ # Notify waiting consumers
+ self._data_available.notify_all()
+
+ async def get(self, chunk_number: int = 0) -> bytes:
+ """
+ Get one second of data from the buffer at the specified chunk number.
+
+ Waits until requested chunk is available.
+ Discards old chunks if buffer is full.
+
+ Args:
+ chunk_number: The chunk index to retrieve (0-based, absolute position).
+
+ Returns:
+ Bytes containing one second of audio data
+
+ Raises:
+ AudioError: If EOF is reached before chunk is available or
+ if chunk has been discarded
+ """
+ # Update last access time
+ self._last_access_time = time.time()
+
+ async with self._data_available:
+ # Check if the chunk was already discarded
+ if chunk_number < self._discarded_chunks:
+ msg = (
+ f"Chunk {chunk_number} has been discarded "
+ f"(buffer starts at {self._discarded_chunks})"
+ )
+ raise AudioError(msg)
+
+ # Wait until the requested chunk is available or EOF
+ buffer_index = chunk_number - self._discarded_chunks
+ while buffer_index >= len(self._chunks):
+ if self._eof_received:
+ raise AudioError("EOF")
+ await self._data_available.wait()
+ buffer_index = chunk_number - self._discarded_chunks
+
+ # If buffer is at max size, discard the oldest chunk to make room
+ if len(self._chunks) >= self.max_size_seconds:
+ discarded = self._chunks.popleft() # O(1) operation with deque
+ self._discarded_chunks += 1
+ if LOGGER.isEnabledFor(VERBOSE_LOG_LEVEL):
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "AudioBuffer.get: Discarded chunk %s (size: %s bytes) to free space",
+ self._discarded_chunks - 1,
+ len(discarded),
+ )
+ # Notify producers waiting for space
+ self._space_available.notify_all()
+ # Recalculate buffer index after discard
+ buffer_index = chunk_number - self._discarded_chunks
+
+ # Return the chunk at the requested index
+ return self._chunks[buffer_index]
+
+ async def iter(self, seek_position: int = 0) -> AsyncGenerator[bytes, None]:
+ """
+ Iterate over seconds of audio data until EOF.
+
+ Args:
+ seek_position: Optional starting position in seconds (default: 0).
+
+ Yields:
+ Bytes containing one second of audio data
+ """
+ chunk_number = seek_position
+ while True:
+ try:
+ yield await self.get(chunk_number=chunk_number)
+ chunk_number += 1
+ except AudioError:
+ break # EOF reached
+
+ async def clear(self) -> None:
+ """Reset the buffer completely, clearing all data."""
+ chunk_count = len(self._chunks)
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "AudioBuffer.clear: Resetting buffer (had %s chunks, has fill task: %s)",
+ chunk_count,
+ self._buffer_fill_task is not None,
+ )
+ if self._buffer_fill_task:
+ self._buffer_fill_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._buffer_fill_task
+ if self._inactivity_task:
+ self._inactivity_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._inactivity_task
+ async with self._lock:
+ self._chunks.clear()
+ self._discarded_chunks = 0
+ self._eof_received = False
+ self._cancelled = True # Mark buffer as cancelled
+ # Notify all waiting tasks
+ self._data_available.notify_all()
+ self._space_available.notify_all()
+
+ # Run garbage collection in executor to reclaim memory from large buffers
+ loop = asyncio.get_running_loop()
+ await loop.run_in_executor(None, gc.collect)
+
+ async def set_eof(self) -> None:
+ """Signal that no more data will be added to the buffer."""
+ async with self._lock:
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "AudioBuffer.set_eof: Marking EOF (buffer has %s chunks)",
+ len(self._chunks),
+ )
+ self._eof_received = True
+ # Wake up all waiting consumers and producers
+ self._data_available.notify_all()
+ self._space_available.notify_all()
+
+ async def _monitor_inactivity(self) -> None:
+ """Monitor buffer for inactivity and clear if inactive for 5 minutes."""
+ inactivity_timeout = 60 * 5 # 5 minutes
+ check_interval = 30 # Check every 30 seconds
+ while True:
+ await asyncio.sleep(check_interval)
+
+ # Check if buffer has been inactive (no data and no activity)
+ time_since_access = time.time() - self._last_access_time
+
+ # If buffer is empty and hasn't been accessed for timeout period,
+ # it likely means the producer failed or stream was abandoned
+ if len(self._chunks) == 0 and time_since_access > inactivity_timeout:
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "AudioBuffer: Empty buffer with no activity for %.1f seconds, "
+ "clearing (likely abandoned stream)",
+ time_since_access,
+ )
+ await self.clear()
+ break # Stop monitoring after clearing
+
+ # If buffer has data but hasn't been consumed, clear it
+ if len(self._chunks) > 0 and time_since_access > inactivity_timeout:
+ LOGGER.log(
+ VERBOSE_LOG_LEVEL,
+ "AudioBuffer: No activity for %.1f seconds, clearing buffer (had %s chunks)",
+ time_since_access,
+ len(self._chunks),
+ )
+ await self.clear()
+ break # Stop monitoring after clearing
+
+ def attach_fill_task(self, task: asyncio.Task[Any]) -> None:
+ """Attach a background task that fills the buffer."""
+ self._buffer_fill_task = task
+
+ # Start inactivity monitor if not already running
+ if self._inactivity_task is None or self._inactivity_task.done():
+ self._last_access_time = time.time() # Initialize access time
+ loop = asyncio.get_running_loop()
+ self._inactivity_task = loop.create_task(self._monitor_inactivity())
--- /dev/null
+"""Helper for adding buffering to async generators."""
+
+from __future__ import annotations
+
+import asyncio
+import contextlib
+from collections.abc import AsyncGenerator, Callable
+from functools import wraps
+from typing import Final, ParamSpec
+
+# Type variables for the buffered decorator
+_P = ParamSpec("_P")
+
+DEFAULT_BUFFER_SIZE: Final = 30
+DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5
+
+
+async def buffered(
+ generator: AsyncGenerator[bytes, None],
+ buffer_size: int = DEFAULT_BUFFER_SIZE,
+ min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
+) -> AsyncGenerator[bytes, None]:
+ """
+ Add buffering to an async generator that yields bytes.
+
+ This function uses an asyncio.Queue to decouple the producer (reading from the stream)
+ from the consumer (yielding to the client). The producer runs in a separate task and
+ fills the buffer, while the consumer yields from the buffer.
+
+ Args:
+ generator: The async generator to buffer
+ buffer_size: Maximum number of chunks to buffer (default: 30)
+ min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
+
+ Example:
+ async for chunk in buffered(my_generator(), buffer_size=100):
+ process(chunk)
+ """
+ buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
+ producer_error: Exception | None = None
+ threshold_reached = asyncio.Event()
+
+ if buffer_size <= 1:
+ # No buffering needed, yield directly
+ async for chunk in generator:
+ yield chunk
+ return
+
+ async def producer() -> None:
+ """Read from the original generator and fill the buffer."""
+ nonlocal producer_error
+ try:
+ async for chunk in generator:
+ await buffer.put(chunk)
+ if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
+ threshold_reached.set()
+ except asyncio.CancelledError:
+ # Task was cancelled, clean up the generator
+ with contextlib.suppress(RuntimeError, asyncio.CancelledError):
+ await generator.aclose()
+ raise
+ except Exception as err:
+ producer_error = err
+ # Consumer probably stopped consuming, close the original generator
+ with contextlib.suppress(RuntimeError, asyncio.CancelledError):
+ await generator.aclose()
+ finally:
+ threshold_reached.set()
+ # Signal end of stream by putting None
+ with contextlib.suppress(asyncio.QueueFull):
+ await buffer.put(None)
+
+ # Start the producer task
+ loop = asyncio.get_running_loop()
+ producer_task = loop.create_task(producer())
+
+ # Keep a strong reference to prevent garbage collection issues
+ # The event loop only keeps weak references to tasks
+ _active_tasks = getattr(loop, "_buffered_generator_tasks", None)
+ if _active_tasks is None:
+ _active_tasks = set()
+ loop._buffered_generator_tasks = _active_tasks # type: ignore[attr-defined]
+ _active_tasks.add(producer_task)
+
+ # Remove from set when done
+ producer_task.add_done_callback(_active_tasks.discard)
+
+ try:
+ # Wait for initial buffer to fill
+ await threshold_reached.wait()
+
+ # Consume from buffer and yield
+ while True:
+ data = await buffer.get()
+ if data is None:
+ # End of stream
+ if producer_error:
+ raise producer_error
+ break
+ yield data
+
+ finally:
+ # Ensure the producer task is cleaned up
+ if not producer_task.done():
+ producer_task.cancel()
+ with contextlib.suppress(asyncio.CancelledError, RuntimeError):
+ await producer_task
+
+
+def use_buffer(
+ buffer_size: int = DEFAULT_BUFFER_SIZE,
+ min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
+) -> Callable[
+ [Callable[_P, AsyncGenerator[bytes, None]]],
+ Callable[_P, AsyncGenerator[bytes, None]],
+]:
+ """
+ Add buffering to async generator functions that yield bytes (decorator).
+
+ This decorator uses an asyncio.Queue to decouple the producer (reading from the stream)
+ from the consumer (yielding to the client). The producer runs in a separate task and
+ fills the buffer, while the consumer yields from the buffer.
+
+ Args:
+ buffer_size: Maximum number of chunks to buffer (default: 30)
+ min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
+
+ Example:
+ @use_buffer(buffer_size=100)
+ async def my_stream() -> AsyncGenerator[bytes, None]:
+ ...
+ """
+
+ def decorator(
+ func: Callable[_P, AsyncGenerator[bytes, None]],
+ ) -> Callable[_P, AsyncGenerator[bytes, None]]:
+ @wraps(func)
+ async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> AsyncGenerator[bytes, None]:
+ async for chunk in buffered(
+ func(*args, **kwargs),
+ buffer_size=buffer_size,
+ min_buffer_before_yield=min_buffer_before_yield,
+ ):
+ yield chunk
+
+ return wrapper
+
+ return decorator
cache_key_parts.append(f"{key}{kwargs[key]}")
task_id = ".".join(map(str, cache_key_parts))
task: asyncio.Task[R] = mass.create_task(
- func, self, *args, **kwargs, task_id=task_id, abort_existing=False
+ func,
+ self,
+ *args,
+ task_id=task_id,
+ abort_existing=False,
+ **kwargs,
)
return await task
# Regular Queue item playback
# create a sonos cloud queue and load it
cloud_queue_url = f"{self.mass.streams.base_url}/sonos_queue/v2.3/"
- track_data = self.provider._parse_sonos_queue_item(media)
await self.client.player.group.play_cloud_queue(
cloud_queue_url,
item_id=media.queue_item_id,
- track_metadata=track_data["track"],
)
return
"service": {"name": "Music Assistant", "id": "mass"},
"name": media.title,
"imageUrl": media.image_url,
- "durationMillis": media.duration * 1000 if media.duration else 0,
+ "durationMillis": int(media.duration * 1000) if media.duration else 0,
"artist": {
"name": media.artist,
}