From: Marcel van der Veldt Date: Sun, 26 Oct 2025 20:44:52 +0000 (+0100) Subject: Ensure exception in buffer reading task gets propagated to outer process X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=d2a4c9d3c02d6bfd85205b626a8c892e410bd9f5;p=music-assistant-server.git Ensure exception in buffer reading task gets propagated to outer process --- diff --git a/music_assistant/helpers/audio.py b/music_assistant/helpers/audio.py index fede30dd..2b63cb81 100644 --- a/music_assistant/helpers/audio.py +++ b/music_assistant/helpers/audio.py @@ -442,6 +442,7 @@ async def get_buffered_media_stream( async def fill_buffer_task() -> None: """Background task to fill the audio buffer.""" chunk_count = 0 + status = "running" try: async for chunk in get_media_stream( mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params @@ -449,25 +450,17 @@ async def get_buffered_media_stream( chunk_count += 1 await audio_buffer.put(chunk) except asyncio.CancelledError: - LOGGER.log( - VERBOSE_LOG_LEVEL, - "fill_buffer_task: Cancelled after %s chunks for %s", - chunk_count, - streamdetails.uri, - ) + status = "cancelled" + raise + except Exception: + status = "aborted with error" raise - except Exception as err: - LOGGER.error( - "fill audio buffer task: Error after %s chunks for %s: %s", - chunk_count, - streamdetails.uri, - err, - ) finally: await audio_buffer.set_eof() LOGGER.log( VERBOSE_LOG_LEVEL, - "fill_buffer_task: Completed (%s chunks) for %s", + "fill_buffer_task: %s (%s chunks) for %s", + status, chunk_count, streamdetails.uri, ) @@ -526,7 +519,7 @@ async def get_buffered_media_stream( audio_buffer = AudioBuffer(pcm_format, checksum) streamdetails.buffer = audio_buffer task = mass.loop.create_task(fill_buffer_task()) - audio_buffer.attach_fill_task(task) + audio_buffer.attach_producer_task(task) # special case: pcm format mismatch, resample on the fly # this may happen in some special situations such as crossfading diff --git a/music_assistant/helpers/audio_buffer.py b/music_assistant/helpers/audio_buffer.py index 45777c04..93f58b64 100644 --- a/music_assistant/helpers/audio_buffer.py +++ b/music_assistant/helpers/audio_buffer.py @@ -23,6 +23,10 @@ LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio_buffer") DEFAULT_MAX_BUFFER_SIZE_SECONDS: int = 60 * 8 # 8 minutes +class AudioBufferEOF(Exception): + """Exception raised when the audio buffer reaches end-of-file.""" + + class AudioBuffer: """Simple buffer to hold (PCM) audio chunks with seek capability. @@ -55,17 +59,18 @@ class AudioBuffer: self._data_available = asyncio.Condition(self._lock) self._space_available = asyncio.Condition(self._lock) self._eof_received = False - self._buffer_fill_task: asyncio.Task[None] | None = None + self._producer_task: asyncio.Task[None] | None = None self._last_access_time: float = time.time() self._inactivity_task: asyncio.Task[None] | None = None self._cancelled = False # Set to True when buffer is cleared/cancelled + self._producer_error: Exception | None = None @property def cancelled(self) -> bool: """Return whether the buffer has been cancelled or cleared.""" if self._cancelled: return True - return self._buffer_fill_task is not None and self._buffer_fill_task.cancelled() + return self._producer_task is not None and self._producer_task.cancelled() @staticmethod def _cleanup_chunks(chunks: deque[bytes]) -> None: @@ -176,8 +181,9 @@ class AudioBuffer: Bytes containing one second of audio data Raises: - AudioError: If EOF is reached before chunk is available or - if chunk has been discarded + AudioBufferEOF: If EOF is reached before chunk is available + AudioError: If chunk has been discarded + Exception: Any exception that occurred in the producer task """ # Update last access time self._last_access_time = time.time() @@ -195,7 +201,10 @@ class AudioBuffer: buffer_index = chunk_number - self._discarded_chunks while buffer_index >= len(self._chunks): if self._eof_received: - raise AudioError("EOF") + # Check if producer had an error before raising EOF + if self._producer_error: + raise self._producer_error + raise AudioBufferEOF await self._data_available.wait() buffer_index = chunk_number - self._discarded_chunks @@ -233,34 +242,29 @@ class AudioBuffer: try: yield await self.get(chunk_number=chunk_number) chunk_number += 1 - except AudioError: + except AudioBufferEOF: break # EOF reached - async def clear(self) -> None: + async def clear(self, cancel_inactivity_task: bool = True) -> None: """Reset the buffer completely, clearing all data.""" chunk_count = len(self._chunks) LOGGER.log( VERBOSE_LOG_LEVEL, - "AudioBuffer.clear: Resetting buffer (had %s chunks, has fill task: %s)", + "AudioBuffer.clear: Resetting buffer (had %s chunks, has producer task: %s)", chunk_count, - self._buffer_fill_task is not None, + self._producer_task is not None, ) - if self._buffer_fill_task: - self._buffer_fill_task.cancel() + # Cancel producer task if present + if self._producer_task and not self._producer_task.done(): + self._producer_task.cancel() with suppress(asyncio.CancelledError): - await self._buffer_fill_task - # cancel the inactivity task - if self._inactivity_task: - current_task = asyncio.current_task() - # Don't await inactivity task cancellation if we're being called from it - # to avoid deadlock/blocking - if current_task != self._inactivity_task: - self._inactivity_task.cancel() - with suppress(asyncio.CancelledError): - await self._inactivity_task - else: - # Just cancel it without waiting since we're inside it - self._inactivity_task.cancel() + await self._producer_task + # Cancel inactivity task if present + if cancel_inactivity_task and self._inactivity_task and not self._inactivity_task.done(): + self._inactivity_task.cancel() + with suppress(asyncio.CancelledError): + await self._inactivity_task + async with self._lock: # Replace the deque instead of clearing it to avoid blocking # Clearing a large deque can take >100ms @@ -269,6 +273,7 @@ class AudioBuffer: self._discarded_chunks = 0 self._eof_received = False self._cancelled = True # Mark buffer as cancelled + self._producer_error = None # Clear any producer error # Notify all waiting tasks self._data_available.notify_all() self._space_available.notify_all() @@ -297,23 +302,10 @@ class AudioBuffer: check_interval = 30 # Check every 30 seconds while True: await asyncio.sleep(check_interval) - # Check if buffer has been inactive (no data and no activity) time_since_access = time.time() - self._last_access_time - - # If buffer is empty and hasn't been accessed for timeout period, + # If buffer hasn't been accessed for timeout period, # it likely means the producer failed or stream was abandoned - if len(self._chunks) == 0 and time_since_access > inactivity_timeout: - LOGGER.log( - VERBOSE_LOG_LEVEL, - "AudioBuffer: Empty buffer with no activity for %.1f seconds, " - "clearing (likely abandoned stream)", - time_since_access, - ) - await self.clear() - break # Stop monitoring after clearing - - # If buffer has data but hasn't been consumed, clear it if len(self._chunks) > 0 and time_since_access > inactivity_timeout: LOGGER.log( VERBOSE_LOG_LEVEL, @@ -321,12 +313,28 @@ class AudioBuffer: time_since_access, len(self._chunks), ) - await self.clear() break # Stop monitoring after clearing + # if we reach here, we have broken out of the loop due to inactivity + await self.clear(cancel_inactivity_task=False) - def attach_fill_task(self, task: asyncio.Task[Any]) -> None: + def attach_producer_task(self, task: asyncio.Task[Any]) -> None: """Attach a background task that fills the buffer.""" - self._buffer_fill_task = task + self._producer_task = task + + # Add a callback to capture any exceptions from the producer task + def _on_producer_done(t: asyncio.Task[Any]) -> None: + """Handle producer task completion.""" + if t.cancelled(): + return + # Capture any exception that occurred + exc = t.exception() + if exc is not None and isinstance(exc, Exception): + self._producer_error = exc + # Wake up any waiting consumers so they can see the error + loop = asyncio.get_running_loop() + loop.call_soon_threadsafe(self._data_available.notify_all) + + task.add_done_callback(_on_producer_done) # Start inactivity monitor if not already running if self._inactivity_task is None or self._inactivity_task.done(): diff --git a/music_assistant/helpers/buffered_generator.py b/music_assistant/helpers/buffered_generator.py index 11b6c18b..42902d12 100644 --- a/music_assistant/helpers/buffered_generator.py +++ b/music_assistant/helpers/buffered_generator.py @@ -6,9 +6,9 @@ import asyncio import contextlib from collections.abc import AsyncGenerator, Callable from functools import wraps -from typing import Final, ParamSpec +from typing import Any, Final, ParamSpec -from music_assistant.helpers.util import empty_queue +from music_assistant.helpers.util import close_async_generator, empty_queue # Type variables for the buffered decorator _P = ParamSpec("_P") @@ -16,6 +16,10 @@ _P = ParamSpec("_P") DEFAULT_BUFFER_SIZE: Final = 30 DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5 +# Keep strong references to producer tasks to prevent garbage collection +# The event loop only keeps weak references to tasks +_ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set() + async def buffered( generator: AsyncGenerator[bytes, None], @@ -52,21 +56,27 @@ async def buffered( async def producer() -> None: """Read from the original generator and fill the buffer.""" nonlocal producer_error + generator_consumed = False try: async for chunk in generator: + generator_consumed = True if cancelled.is_set(): # Consumer has stopped, exit cleanly break await buffer.put(chunk) if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield: threshold_reached.set() + # Yield to event loop every chunk to prevent blocking + await asyncio.sleep(0) except Exception as err: producer_error = err + if isinstance(err, asyncio.CancelledError): + raise finally: threshold_reached.set() - # Clean up the generator - with contextlib.suppress(RuntimeError, asyncio.CancelledError): - await generator.aclose() + # Clean up the generator if needed + if not generator_consumed: + await close_async_generator(generator) # Signal end of stream by putting None with contextlib.suppress(asyncio.QueueFull): buffer.put_nowait(None) @@ -77,14 +87,10 @@ async def buffered( # Keep a strong reference to prevent garbage collection issues # The event loop only keeps weak references to tasks - _active_tasks = getattr(loop, "_buffered_generator_tasks", None) - if _active_tasks is None: - _active_tasks = set() - loop._buffered_generator_tasks = _active_tasks # type: ignore[attr-defined] - _active_tasks.add(producer_task) + _ACTIVE_PRODUCER_TASKS.add(producer_task) # Remove from set when done - producer_task.add_done_callback(_active_tasks.discard) + producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard) try: # Wait for initial buffer to fill @@ -105,9 +111,9 @@ async def buffered( cancelled.set() # Drain the queue to unblock the producer if it's waiting on put() empty_queue(buffer) - # Wait for the producer to finish cleanly - with contextlib.suppress(asyncio.CancelledError, RuntimeError): - await producer_task + # Wait for the producer to finish cleanly with a timeout to prevent blocking + with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError): + await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0) def use_buffer( diff --git a/music_assistant/helpers/ffmpeg.py b/music_assistant/helpers/ffmpeg.py index 822ef157..38577a8d 100644 --- a/music_assistant/helpers/ffmpeg.py +++ b/music_assistant/helpers/ffmpeg.py @@ -165,27 +165,33 @@ class FFMpeg(AsyncProcess): async def _feed_stdin(self) -> None: """Feed stdin with audio chunks from an AsyncGenerator.""" assert not isinstance(self.audio_input, str | int) - generator_exhausted = False cancelled = False + status = "running" + chunk_count = 0 + self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...") try: start = time.time() - self.logger.debug("Start reading audio data from source...") async for chunk in self.audio_input: + chunk_count += 1 if self.closed: return await self.write(chunk) - self.logger.debug("Audio data source exhausted in %.2fs", time.time() - start) generator_exhausted = True - except Exception as err: - cancelled = isinstance(err, asyncio.CancelledError) - self.logger.error( - "Stream error: %s", - str(err) or err.__class__.__name__, - exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None, - ) + except asyncio.CancelledError: + status = "cancelled" + raise + except Exception: + status = "aborted with error" raise finally: + LOGGER.log( + VERBOSE_LOG_LEVEL, + "fill_buffer_task: %s (%s chunks received) in in %.2fs", + status, + chunk_count, + time.time() - start, + ) if not cancelled: await self.write_eof() # we need to ensure that we close the async generator