Fix buffered generator: must yield 1 second chunks
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 31 Oct 2025 01:07:38 +0000 (02:07 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 31 Oct 2025 01:07:38 +0000 (02:07 +0100)
music_assistant/controllers/streams.py
music_assistant/helpers/buffered_generator.py

index 4abd186ba792d97c581ad580daabdc7ab14d5504..efed638325fd075f6496bc9a77734168b49534fe 100644 (file)
@@ -66,7 +66,7 @@ from music_assistant.helpers.audio import (
     get_stream_details,
     resample_pcm_audio,
 )
-from music_assistant.helpers.buffered_generator import use_buffer
+from music_assistant.helpers.buffered_generator import use_audio_buffer
 from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
 from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
 from music_assistant.helpers.smart_fades import (
@@ -319,7 +319,8 @@ class StreamsController(CoreController):
         )
         # Start periodic garbage collection task
         # This ensures memory from audio buffers and streams is cleaned up regularly
-        self.mass.call_later(900, self._periodic_garbage_collection)  # 15 minutes
+        # DISABLED FOR TESTING - may cause event loop blocking
+        # self.mass.call_later(900, self._periodic_garbage_collection)  # 15 minutes
 
     async def close(self) -> None:
         """Cleanup on exit."""
@@ -827,14 +828,18 @@ class StreamsController(CoreController):
         # like https hosts and it also offers the pre-announce 'bell'
         return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
 
-    @use_buffer(30, 1)
+    @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
     async def get_queue_flow_stream(
         self,
         queue: PlayerQueue,
         start_queue_item: QueueItem,
         pcm_format: AudioFormat,
     ) -> AsyncGenerator[bytes, None]:
-        """Get a flow stream of all tracks in the queue as raw PCM audio."""
+        """
+        Get a flow stream of all tracks in the queue as raw PCM audio.
+
+        yields chunks of exactly 1 second of audio in the given pcm_format.
+        """
         # ruff: noqa: PLR0915
         assert pcm_format.content_type.is_pcm()
         queue_track = None
@@ -917,6 +922,7 @@ class StreamsController(CoreController):
             buffer = b""
             # handle incoming audio chunks
             first_chunk_received = False
+            buffer_filled = False
             async for chunk in self.get_queue_item_stream(
                 queue_track,
                 pcm_format=pcm_format,
@@ -941,8 +947,14 @@ class StreamsController(CoreController):
                 del chunk
                 if len(buffer) < req_buffer_size:
                     # buffer is not full enough, move on
+                    # yield control to event loop to prevent blocking pipe writes
+                    # use 10ms delay to ensure I/O operations can complete
+                    await asyncio.sleep(0.01)
                     continue
 
+                if not buffer_filled and last_fadeout_part:
+                    buffer_filled = True
+
                 ####  HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
                 if last_fadeout_part and last_streamdetails:
                     # perform crossfade
@@ -967,7 +979,7 @@ class StreamsController(CoreController):
                         last_play_log_entry.seconds_streamed += (
                             crossfade_part_len / 2 / pcm_sample_size
                         )
-                    # send crossfade_part (as one big chunk)
+                    # yield crossfade_part - buffered_generator will rechunk to 1-second
                     yield crossfade_part
                     del crossfade_part
                     # also write the leftover bytes from the crossfade action
@@ -1263,7 +1275,7 @@ class StreamsController(CoreController):
                 self.mass.create_task(music_prov.on_streamed(streamdetails))
             # Periodic GC task will handle memory cleanup every 15 minutes
 
-    @use_buffer(30, 1)
+    @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
     async def get_queue_item_stream_with_smartfade(
         self,
         queue_item: QueueItem,
index 523b4883376c174df36acbb23793c81f4471f9d6..a950ae85f0c1be6240cae8114426dff89e1b80e0 100644 (file)
@@ -1,4 +1,4 @@
-"""Helper for adding buffering to async generators."""
+"""Helper for adding buffering to async audio generators."""
 
 from __future__ import annotations
 
@@ -8,7 +8,9 @@ from collections.abc import AsyncGenerator, Callable
 from functools import wraps
 from typing import Any, Final, ParamSpec
 
-from music_assistant.helpers.util import close_async_generator, empty_queue
+from music_assistant_models.streamdetails import AudioFormat
+
+from music_assistant.helpers.util import close_async_generator
 
 # Type variables for the buffered decorator
 _P = ParamSpec("_P")
@@ -21,31 +23,42 @@ DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5
 _ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set()
 
 
-async def buffered(
+async def buffered_audio(
     generator: AsyncGenerator[bytes, None],
+    pcm_format: AudioFormat,
     buffer_size: int = DEFAULT_BUFFER_SIZE,
     min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
 ) -> AsyncGenerator[bytes, None]:
     """
-    Add buffering to an async generator that yields bytes.
+    Add buffering to an async audio generator that yields PCM audio bytes.
+
+    This function uses a shared buffer with asyncio.Condition to decouple the producer
+    (reading from the stream) from the consumer (yielding to the client).
 
-    This function uses an asyncio.Queue to decouple the producer (reading from the stream)
-    from the consumer (yielding to the client). The producer runs in a separate task and
-    fills the buffer, while the consumer yields from the buffer.
+    Ensures chunks yielded to the consumer are exactly 1 second of audio
+    (critical for sync timing calculations).
 
     Args:
         generator: The async generator to buffer
-        buffer_size: Maximum number of chunks to buffer (default: 30)
+        pcm_format: AudioFormat - defines chunk size for 1-second audio chunks
+        buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
         min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
 
     Example:
-        async for chunk in buffered(my_generator(), buffer_size=100):
+        async for chunk in buffered_audio(my_generator(), pcm_format, buffer_size=100):
+            # Each chunk is exactly 1 second of audio
             process(chunk)
     """
-    buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
+    # Shared state between producer and consumer
+    data_buffer = bytearray()  # Shared buffer for audio data
+    condition = asyncio.Condition()  # Synchronization primitive
     producer_error: Exception | None = None
-    threshold_reached = asyncio.Event()
-    cancelled = asyncio.Event()
+    producer_done = False
+    cancelled = False
+
+    # Calculate chunk size and buffer limits
+    chunk_size = pcm_format.pcm_sample_size  # Size of 1 second of audio
+    max_buffer_bytes = buffer_size * chunk_size
 
     if buffer_size <= 1:
         # No buffering needed, yield directly
@@ -55,33 +68,43 @@ async def buffered(
 
     async def producer() -> None:
         """Read from the original generator and fill the buffer."""
-        nonlocal producer_error
+        nonlocal producer_error, producer_done, cancelled
         generator_consumed = False
         try:
             async for chunk in generator:
                 generator_consumed = True
-                if cancelled.is_set():
-                    # Consumer has stopped, exit cleanly
+                if cancelled:
                     break
-                await buffer.put(chunk)
-                if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
-                    threshold_reached.set()
-                # Yield to event loop every chunk to prevent blocking
-                await asyncio.sleep(0)
+
+                # Wait if buffer is too full
+                async with condition:
+                    while len(data_buffer) >= max_buffer_bytes and not cancelled:
+                        await condition.wait()
+
+                    if cancelled:
+                        break
+
+                    # Append to shared buffer
+                    data_buffer.extend(chunk)
+                    # Notify consumer that data is available
+                    condition.notify()
+
+                # Yield to event loop to prevent blocking
+                # Use 10ms delay to ensure I/O operations (pipe writes) can complete
+                await asyncio.sleep(0.01)
+
         except Exception as err:
             producer_error = err
             if isinstance(err, asyncio.CancelledError):
                 raise
         finally:
-            threshold_reached.set()
             # Clean up the generator if needed
             if not generator_consumed:
                 await close_async_generator(generator)
-            # Signal end of stream by putting None
-            # We must wait for space in the queue if needed, otherwise the consumer may
-            # hang waiting for data that will never come
-            if not cancelled.is_set():
-                await buffer.put(None)
+            # Signal end of stream
+            async with condition:
+                producer_done = True
+                condition.notify()
 
     # Start the producer task
     loop = asyncio.get_running_loop()
@@ -94,31 +117,56 @@ async def buffered(
     # Remove from set when done
     producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard)
 
+    # Calculate minimum buffer level before yielding
+    min_buffer_bytes = min_buffer_before_yield * chunk_size
+
     try:
         # Wait for initial buffer to fill
-        await threshold_reached.wait()
+        async with condition:
+            while len(data_buffer) < min_buffer_bytes and not producer_done:
+                await condition.wait()
 
-        # Consume from buffer and yield
+        # Consume from buffer and yield 1-second audio chunks
         while True:
-            data = await buffer.get()
-            if data is None:
-                # End of stream
-                if producer_error:
-                    raise producer_error
-                break
-            yield data
+            async with condition:
+                # Wait for enough data or end of stream
+                while len(data_buffer) < chunk_size and not producer_done:
+                    await condition.wait()
+
+                # Check if we're done
+                if len(data_buffer) < chunk_size and producer_done:
+                    # Yield any remaining partial chunk
+                    if data_buffer:
+                        chunk = bytes(data_buffer)
+                        data_buffer.clear()
+                        condition.notify()
+                        yield chunk
+                    if producer_error:
+                        raise producer_error
+                    break
+
+                # Extract exactly 1 second of audio
+                chunk = bytes(data_buffer[:chunk_size])
+                del data_buffer[:chunk_size]
+
+                # Notify producer that space is available
+                condition.notify()
+
+            # Yield outside the lock to avoid holding it during I/O
+            yield chunk
 
     finally:
         # Signal the producer to stop
-        cancelled.set()
-        # Drain the queue to unblock the producer if it's waiting on put()
-        empty_queue(buffer)
+        async with condition:
+            cancelled = True
+            condition.notify()
         # Wait for the producer to finish cleanly with a timeout to prevent blocking
         with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError):
             await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0)
 
 
-def use_buffer(
+def use_audio_buffer(
+    pcm_format_arg: str = "pcm_format",
     buffer_size: int = DEFAULT_BUFFER_SIZE,
     min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
 ) -> Callable[
@@ -126,19 +174,23 @@ def use_buffer(
     Callable[_P, AsyncGenerator[bytes, None]],
 ]:
     """
-    Add buffering to async generator functions that yield bytes (decorator).
+    Add buffering to async audio generator functions that yield PCM audio bytes (decorator).
 
-    This decorator uses an asyncio.Queue to decouple the producer (reading from the stream)
-    from the consumer (yielding to the client). The producer runs in a separate task and
-    fills the buffer, while the consumer yields from the buffer.
+    This decorator uses a shared buffer with asyncio.Condition to decouple the producer
+    (reading from the stream) from the consumer (yielding to the client).
+
+    Ensures chunks yielded are exactly 1 second of audio (critical for sync timing).
 
     Args:
-        buffer_size: Maximum number of chunks to buffer (default: 30)
+        pcm_format_arg: Name of the argument containing AudioFormat (default: "pcm_format")
+        buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
         min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
 
     Example:
-        @use_buffer(buffer_size=100)
-        async def my_stream() -> AsyncGenerator[bytes, None]:
+        @use_audio_buffer(pcm_format_arg="pcm_format", buffer_size=100)
+        async def my_stream(pcm_format: AudioFormat) -> AsyncGenerator[bytes, None]:
+            # Generator can yield variable-sized chunks
+            # Decorator ensures output is exactly 1-second chunks
             ...
     """
 
@@ -147,8 +199,15 @@ def use_buffer(
     ) -> Callable[_P, AsyncGenerator[bytes, None]]:
         @wraps(func)
         async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> AsyncGenerator[bytes, None]:
-            async for chunk in buffered(
+            # Extract pcm_format from function arguments
+            pcm_format = kwargs.get(pcm_format_arg)
+            if pcm_format is None:
+                msg = f"Audio buffer decorator requires '{pcm_format_arg}' argument"
+                raise ValueError(msg)
+
+            async for chunk in buffered_audio(
                 func(*args, **kwargs),
+                pcm_format=pcm_format,
                 buffer_size=buffer_size,
                 min_buffer_before_yield=min_buffer_before_yield,
             ):