async def fill_buffer_task() -> None:
"""Background task to fill the audio buffer."""
chunk_count = 0
+ status = "running"
try:
async for chunk in get_media_stream(
mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params
chunk_count += 1
await audio_buffer.put(chunk)
except asyncio.CancelledError:
- LOGGER.log(
- VERBOSE_LOG_LEVEL,
- "fill_buffer_task: Cancelled after %s chunks for %s",
- chunk_count,
- streamdetails.uri,
- )
+ status = "cancelled"
+ raise
+ except Exception:
+ status = "aborted with error"
raise
- except Exception as err:
- LOGGER.error(
- "fill audio buffer task: Error after %s chunks for %s: %s",
- chunk_count,
- streamdetails.uri,
- err,
- )
finally:
await audio_buffer.set_eof()
LOGGER.log(
VERBOSE_LOG_LEVEL,
- "fill_buffer_task: Completed (%s chunks) for %s",
+ "fill_buffer_task: %s (%s chunks) for %s",
+ status,
chunk_count,
streamdetails.uri,
)
audio_buffer = AudioBuffer(pcm_format, checksum)
streamdetails.buffer = audio_buffer
task = mass.loop.create_task(fill_buffer_task())
- audio_buffer.attach_fill_task(task)
+ audio_buffer.attach_producer_task(task)
# special case: pcm format mismatch, resample on the fly
# this may happen in some special situations such as crossfading
DEFAULT_MAX_BUFFER_SIZE_SECONDS: int = 60 * 8 # 8 minutes
+class AudioBufferEOF(Exception):
+ """Exception raised when the audio buffer reaches end-of-file."""
+
+
class AudioBuffer:
"""Simple buffer to hold (PCM) audio chunks with seek capability.
self._data_available = asyncio.Condition(self._lock)
self._space_available = asyncio.Condition(self._lock)
self._eof_received = False
- self._buffer_fill_task: asyncio.Task[None] | None = None
+ self._producer_task: asyncio.Task[None] | None = None
self._last_access_time: float = time.time()
self._inactivity_task: asyncio.Task[None] | None = None
self._cancelled = False # Set to True when buffer is cleared/cancelled
+ self._producer_error: Exception | None = None
@property
def cancelled(self) -> bool:
"""Return whether the buffer has been cancelled or cleared."""
if self._cancelled:
return True
- return self._buffer_fill_task is not None and self._buffer_fill_task.cancelled()
+ return self._producer_task is not None and self._producer_task.cancelled()
@staticmethod
def _cleanup_chunks(chunks: deque[bytes]) -> None:
Bytes containing one second of audio data
Raises:
- AudioError: If EOF is reached before chunk is available or
- if chunk has been discarded
+ AudioBufferEOF: If EOF is reached before chunk is available
+ AudioError: If chunk has been discarded
+ Exception: Any exception that occurred in the producer task
"""
# Update last access time
self._last_access_time = time.time()
buffer_index = chunk_number - self._discarded_chunks
while buffer_index >= len(self._chunks):
if self._eof_received:
- raise AudioError("EOF")
+ # Check if producer had an error before raising EOF
+ if self._producer_error:
+ raise self._producer_error
+ raise AudioBufferEOF
await self._data_available.wait()
buffer_index = chunk_number - self._discarded_chunks
try:
yield await self.get(chunk_number=chunk_number)
chunk_number += 1
- except AudioError:
+ except AudioBufferEOF:
break # EOF reached
- async def clear(self) -> None:
+ async def clear(self, cancel_inactivity_task: bool = True) -> None:
"""Reset the buffer completely, clearing all data."""
chunk_count = len(self._chunks)
LOGGER.log(
VERBOSE_LOG_LEVEL,
- "AudioBuffer.clear: Resetting buffer (had %s chunks, has fill task: %s)",
+ "AudioBuffer.clear: Resetting buffer (had %s chunks, has producer task: %s)",
chunk_count,
- self._buffer_fill_task is not None,
+ self._producer_task is not None,
)
- if self._buffer_fill_task:
- self._buffer_fill_task.cancel()
+ # Cancel producer task if present
+ if self._producer_task and not self._producer_task.done():
+ self._producer_task.cancel()
with suppress(asyncio.CancelledError):
- await self._buffer_fill_task
- # cancel the inactivity task
- if self._inactivity_task:
- current_task = asyncio.current_task()
- # Don't await inactivity task cancellation if we're being called from it
- # to avoid deadlock/blocking
- if current_task != self._inactivity_task:
- self._inactivity_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._inactivity_task
- else:
- # Just cancel it without waiting since we're inside it
- self._inactivity_task.cancel()
+ await self._producer_task
+ # Cancel inactivity task if present
+ if cancel_inactivity_task and self._inactivity_task and not self._inactivity_task.done():
+ self._inactivity_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._inactivity_task
+
async with self._lock:
# Replace the deque instead of clearing it to avoid blocking
# Clearing a large deque can take >100ms
self._discarded_chunks = 0
self._eof_received = False
self._cancelled = True # Mark buffer as cancelled
+ self._producer_error = None # Clear any producer error
# Notify all waiting tasks
self._data_available.notify_all()
self._space_available.notify_all()
check_interval = 30 # Check every 30 seconds
while True:
await asyncio.sleep(check_interval)
-
# Check if buffer has been inactive (no data and no activity)
time_since_access = time.time() - self._last_access_time
-
- # If buffer is empty and hasn't been accessed for timeout period,
+ # If buffer hasn't been accessed for timeout period,
# it likely means the producer failed or stream was abandoned
- if len(self._chunks) == 0 and time_since_access > inactivity_timeout:
- LOGGER.log(
- VERBOSE_LOG_LEVEL,
- "AudioBuffer: Empty buffer with no activity for %.1f seconds, "
- "clearing (likely abandoned stream)",
- time_since_access,
- )
- await self.clear()
- break # Stop monitoring after clearing
-
- # If buffer has data but hasn't been consumed, clear it
if len(self._chunks) > 0 and time_since_access > inactivity_timeout:
LOGGER.log(
VERBOSE_LOG_LEVEL,
time_since_access,
len(self._chunks),
)
- await self.clear()
break # Stop monitoring after clearing
+ # if we reach here, we have broken out of the loop due to inactivity
+ await self.clear(cancel_inactivity_task=False)
- def attach_fill_task(self, task: asyncio.Task[Any]) -> None:
+ def attach_producer_task(self, task: asyncio.Task[Any]) -> None:
"""Attach a background task that fills the buffer."""
- self._buffer_fill_task = task
+ self._producer_task = task
+
+ # Add a callback to capture any exceptions from the producer task
+ def _on_producer_done(t: asyncio.Task[Any]) -> None:
+ """Handle producer task completion."""
+ if t.cancelled():
+ return
+ # Capture any exception that occurred
+ exc = t.exception()
+ if exc is not None and isinstance(exc, Exception):
+ self._producer_error = exc
+ # Wake up any waiting consumers so they can see the error
+ loop = asyncio.get_running_loop()
+ loop.call_soon_threadsafe(self._data_available.notify_all)
+
+ task.add_done_callback(_on_producer_done)
# Start inactivity monitor if not already running
if self._inactivity_task is None or self._inactivity_task.done():
import contextlib
from collections.abc import AsyncGenerator, Callable
from functools import wraps
-from typing import Final, ParamSpec
+from typing import Any, Final, ParamSpec
-from music_assistant.helpers.util import empty_queue
+from music_assistant.helpers.util import close_async_generator, empty_queue
# Type variables for the buffered decorator
_P = ParamSpec("_P")
DEFAULT_BUFFER_SIZE: Final = 30
DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5
+# Keep strong references to producer tasks to prevent garbage collection
+# The event loop only keeps weak references to tasks
+_ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set()
+
async def buffered(
generator: AsyncGenerator[bytes, None],
async def producer() -> None:
"""Read from the original generator and fill the buffer."""
nonlocal producer_error
+ generator_consumed = False
try:
async for chunk in generator:
+ generator_consumed = True
if cancelled.is_set():
# Consumer has stopped, exit cleanly
break
await buffer.put(chunk)
if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
threshold_reached.set()
+ # Yield to event loop every chunk to prevent blocking
+ await asyncio.sleep(0)
except Exception as err:
producer_error = err
+ if isinstance(err, asyncio.CancelledError):
+ raise
finally:
threshold_reached.set()
- # Clean up the generator
- with contextlib.suppress(RuntimeError, asyncio.CancelledError):
- await generator.aclose()
+ # Clean up the generator if needed
+ if not generator_consumed:
+ await close_async_generator(generator)
# Signal end of stream by putting None
with contextlib.suppress(asyncio.QueueFull):
buffer.put_nowait(None)
# Keep a strong reference to prevent garbage collection issues
# The event loop only keeps weak references to tasks
- _active_tasks = getattr(loop, "_buffered_generator_tasks", None)
- if _active_tasks is None:
- _active_tasks = set()
- loop._buffered_generator_tasks = _active_tasks # type: ignore[attr-defined]
- _active_tasks.add(producer_task)
+ _ACTIVE_PRODUCER_TASKS.add(producer_task)
# Remove from set when done
- producer_task.add_done_callback(_active_tasks.discard)
+ producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard)
try:
# Wait for initial buffer to fill
cancelled.set()
# Drain the queue to unblock the producer if it's waiting on put()
empty_queue(buffer)
- # Wait for the producer to finish cleanly
- with contextlib.suppress(asyncio.CancelledError, RuntimeError):
- await producer_task
+ # Wait for the producer to finish cleanly with a timeout to prevent blocking
+ with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError):
+ await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0)
def use_buffer(