From 5cb4c3c93f1cfeeb26179c09d238f8c24244098b Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 31 Oct 2025 02:07:38 +0100 Subject: [PATCH] Fix buffered generator: must yield 1 second chunks --- music_assistant/controllers/streams.py | 24 ++- music_assistant/helpers/buffered_generator.py | 153 ++++++++++++------ 2 files changed, 124 insertions(+), 53 deletions(-) diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index 4abd186b..efed6383 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -66,7 +66,7 @@ from music_assistant.helpers.audio import ( get_stream_details, resample_pcm_audio, ) -from music_assistant.helpers.buffered_generator import use_buffer +from music_assistant.helpers.buffered_generator import use_audio_buffer from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream from music_assistant.helpers.smart_fades import ( @@ -319,7 +319,8 @@ class StreamsController(CoreController): ) # Start periodic garbage collection task # This ensures memory from audio buffers and streams is cleaned up regularly - self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes + # DISABLED FOR TESTING - may cause event loop blocking + # self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes async def close(self) -> None: """Cleanup on exit.""" @@ -827,14 +828,18 @@ class StreamsController(CoreController): # like https hosts and it also offers the pre-announce 'bell' return f"{self.base_url}/announcement/{player_id}.{content_type.value}" - @use_buffer(30, 1) + @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4) async def get_queue_flow_stream( self, queue: PlayerQueue, start_queue_item: QueueItem, pcm_format: AudioFormat, ) -> AsyncGenerator[bytes, None]: - """Get a flow stream of all tracks in the queue as raw PCM audio.""" + """ + Get a flow stream of all tracks in the queue as raw PCM audio. + + yields chunks of exactly 1 second of audio in the given pcm_format. + """ # ruff: noqa: PLR0915 assert pcm_format.content_type.is_pcm() queue_track = None @@ -917,6 +922,7 @@ class StreamsController(CoreController): buffer = b"" # handle incoming audio chunks first_chunk_received = False + buffer_filled = False async for chunk in self.get_queue_item_stream( queue_track, pcm_format=pcm_format, @@ -941,8 +947,14 @@ class StreamsController(CoreController): del chunk if len(buffer) < req_buffer_size: # buffer is not full enough, move on + # yield control to event loop to prevent blocking pipe writes + # use 10ms delay to ensure I/O operations can complete + await asyncio.sleep(0.01) continue + if not buffer_filled and last_fadeout_part: + buffer_filled = True + #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK if last_fadeout_part and last_streamdetails: # perform crossfade @@ -967,7 +979,7 @@ class StreamsController(CoreController): last_play_log_entry.seconds_streamed += ( crossfade_part_len / 2 / pcm_sample_size ) - # send crossfade_part (as one big chunk) + # yield crossfade_part - buffered_generator will rechunk to 1-second yield crossfade_part del crossfade_part # also write the leftover bytes from the crossfade action @@ -1263,7 +1275,7 @@ class StreamsController(CoreController): self.mass.create_task(music_prov.on_streamed(streamdetails)) # Periodic GC task will handle memory cleanup every 15 minutes - @use_buffer(30, 1) + @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4) async def get_queue_item_stream_with_smartfade( self, queue_item: QueueItem, diff --git a/music_assistant/helpers/buffered_generator.py b/music_assistant/helpers/buffered_generator.py index 523b4883..a950ae85 100644 --- a/music_assistant/helpers/buffered_generator.py +++ b/music_assistant/helpers/buffered_generator.py @@ -1,4 +1,4 @@ -"""Helper for adding buffering to async generators.""" +"""Helper for adding buffering to async audio generators.""" from __future__ import annotations @@ -8,7 +8,9 @@ from collections.abc import AsyncGenerator, Callable from functools import wraps from typing import Any, Final, ParamSpec -from music_assistant.helpers.util import close_async_generator, empty_queue +from music_assistant_models.streamdetails import AudioFormat + +from music_assistant.helpers.util import close_async_generator # Type variables for the buffered decorator _P = ParamSpec("_P") @@ -21,31 +23,42 @@ DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5 _ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set() -async def buffered( +async def buffered_audio( generator: AsyncGenerator[bytes, None], + pcm_format: AudioFormat, buffer_size: int = DEFAULT_BUFFER_SIZE, min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD, ) -> AsyncGenerator[bytes, None]: """ - Add buffering to an async generator that yields bytes. + Add buffering to an async audio generator that yields PCM audio bytes. + + This function uses a shared buffer with asyncio.Condition to decouple the producer + (reading from the stream) from the consumer (yielding to the client). - This function uses an asyncio.Queue to decouple the producer (reading from the stream) - from the consumer (yielding to the client). The producer runs in a separate task and - fills the buffer, while the consumer yields from the buffer. + Ensures chunks yielded to the consumer are exactly 1 second of audio + (critical for sync timing calculations). Args: generator: The async generator to buffer - buffer_size: Maximum number of chunks to buffer (default: 30) + pcm_format: AudioFormat - defines chunk size for 1-second audio chunks + buffer_size: Maximum number of 1-second chunks to buffer (default: 30) min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5) Example: - async for chunk in buffered(my_generator(), buffer_size=100): + async for chunk in buffered_audio(my_generator(), pcm_format, buffer_size=100): + # Each chunk is exactly 1 second of audio process(chunk) """ - buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size) + # Shared state between producer and consumer + data_buffer = bytearray() # Shared buffer for audio data + condition = asyncio.Condition() # Synchronization primitive producer_error: Exception | None = None - threshold_reached = asyncio.Event() - cancelled = asyncio.Event() + producer_done = False + cancelled = False + + # Calculate chunk size and buffer limits + chunk_size = pcm_format.pcm_sample_size # Size of 1 second of audio + max_buffer_bytes = buffer_size * chunk_size if buffer_size <= 1: # No buffering needed, yield directly @@ -55,33 +68,43 @@ async def buffered( async def producer() -> None: """Read from the original generator and fill the buffer.""" - nonlocal producer_error + nonlocal producer_error, producer_done, cancelled generator_consumed = False try: async for chunk in generator: generator_consumed = True - if cancelled.is_set(): - # Consumer has stopped, exit cleanly + if cancelled: break - await buffer.put(chunk) - if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield: - threshold_reached.set() - # Yield to event loop every chunk to prevent blocking - await asyncio.sleep(0) + + # Wait if buffer is too full + async with condition: + while len(data_buffer) >= max_buffer_bytes and not cancelled: + await condition.wait() + + if cancelled: + break + + # Append to shared buffer + data_buffer.extend(chunk) + # Notify consumer that data is available + condition.notify() + + # Yield to event loop to prevent blocking + # Use 10ms delay to ensure I/O operations (pipe writes) can complete + await asyncio.sleep(0.01) + except Exception as err: producer_error = err if isinstance(err, asyncio.CancelledError): raise finally: - threshold_reached.set() # Clean up the generator if needed if not generator_consumed: await close_async_generator(generator) - # Signal end of stream by putting None - # We must wait for space in the queue if needed, otherwise the consumer may - # hang waiting for data that will never come - if not cancelled.is_set(): - await buffer.put(None) + # Signal end of stream + async with condition: + producer_done = True + condition.notify() # Start the producer task loop = asyncio.get_running_loop() @@ -94,31 +117,56 @@ async def buffered( # Remove from set when done producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard) + # Calculate minimum buffer level before yielding + min_buffer_bytes = min_buffer_before_yield * chunk_size + try: # Wait for initial buffer to fill - await threshold_reached.wait() + async with condition: + while len(data_buffer) < min_buffer_bytes and not producer_done: + await condition.wait() - # Consume from buffer and yield + # Consume from buffer and yield 1-second audio chunks while True: - data = await buffer.get() - if data is None: - # End of stream - if producer_error: - raise producer_error - break - yield data + async with condition: + # Wait for enough data or end of stream + while len(data_buffer) < chunk_size and not producer_done: + await condition.wait() + + # Check if we're done + if len(data_buffer) < chunk_size and producer_done: + # Yield any remaining partial chunk + if data_buffer: + chunk = bytes(data_buffer) + data_buffer.clear() + condition.notify() + yield chunk + if producer_error: + raise producer_error + break + + # Extract exactly 1 second of audio + chunk = bytes(data_buffer[:chunk_size]) + del data_buffer[:chunk_size] + + # Notify producer that space is available + condition.notify() + + # Yield outside the lock to avoid holding it during I/O + yield chunk finally: # Signal the producer to stop - cancelled.set() - # Drain the queue to unblock the producer if it's waiting on put() - empty_queue(buffer) + async with condition: + cancelled = True + condition.notify() # Wait for the producer to finish cleanly with a timeout to prevent blocking with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError): await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0) -def use_buffer( +def use_audio_buffer( + pcm_format_arg: str = "pcm_format", buffer_size: int = DEFAULT_BUFFER_SIZE, min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD, ) -> Callable[ @@ -126,19 +174,23 @@ def use_buffer( Callable[_P, AsyncGenerator[bytes, None]], ]: """ - Add buffering to async generator functions that yield bytes (decorator). + Add buffering to async audio generator functions that yield PCM audio bytes (decorator). - This decorator uses an asyncio.Queue to decouple the producer (reading from the stream) - from the consumer (yielding to the client). The producer runs in a separate task and - fills the buffer, while the consumer yields from the buffer. + This decorator uses a shared buffer with asyncio.Condition to decouple the producer + (reading from the stream) from the consumer (yielding to the client). + + Ensures chunks yielded are exactly 1 second of audio (critical for sync timing). Args: - buffer_size: Maximum number of chunks to buffer (default: 30) + pcm_format_arg: Name of the argument containing AudioFormat (default: "pcm_format") + buffer_size: Maximum number of 1-second chunks to buffer (default: 30) min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5) Example: - @use_buffer(buffer_size=100) - async def my_stream() -> AsyncGenerator[bytes, None]: + @use_audio_buffer(pcm_format_arg="pcm_format", buffer_size=100) + async def my_stream(pcm_format: AudioFormat) -> AsyncGenerator[bytes, None]: + # Generator can yield variable-sized chunks + # Decorator ensures output is exactly 1-second chunks ... """ @@ -147,8 +199,15 @@ def use_buffer( ) -> Callable[_P, AsyncGenerator[bytes, None]]: @wraps(func) async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> AsyncGenerator[bytes, None]: - async for chunk in buffered( + # Extract pcm_format from function arguments + pcm_format = kwargs.get(pcm_format_arg) + if pcm_format is None: + msg = f"Audio buffer decorator requires '{pcm_format_arg}' argument" + raise ValueError(msg) + + async for chunk in buffered_audio( func(*args, **kwargs), + pcm_format=pcm_format, buffer_size=buffer_size, min_buffer_before_yield=min_buffer_before_yield, ): -- 2.34.1