-"""Helper for adding buffering to async audio generators."""
+"""Helper for adding buffering to async (audio) generators."""
from __future__ import annotations
import contextlib
from collections.abc import AsyncGenerator, Callable
from functools import wraps
-from typing import TYPE_CHECKING, Any, Final, ParamSpec, cast
+from typing import Any, Final, ParamSpec
-from music_assistant.helpers.util import close_async_generator
-
-if TYPE_CHECKING:
- from music_assistant_models.media_items.audio_format import AudioFormat
+from music_assistant.helpers.util import close_async_generator, empty_queue
# Type variables for the buffered decorator
_P = ParamSpec("_P")
_ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set()
-async def buffered_audio( # noqa: PLR0915
+async def buffered(
generator: AsyncGenerator[bytes, None],
- pcm_format: AudioFormat,
buffer_size: int = DEFAULT_BUFFER_SIZE,
min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
) -> AsyncGenerator[bytes, None]:
"""
- Add buffering to an async audio generator that yields PCM audio bytes.
-
- This function uses a shared buffer with asyncio.Condition to decouple the producer
- (reading from the stream) from the consumer (yielding to the client).
+ Add buffering to an async generator that yields (chunks of) bytes.
- Ensures chunks yielded to the consumer are exactly 1 second of audio
- (critical for sync timing calculations).
+ This function uses an asyncio.Queue to decouple the producer (reading from the stream)
+ from the consumer (yielding to the client). The producer runs in a separate task and
+ fills the buffer, while the consumer yields from the buffer.
Args:
generator: The async generator to buffer
- pcm_format: AudioFormat - defines chunk size for 1-second audio chunks
- buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
+ buffer_size: Maximum number of chunks to buffer (default: 30)
min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
Example:
- async for chunk in buffered_audio(my_generator(), pcm_format, buffer_size=100):
- # Each chunk is exactly 1 second of audio
+ async for chunk in buffered(my_generator(), buffer_size=100):
process(chunk)
"""
- # Shared state between producer and consumer
- data_buffer = bytearray() # Shared buffer for audio data
- condition = asyncio.Condition() # Synchronization primitive
+ buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
producer_error: Exception | None = None
- producer_done = False
- cancelled = False
-
- # Calculate chunk size and buffer limits
- chunk_size = pcm_format.pcm_sample_size # Size of 1 second of audio
- max_buffer_bytes = buffer_size * chunk_size
+ threshold_reached = asyncio.Event()
+ cancelled = asyncio.Event()
if buffer_size <= 1:
# No buffering needed, yield directly
async def producer() -> None:
"""Read from the original generator and fill the buffer."""
- nonlocal producer_error, producer_done, cancelled
+ nonlocal producer_error
generator_consumed = False
try:
async for chunk in generator:
generator_consumed = True
- if cancelled:
+ if cancelled.is_set():
+ # Consumer has stopped, exit cleanly
break
-
- # Wait if buffer is too full
- async with condition:
- while len(data_buffer) >= max_buffer_bytes and not cancelled:
- await condition.wait()
-
- if cancelled:
- break # type: ignore[unreachable]
-
- # Append to shared buffer
- data_buffer.extend(chunk)
- # Notify consumer that data is available
- condition.notify()
-
- # Yield to event loop to prevent blocking
- # Use 10ms delay to ensure I/O operations (pipe writes) can complete
- await asyncio.sleep(0.01)
-
+ await buffer.put(chunk)
+ if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
+ threshold_reached.set()
+ # Yield to event loop every chunk to prevent blocking
+ await asyncio.sleep(0)
except Exception as err:
producer_error = err
if isinstance(err, asyncio.CancelledError):
raise
finally:
+ threshold_reached.set()
# Clean up the generator if needed
if not generator_consumed:
await close_async_generator(generator)
- # Signal end of stream
- async with condition:
- producer_done = True
- condition.notify()
+ # Signal end of stream by putting None
+ # We must wait for space in the queue if needed, otherwise the consumer may
+ # hang waiting for data that will never come
+ if not cancelled.is_set():
+ await buffer.put(None)
# Start the producer task
loop = asyncio.get_running_loop()
# Remove from set when done
producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard)
- # Calculate minimum buffer level before yielding
- min_buffer_bytes = min_buffer_before_yield * chunk_size
-
try:
# Wait for initial buffer to fill
- async with condition:
- while len(data_buffer) < min_buffer_bytes and not producer_done:
- await condition.wait()
+ await threshold_reached.wait()
- # Consume from buffer and yield 1-second audio chunks
+ # Consume from buffer and yield
while True:
- async with condition:
- # Wait for enough data or end of stream
- while len(data_buffer) < chunk_size and not producer_done:
- await condition.wait()
-
- # Check if we're done
- if len(data_buffer) < chunk_size and producer_done:
- # Yield any remaining partial chunk
- if data_buffer:
- chunk = bytes(data_buffer)
- data_buffer.clear()
- condition.notify()
- yield chunk
- if producer_error:
- raise producer_error
- break
-
- # Extract exactly 1 second of audio
- chunk = bytes(data_buffer[:chunk_size])
- del data_buffer[:chunk_size]
-
- # Notify producer that space is available
- condition.notify()
-
- # Yield outside the lock to avoid holding it during I/O
- yield chunk
+ data = await buffer.get()
+ if data is None:
+ # End of stream
+ if producer_error:
+ raise producer_error
+ break
+ yield data
finally:
# Signal the producer to stop
- async with condition:
- cancelled = True
- condition.notify()
+ cancelled.set()
+ # Drain the queue to unblock the producer if it's waiting on put()
+ empty_queue(buffer)
# Wait for the producer to finish cleanly with a timeout to prevent blocking
with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError):
await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0)
-def use_audio_buffer(
- pcm_format_arg: str = "pcm_format",
+def use_buffer(
buffer_size: int = DEFAULT_BUFFER_SIZE,
min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
) -> Callable[
Callable[_P, AsyncGenerator[bytes, None]],
]:
"""
- Add buffering to async audio generator functions that yield PCM audio bytes (decorator).
+ Add buffering to async generator functions that yield bytes (decorator).
- This decorator uses a shared buffer with asyncio.Condition to decouple the producer
- (reading from the stream) from the consumer (yielding to the client).
-
- Ensures chunks yielded are exactly 1 second of audio (critical for sync timing).
+ This decorator uses an asyncio.Queue to decouple the producer (reading from the stream)
+ from the consumer (yielding to the client). The producer runs in a separate task and
+ fills the buffer, while the consumer yields from the buffer.
Args:
- pcm_format_arg: Name of the argument containing AudioFormat (default: "pcm_format")
- buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
+ buffer_size: Maximum number of chunks to buffer (default: 30)
min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
Example:
- @use_audio_buffer(pcm_format_arg="pcm_format", buffer_size=100)
- async def my_stream(pcm_format: AudioFormat) -> AsyncGenerator[bytes, None]:
- # Generator can yield variable-sized chunks
- # Decorator ensures output is exactly 1-second chunks
+ @use_buffer(buffer_size=100)
+ async def my_stream() -> AsyncGenerator[bytes, None]:
...
"""
) -> Callable[_P, AsyncGenerator[bytes, None]]:
@wraps(func)
async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> AsyncGenerator[bytes, None]:
- # Extract pcm_format from function arguments
- pcm_format = cast("AudioFormat | None", kwargs.get(pcm_format_arg))
- if pcm_format is None:
- msg = f"Audio buffer decorator requires '{pcm_format_arg}' argument"
- raise ValueError(msg)
-
- async for chunk in buffered_audio(
+ async for chunk in buffered(
func(*args, **kwargs),
- pcm_format=pcm_format,
buffer_size=buffer_size,
min_buffer_before_yield=min_buffer_before_yield,
):
self._lock = asyncio.Lock()
self.start_ntp: int = 0
self.start_time: float = 0.0
- self.chunks_streamed: int = 0 # Total chunks sent to session (each chunk = 1 second)
+ self.seconds_streamed: float = 0 # Total seconds sent to session
# because we reuse an existing stream session for new play_media requests,
# we need to track when the last stream was started
self.last_stream_started: float = 0.0
# Link stream session to player stream
airplay_player.stream.session = self
- # Snapshot chunks_streamed inside lock to prevent race conditions
+ # Snapshot seconds_streamed inside lock to prevent race conditions
# Keep lock held during stream.start() to ensure player doesn't miss any chunks
async with self._lock:
# (re)start the player specific ffmpeg process
await self._start_client_ffmpeg(airplay_player)
# Calculate skip_seconds based on how many chunks have been sent
- skip_seconds = self.chunks_streamed
+ skip_seconds = self.seconds_streamed
# Start the stream at compensated NTP timestamp
start_at = self.start_time + skip_seconds
start_ntp = unix_time_to_ntp(start_at)
"""Stream audio to all players."""
generator_exhausted = False
_last_metadata: str | None = None
- chunk_size = self.pcm_format.pcm_sample_size
+ pcm_sample_size = self.pcm_format.pcm_sample_size
stream_start_time = time.time()
first_chunk_received = False
try:
# each chunk is exactly one second of audio data based on the pcm format.
async for chunk in self._audio_source:
- if len(chunk) != chunk_size:
- self.prov.logger.warning(
- "Audio source yielded chunk of unexpected size %d (expected %d), "
- "this may lead to desync issues",
- len(chunk),
- chunk_size,
- )
if first_chunk_received is False:
first_chunk_received = True
self.prov.logger.debug(
if isinstance(result, asyncio.TimeoutError):
self.prov.logger.error(
- "TIMEOUT writing chunk %d to player %s - REMOVING from sync group!",
- self.chunks_streamed,
+ "TIMEOUT writing chunk to player %s - REMOVING from sync group!",
player.player_id,
)
players_to_remove.append(player)
elif isinstance(result, Exception):
self.prov.logger.error(
(
- "Error writing chunk %d to player %s: %s - "
+ "Error writing chunk to player %s: %s - "
"REMOVING from sync group!"
),
- self.chunks_streamed,
player.player_id,
result,
)
self.mass.create_task(player.stream.stop())
# Update chunk counter (each chunk is exactly one second of audio)
- self.chunks_streamed += 1
+ chunk_seconds = len(chunk) / pcm_sample_size
+ self.seconds_streamed += chunk_seconds
# send metadata if changed
# do this in a separate task to not disturb audio streaming
# NOTE: we should probably move this out of the audio stream task into it's own task
+ metadata: PlayerMedia | None
if (
self.sync_clients
and (_leader := self.sync_clients[0])
"""
Write audio chunk to a specific player.
- each chunk is exactly one second of audio data based on the pcm format.
+ each chunk is (in general) one second of audio data based on the pcm format.
For late joiners, compensates for chunks sent between join time and actual chunk delivery.
Blocks (async) until the data has been written.
"""
write_start = time.time()
- chunk_number = self.chunks_streamed + 1
player_id = airplay_player.player_id
# don't write a chunk if we're paused
# Can take up to ~4s if player's latency buffer is being drained
if total_elapsed > 5.0:
self.prov.logger.error(
- "!!! STALLED WRITE: Player %s chunk %d took %.3fs total (stream write: %.3fs)",
+ "!!! STALLED WRITE: Player %s writing chunk took %.3fs total (stream write: %.3fs)",
player_id,
- chunk_number,
total_elapsed,
stream_write_elapsed,
)