Ensure exception in buffer reading task gets propagated to outer process
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 26 Oct 2025 20:44:52 +0000 (21:44 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 26 Oct 2025 20:44:52 +0000 (21:44 +0100)
music_assistant/helpers/audio.py
music_assistant/helpers/audio_buffer.py
music_assistant/helpers/buffered_generator.py
music_assistant/helpers/ffmpeg.py

index fede30dd73301c7d2a79866328cfccfa5f36811f..2b63cb8172a1a44103ac3d65e7d31a3b0547d197 100644 (file)
@@ -442,6 +442,7 @@ async def get_buffered_media_stream(
     async def fill_buffer_task() -> None:
         """Background task to fill the audio buffer."""
         chunk_count = 0
+        status = "running"
         try:
             async for chunk in get_media_stream(
                 mass, streamdetails, pcm_format, seek_position=0, filter_params=filter_params
@@ -449,25 +450,17 @@ async def get_buffered_media_stream(
                 chunk_count += 1
                 await audio_buffer.put(chunk)
         except asyncio.CancelledError:
-            LOGGER.log(
-                VERBOSE_LOG_LEVEL,
-                "fill_buffer_task: Cancelled after %s chunks for %s",
-                chunk_count,
-                streamdetails.uri,
-            )
+            status = "cancelled"
+            raise
+        except Exception:
+            status = "aborted with error"
             raise
-        except Exception as err:
-            LOGGER.error(
-                "fill audio buffer task: Error after %s chunks for %s: %s",
-                chunk_count,
-                streamdetails.uri,
-                err,
-            )
         finally:
             await audio_buffer.set_eof()
             LOGGER.log(
                 VERBOSE_LOG_LEVEL,
-                "fill_buffer_task: Completed (%s chunks) for %s",
+                "fill_buffer_task: %s (%s chunks) for %s",
+                status,
                 chunk_count,
                 streamdetails.uri,
             )
@@ -526,7 +519,7 @@ async def get_buffered_media_stream(
         audio_buffer = AudioBuffer(pcm_format, checksum)
         streamdetails.buffer = audio_buffer
         task = mass.loop.create_task(fill_buffer_task())
-        audio_buffer.attach_fill_task(task)
+        audio_buffer.attach_producer_task(task)
 
     # special case: pcm format mismatch, resample on the fly
     # this may happen in some special situations such as crossfading
index 45777c0430b80aad8b45ada1e2205bece42dedaa..93f58b64c212c54f582c0e2b09d4967af596a76d 100644 (file)
@@ -23,6 +23,10 @@ LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.audio_buffer")
 DEFAULT_MAX_BUFFER_SIZE_SECONDS: int = 60 * 8  # 8 minutes
 
 
+class AudioBufferEOF(Exception):
+    """Exception raised when the audio buffer reaches end-of-file."""
+
+
 class AudioBuffer:
     """Simple buffer to hold (PCM) audio chunks with seek capability.
 
@@ -55,17 +59,18 @@ class AudioBuffer:
         self._data_available = asyncio.Condition(self._lock)
         self._space_available = asyncio.Condition(self._lock)
         self._eof_received = False
-        self._buffer_fill_task: asyncio.Task[None] | None = None
+        self._producer_task: asyncio.Task[None] | None = None
         self._last_access_time: float = time.time()
         self._inactivity_task: asyncio.Task[None] | None = None
         self._cancelled = False  # Set to True when buffer is cleared/cancelled
+        self._producer_error: Exception | None = None
 
     @property
     def cancelled(self) -> bool:
         """Return whether the buffer has been cancelled or cleared."""
         if self._cancelled:
             return True
-        return self._buffer_fill_task is not None and self._buffer_fill_task.cancelled()
+        return self._producer_task is not None and self._producer_task.cancelled()
 
     @staticmethod
     def _cleanup_chunks(chunks: deque[bytes]) -> None:
@@ -176,8 +181,9 @@ class AudioBuffer:
             Bytes containing one second of audio data
 
         Raises:
-            AudioError: If EOF is reached before chunk is available or
-                       if chunk has been discarded
+            AudioBufferEOF: If EOF is reached before chunk is available
+            AudioError: If chunk has been discarded
+            Exception: Any exception that occurred in the producer task
         """
         # Update last access time
         self._last_access_time = time.time()
@@ -195,7 +201,10 @@ class AudioBuffer:
             buffer_index = chunk_number - self._discarded_chunks
             while buffer_index >= len(self._chunks):
                 if self._eof_received:
-                    raise AudioError("EOF")
+                    # Check if producer had an error before raising EOF
+                    if self._producer_error:
+                        raise self._producer_error
+                    raise AudioBufferEOF
                 await self._data_available.wait()
                 buffer_index = chunk_number - self._discarded_chunks
 
@@ -233,34 +242,29 @@ class AudioBuffer:
             try:
                 yield await self.get(chunk_number=chunk_number)
                 chunk_number += 1
-            except AudioError:
+            except AudioBufferEOF:
                 break  # EOF reached
 
-    async def clear(self) -> None:
+    async def clear(self, cancel_inactivity_task: bool = True) -> None:
         """Reset the buffer completely, clearing all data."""
         chunk_count = len(self._chunks)
         LOGGER.log(
             VERBOSE_LOG_LEVEL,
-            "AudioBuffer.clear: Resetting buffer (had %s chunks, has fill task: %s)",
+            "AudioBuffer.clear: Resetting buffer (had %s chunks, has producer task: %s)",
             chunk_count,
-            self._buffer_fill_task is not None,
+            self._producer_task is not None,
         )
-        if self._buffer_fill_task:
-            self._buffer_fill_task.cancel()
+        # Cancel producer task if present
+        if self._producer_task and not self._producer_task.done():
+            self._producer_task.cancel()
             with suppress(asyncio.CancelledError):
-                await self._buffer_fill_task
-        # cancel the inactivity task
-        if self._inactivity_task:
-            current_task = asyncio.current_task()
-            # Don't await inactivity task cancellation if we're being called from it
-            # to avoid deadlock/blocking
-            if current_task != self._inactivity_task:
-                self._inactivity_task.cancel()
-                with suppress(asyncio.CancelledError):
-                    await self._inactivity_task
-            else:
-                # Just cancel it without waiting since we're inside it
-                self._inactivity_task.cancel()
+                await self._producer_task
+        # Cancel inactivity task if present
+        if cancel_inactivity_task and self._inactivity_task and not self._inactivity_task.done():
+            self._inactivity_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._inactivity_task
+
         async with self._lock:
             # Replace the deque instead of clearing it to avoid blocking
             # Clearing a large deque can take >100ms
@@ -269,6 +273,7 @@ class AudioBuffer:
             self._discarded_chunks = 0
             self._eof_received = False
             self._cancelled = True  # Mark buffer as cancelled
+            self._producer_error = None  # Clear any producer error
             # Notify all waiting tasks
             self._data_available.notify_all()
             self._space_available.notify_all()
@@ -297,23 +302,10 @@ class AudioBuffer:
         check_interval = 30  # Check every 30 seconds
         while True:
             await asyncio.sleep(check_interval)
-
             # Check if buffer has been inactive (no data and no activity)
             time_since_access = time.time() - self._last_access_time
-
-            # If buffer is empty and hasn't been accessed for timeout period,
+            # If buffer hasn't been accessed for timeout period,
             # it likely means the producer failed or stream was abandoned
-            if len(self._chunks) == 0 and time_since_access > inactivity_timeout:
-                LOGGER.log(
-                    VERBOSE_LOG_LEVEL,
-                    "AudioBuffer: Empty buffer with no activity for %.1f seconds, "
-                    "clearing (likely abandoned stream)",
-                    time_since_access,
-                )
-                await self.clear()
-                break  # Stop monitoring after clearing
-
-            # If buffer has data but hasn't been consumed, clear it
             if len(self._chunks) > 0 and time_since_access > inactivity_timeout:
                 LOGGER.log(
                     VERBOSE_LOG_LEVEL,
@@ -321,12 +313,28 @@ class AudioBuffer:
                     time_since_access,
                     len(self._chunks),
                 )
-                await self.clear()
                 break  # Stop monitoring after clearing
+        # if we reach here, we have broken out of the loop due to inactivity
+        await self.clear(cancel_inactivity_task=False)
 
-    def attach_fill_task(self, task: asyncio.Task[Any]) -> None:
+    def attach_producer_task(self, task: asyncio.Task[Any]) -> None:
         """Attach a background task that fills the buffer."""
-        self._buffer_fill_task = task
+        self._producer_task = task
+
+        # Add a callback to capture any exceptions from the producer task
+        def _on_producer_done(t: asyncio.Task[Any]) -> None:
+            """Handle producer task completion."""
+            if t.cancelled():
+                return
+            # Capture any exception that occurred
+            exc = t.exception()
+            if exc is not None and isinstance(exc, Exception):
+                self._producer_error = exc
+                # Wake up any waiting consumers so they can see the error
+                loop = asyncio.get_running_loop()
+                loop.call_soon_threadsafe(self._data_available.notify_all)
+
+        task.add_done_callback(_on_producer_done)
 
         # Start inactivity monitor if not already running
         if self._inactivity_task is None or self._inactivity_task.done():
index 11b6c18b259c4f0e4284a7a008ff791006964835..42902d122e2101b174b23b1e05e32b06b7dda4c9 100644 (file)
@@ -6,9 +6,9 @@ import asyncio
 import contextlib
 from collections.abc import AsyncGenerator, Callable
 from functools import wraps
-from typing import Final, ParamSpec
+from typing import Any, Final, ParamSpec
 
-from music_assistant.helpers.util import empty_queue
+from music_assistant.helpers.util import close_async_generator, empty_queue
 
 # Type variables for the buffered decorator
 _P = ParamSpec("_P")
@@ -16,6 +16,10 @@ _P = ParamSpec("_P")
 DEFAULT_BUFFER_SIZE: Final = 30
 DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5
 
+# Keep strong references to producer tasks to prevent garbage collection
+# The event loop only keeps weak references to tasks
+_ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set()
+
 
 async def buffered(
     generator: AsyncGenerator[bytes, None],
@@ -52,21 +56,27 @@ async def buffered(
     async def producer() -> None:
         """Read from the original generator and fill the buffer."""
         nonlocal producer_error
+        generator_consumed = False
         try:
             async for chunk in generator:
+                generator_consumed = True
                 if cancelled.is_set():
                     # Consumer has stopped, exit cleanly
                     break
                 await buffer.put(chunk)
                 if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
                     threshold_reached.set()
+                # Yield to event loop every chunk to prevent blocking
+                await asyncio.sleep(0)
         except Exception as err:
             producer_error = err
+            if isinstance(err, asyncio.CancelledError):
+                raise
         finally:
             threshold_reached.set()
-            # Clean up the generator
-            with contextlib.suppress(RuntimeError, asyncio.CancelledError):
-                await generator.aclose()
+            # Clean up the generator if needed
+            if not generator_consumed:
+                await close_async_generator(generator)
             # Signal end of stream by putting None
             with contextlib.suppress(asyncio.QueueFull):
                 buffer.put_nowait(None)
@@ -77,14 +87,10 @@ async def buffered(
 
     # Keep a strong reference to prevent garbage collection issues
     # The event loop only keeps weak references to tasks
-    _active_tasks = getattr(loop, "_buffered_generator_tasks", None)
-    if _active_tasks is None:
-        _active_tasks = set()
-        loop._buffered_generator_tasks = _active_tasks  # type: ignore[attr-defined]
-    _active_tasks.add(producer_task)
+    _ACTIVE_PRODUCER_TASKS.add(producer_task)
 
     # Remove from set when done
-    producer_task.add_done_callback(_active_tasks.discard)
+    producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard)
 
     try:
         # Wait for initial buffer to fill
@@ -105,9 +111,9 @@ async def buffered(
         cancelled.set()
         # Drain the queue to unblock the producer if it's waiting on put()
         empty_queue(buffer)
-        # Wait for the producer to finish cleanly
-        with contextlib.suppress(asyncio.CancelledError, RuntimeError):
-            await producer_task
+        # Wait for the producer to finish cleanly with a timeout to prevent blocking
+        with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError):
+            await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0)
 
 
 def use_buffer(
index 822ef157f7876add7ce8ad0baee991acd70bcadc..38577a8d06d43fe8c6d21e8b7d7ccb7d81bc9a40 100644 (file)
@@ -165,27 +165,33 @@ class FFMpeg(AsyncProcess):
     async def _feed_stdin(self) -> None:
         """Feed stdin with audio chunks from an AsyncGenerator."""
         assert not isinstance(self.audio_input, str | int)
-
         generator_exhausted = False
         cancelled = False
+        status = "running"
+        chunk_count = 0
+        self.logger.log(VERBOSE_LOG_LEVEL, "Start reading audio data from source...")
         try:
             start = time.time()
-            self.logger.debug("Start reading audio data from source...")
             async for chunk in self.audio_input:
+                chunk_count += 1
                 if self.closed:
                     return
                 await self.write(chunk)
-            self.logger.debug("Audio data source exhausted in %.2fs", time.time() - start)
             generator_exhausted = True
-        except Exception as err:
-            cancelled = isinstance(err, asyncio.CancelledError)
-            self.logger.error(
-                "Stream error: %s",
-                str(err) or err.__class__.__name__,
-                exc_info=err if self.logger.isEnabledFor(logging.DEBUG) else None,
-            )
+        except asyncio.CancelledError:
+            status = "cancelled"
+            raise
+        except Exception:
+            status = "aborted with error"
             raise
         finally:
+            LOGGER.log(
+                VERBOSE_LOG_LEVEL,
+                "fill_buffer_task: %s (%s chunks received) in in %.2fs",
+                status,
+                chunk_count,
+                time.time() - start,
+            )
             if not cancelled:
                 await self.write_eof()
             # we need to ensure that we close the async generator