Simplify buffered generator
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 19:01:04 +0000 (20:01 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 19:01:04 +0000 (20:01 +0100)
music_assistant/controllers/streams.py
music_assistant/helpers/buffered_generator.py
music_assistant/providers/airplay/stream_session.py

index 2f16cfa7107a3614489f82662fa3ef7828c82e43..da8bbaa9618dd5ffcf5e8dbc192c034d24853067 100644 (file)
@@ -66,7 +66,7 @@ from music_assistant.helpers.audio import (
     get_stream_details,
     resample_pcm_audio,
 )
-from music_assistant.helpers.buffered_generator import buffered_audio, use_audio_buffer
+from music_assistant.helpers.buffered_generator import buffered, use_buffer
 from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
 from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
 from music_assistant.helpers.smart_fades import (
@@ -889,7 +889,7 @@ class StreamsController(CoreController):
             # single item stream (e.g. radio)
             queue_item = self.mass.player_queues.get_item(media.source_id, media.queue_item_id)
             assert queue_item
-            audio_source = buffered_audio(
+            audio_source = buffered(
                 self.get_queue_item_stream(
                     queue_item=queue_item,
                     pcm_format=pcm_format,
@@ -908,7 +908,7 @@ class StreamsController(CoreController):
             )
         return audio_source
 
-    @use_audio_buffer(buffer_size=30, min_buffer_before_yield=2)
+    @use_buffer(buffer_size=30, min_buffer_before_yield=2)
     async def get_queue_flow_stream(
         self,
         queue: PlayerQueue,
@@ -1358,7 +1358,7 @@ class StreamsController(CoreController):
                     assert isinstance(music_prov, MusicProvider)
                 self.mass.create_task(music_prov.on_streamed(streamdetails))
 
-    @use_audio_buffer(buffer_size=30, min_buffer_before_yield=2)
+    @use_buffer(buffer_size=30, min_buffer_before_yield=2)
     async def get_queue_item_stream_with_smartfade(
         self,
         queue_item: QueueItem,
index 6d45442ceaa99f7b714dca3bf3dff63763d99691..9a16823bdaeb63b44fa9b6ec2ce830e1455b85f0 100644 (file)
@@ -1,4 +1,4 @@
-"""Helper for adding buffering to async audio generators."""
+"""Helper for adding buffering to async (audio) generators."""
 
 from __future__ import annotations
 
@@ -6,12 +6,9 @@ import asyncio
 import contextlib
 from collections.abc import AsyncGenerator, Callable
 from functools import wraps
-from typing import TYPE_CHECKING, Any, Final, ParamSpec, cast
+from typing import Any, Final, ParamSpec
 
-from music_assistant.helpers.util import close_async_generator
-
-if TYPE_CHECKING:
-    from music_assistant_models.media_items.audio_format import AudioFormat
+from music_assistant.helpers.util import close_async_generator, empty_queue
 
 # Type variables for the buffered decorator
 _P = ParamSpec("_P")
@@ -24,42 +21,31 @@ DEFAULT_MIN_BUFFER_BEFORE_YIELD: Final = 5
 _ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set()
 
 
-async def buffered_audio(  # noqa: PLR0915
+async def buffered(
     generator: AsyncGenerator[bytes, None],
-    pcm_format: AudioFormat,
     buffer_size: int = DEFAULT_BUFFER_SIZE,
     min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
 ) -> AsyncGenerator[bytes, None]:
     """
-    Add buffering to an async audio generator that yields PCM audio bytes.
-
-    This function uses a shared buffer with asyncio.Condition to decouple the producer
-    (reading from the stream) from the consumer (yielding to the client).
+    Add buffering to an async generator that yields (chunks of) bytes.
 
-    Ensures chunks yielded to the consumer are exactly 1 second of audio
-    (critical for sync timing calculations).
+    This function uses an asyncio.Queue to decouple the producer (reading from the stream)
+    from the consumer (yielding to the client). The producer runs in a separate task and
+    fills the buffer, while the consumer yields from the buffer.
 
     Args:
         generator: The async generator to buffer
-        pcm_format: AudioFormat - defines chunk size for 1-second audio chunks
-        buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
+        buffer_size: Maximum number of chunks to buffer (default: 30)
         min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
 
     Example:
-        async for chunk in buffered_audio(my_generator(), pcm_format, buffer_size=100):
-            # Each chunk is exactly 1 second of audio
+        async for chunk in buffered(my_generator(), buffer_size=100):
             process(chunk)
     """
-    # Shared state between producer and consumer
-    data_buffer = bytearray()  # Shared buffer for audio data
-    condition = asyncio.Condition()  # Synchronization primitive
+    buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
     producer_error: Exception | None = None
-    producer_done = False
-    cancelled = False
-
-    # Calculate chunk size and buffer limits
-    chunk_size = pcm_format.pcm_sample_size  # Size of 1 second of audio
-    max_buffer_bytes = buffer_size * chunk_size
+    threshold_reached = asyncio.Event()
+    cancelled = asyncio.Event()
 
     if buffer_size <= 1:
         # No buffering needed, yield directly
@@ -69,43 +55,33 @@ async def buffered_audio(  # noqa: PLR0915
 
     async def producer() -> None:
         """Read from the original generator and fill the buffer."""
-        nonlocal producer_error, producer_done, cancelled
+        nonlocal producer_error
         generator_consumed = False
         try:
             async for chunk in generator:
                 generator_consumed = True
-                if cancelled:
+                if cancelled.is_set():
+                    # Consumer has stopped, exit cleanly
                     break
-
-                # Wait if buffer is too full
-                async with condition:
-                    while len(data_buffer) >= max_buffer_bytes and not cancelled:
-                        await condition.wait()
-
-                    if cancelled:
-                        break  # type: ignore[unreachable]
-
-                    # Append to shared buffer
-                    data_buffer.extend(chunk)
-                    # Notify consumer that data is available
-                    condition.notify()
-
-                # Yield to event loop to prevent blocking
-                # Use 10ms delay to ensure I/O operations (pipe writes) can complete
-                await asyncio.sleep(0.01)
-
+                await buffer.put(chunk)
+                if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
+                    threshold_reached.set()
+                # Yield to event loop every chunk to prevent blocking
+                await asyncio.sleep(0)
         except Exception as err:
             producer_error = err
             if isinstance(err, asyncio.CancelledError):
                 raise
         finally:
+            threshold_reached.set()
             # Clean up the generator if needed
             if not generator_consumed:
                 await close_async_generator(generator)
-            # Signal end of stream
-            async with condition:
-                producer_done = True
-                condition.notify()
+            # Signal end of stream by putting None
+            # We must wait for space in the queue if needed, otherwise the consumer may
+            # hang waiting for data that will never come
+            if not cancelled.is_set():
+                await buffer.put(None)
 
     # Start the producer task
     loop = asyncio.get_running_loop()
@@ -118,56 +94,31 @@ async def buffered_audio(  # noqa: PLR0915
     # Remove from set when done
     producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard)
 
-    # Calculate minimum buffer level before yielding
-    min_buffer_bytes = min_buffer_before_yield * chunk_size
-
     try:
         # Wait for initial buffer to fill
-        async with condition:
-            while len(data_buffer) < min_buffer_bytes and not producer_done:
-                await condition.wait()
+        await threshold_reached.wait()
 
-        # Consume from buffer and yield 1-second audio chunks
+        # Consume from buffer and yield
         while True:
-            async with condition:
-                # Wait for enough data or end of stream
-                while len(data_buffer) < chunk_size and not producer_done:
-                    await condition.wait()
-
-                # Check if we're done
-                if len(data_buffer) < chunk_size and producer_done:
-                    # Yield any remaining partial chunk
-                    if data_buffer:
-                        chunk = bytes(data_buffer)
-                        data_buffer.clear()
-                        condition.notify()
-                        yield chunk
-                    if producer_error:
-                        raise producer_error
-                    break
-
-                # Extract exactly 1 second of audio
-                chunk = bytes(data_buffer[:chunk_size])
-                del data_buffer[:chunk_size]
-
-                # Notify producer that space is available
-                condition.notify()
-
-            # Yield outside the lock to avoid holding it during I/O
-            yield chunk
+            data = await buffer.get()
+            if data is None:
+                # End of stream
+                if producer_error:
+                    raise producer_error
+                break
+            yield data
 
     finally:
         # Signal the producer to stop
-        async with condition:
-            cancelled = True
-            condition.notify()
+        cancelled.set()
+        # Drain the queue to unblock the producer if it's waiting on put()
+        empty_queue(buffer)
         # Wait for the producer to finish cleanly with a timeout to prevent blocking
         with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError):
             await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0)
 
 
-def use_audio_buffer(
-    pcm_format_arg: str = "pcm_format",
+def use_buffer(
     buffer_size: int = DEFAULT_BUFFER_SIZE,
     min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
 ) -> Callable[
@@ -175,23 +126,19 @@ def use_audio_buffer(
     Callable[_P, AsyncGenerator[bytes, None]],
 ]:
     """
-    Add buffering to async audio generator functions that yield PCM audio bytes (decorator).
+    Add buffering to async generator functions that yield bytes (decorator).
 
-    This decorator uses a shared buffer with asyncio.Condition to decouple the producer
-    (reading from the stream) from the consumer (yielding to the client).
-
-    Ensures chunks yielded are exactly 1 second of audio (critical for sync timing).
+    This decorator uses an asyncio.Queue to decouple the producer (reading from the stream)
+    from the consumer (yielding to the client). The producer runs in a separate task and
+    fills the buffer, while the consumer yields from the buffer.
 
     Args:
-        pcm_format_arg: Name of the argument containing AudioFormat (default: "pcm_format")
-        buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
+        buffer_size: Maximum number of chunks to buffer (default: 30)
         min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
 
     Example:
-        @use_audio_buffer(pcm_format_arg="pcm_format", buffer_size=100)
-        async def my_stream(pcm_format: AudioFormat) -> AsyncGenerator[bytes, None]:
-            # Generator can yield variable-sized chunks
-            # Decorator ensures output is exactly 1-second chunks
+        @use_buffer(buffer_size=100)
+        async def my_stream() -> AsyncGenerator[bytes, None]:
             ...
     """
 
@@ -200,15 +147,8 @@ def use_audio_buffer(
     ) -> Callable[_P, AsyncGenerator[bytes, None]]:
         @wraps(func)
         async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> AsyncGenerator[bytes, None]:
-            # Extract pcm_format from function arguments
-            pcm_format = cast("AudioFormat | None", kwargs.get(pcm_format_arg))
-            if pcm_format is None:
-                msg = f"Audio buffer decorator requires '{pcm_format_arg}' argument"
-                raise ValueError(msg)
-
-            async for chunk in buffered_audio(
+            async for chunk in buffered(
                 func(*args, **kwargs),
-                pcm_format=pcm_format,
                 buffer_size=buffer_size,
                 min_buffer_before_yield=min_buffer_before_yield,
             ):
index b504c3ae96b8f6cd2a8911ceab88b99da4cb9b34..31b9ea6f855fdd8bb42797c389821dc0d8c4f595 100644 (file)
@@ -57,7 +57,7 @@ class AirPlayStreamSession:
         self._lock = asyncio.Lock()
         self.start_ntp: int = 0
         self.start_time: float = 0.0
-        self.chunks_streamed: int = 0  # Total chunks sent to session (each chunk = 1 second)
+        self.seconds_streamed: float = 0  # Total seconds sent to session
         # because we reuse an existing stream session for new play_media requests,
         # we need to track when the last stream was started
         self.last_stream_started: float = 0.0
@@ -187,14 +187,14 @@ class AirPlayStreamSession:
         # Link stream session to player stream
         airplay_player.stream.session = self
 
-        # Snapshot chunks_streamed inside lock to prevent race conditions
+        # Snapshot seconds_streamed inside lock to prevent race conditions
         # Keep lock held during stream.start() to ensure player doesn't miss any chunks
         async with self._lock:
             # (re)start the player specific ffmpeg process
             await self._start_client_ffmpeg(airplay_player)
 
             # Calculate skip_seconds based on how many chunks have been sent
-            skip_seconds = self.chunks_streamed
+            skip_seconds = self.seconds_streamed
             # Start the stream at compensated NTP timestamp
             start_at = self.start_time + skip_seconds
             start_ntp = unix_time_to_ntp(start_at)
@@ -236,19 +236,12 @@ class AirPlayStreamSession:
         """Stream audio to all players."""
         generator_exhausted = False
         _last_metadata: str | None = None
-        chunk_size = self.pcm_format.pcm_sample_size
+        pcm_sample_size = self.pcm_format.pcm_sample_size
         stream_start_time = time.time()
         first_chunk_received = False
         try:
             # each chunk is exactly one second of audio data based on the pcm format.
             async for chunk in self._audio_source:
-                if len(chunk) != chunk_size:
-                    self.prov.logger.warning(
-                        "Audio source yielded chunk of unexpected size %d (expected %d), "
-                        "this may lead to desync issues",
-                        len(chunk),
-                        chunk_size,
-                    )
                 if first_chunk_received is False:
                     first_chunk_received = True
                     self.prov.logger.debug(
@@ -282,18 +275,16 @@ class AirPlayStreamSession:
 
                         if isinstance(result, asyncio.TimeoutError):
                             self.prov.logger.error(
-                                "TIMEOUT writing chunk %d to player %s - REMOVING from sync group!",
-                                self.chunks_streamed,
+                                "TIMEOUT writing chunk to player %s - REMOVING from sync group!",
                                 player.player_id,
                             )
                             players_to_remove.append(player)
                         elif isinstance(result, Exception):
                             self.prov.logger.error(
                                 (
-                                    "Error writing chunk %d to player %s: %s - "
+                                    "Error writing chunk to player %s: %s - "
                                     "REMOVING from sync group!"
                                 ),
-                                self.chunks_streamed,
                                 player.player_id,
                                 result,
                             )
@@ -312,11 +303,13 @@ class AirPlayStreamSession:
                                 self.mass.create_task(player.stream.stop())
 
                     # Update chunk counter (each chunk is exactly one second of audio)
-                    self.chunks_streamed += 1
+                    chunk_seconds = len(chunk) / pcm_sample_size
+                    self.seconds_streamed += chunk_seconds
 
                 # send metadata if changed
                 # do this in a separate task to not disturb audio streaming
                 # NOTE: we should probably move this out of the audio stream task into it's own task
+                metadata: PlayerMedia | None
                 if (
                     self.sync_clients
                     and (_leader := self.sync_clients[0])
@@ -361,12 +354,11 @@ class AirPlayStreamSession:
         """
         Write audio chunk to a specific player.
 
-        each chunk is exactly one second of audio data based on the pcm format.
+        each chunk is (in general) one second of audio data based on the pcm format.
         For late joiners, compensates for chunks sent between join time and actual chunk delivery.
         Blocks (async) until the data has been written.
         """
         write_start = time.time()
-        chunk_number = self.chunks_streamed + 1
         player_id = airplay_player.player_id
 
         # don't write a chunk if we're paused
@@ -387,9 +379,8 @@ class AirPlayStreamSession:
         # Can take up to ~4s if player's latency buffer is being drained
         if total_elapsed > 5.0:
             self.prov.logger.error(
-                "!!! STALLED WRITE: Player %s chunk %d took %.3fs total (stream write: %.3fs)",
+                "!!! STALLED WRITE: Player %s writing chunk took %.3fs total (stream write: %.3fs)",
                 player_id,
-                chunk_number,
                 total_elapsed,
                 stream_write_elapsed,
             )