get_stream_details,
resample_pcm_audio,
)
-from music_assistant.helpers.buffered_generator import use_buffer
+from music_assistant.helpers.buffered_generator import use_audio_buffer
from music_assistant.helpers.ffmpeg import LOGGER as FFMPEG_LOGGER
from music_assistant.helpers.ffmpeg import check_ffmpeg_version, get_ffmpeg_stream
from music_assistant.helpers.smart_fades import (
)
# Start periodic garbage collection task
# This ensures memory from audio buffers and streams is cleaned up regularly
- self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
+ # DISABLED FOR TESTING - may cause event loop blocking
+ # self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
async def close(self) -> None:
"""Cleanup on exit."""
# like https hosts and it also offers the pre-announce 'bell'
return f"{self.base_url}/announcement/{player_id}.{content_type.value}"
- @use_buffer(30, 1)
+ @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
async def get_queue_flow_stream(
self,
queue: PlayerQueue,
start_queue_item: QueueItem,
pcm_format: AudioFormat,
) -> AsyncGenerator[bytes, None]:
- """Get a flow stream of all tracks in the queue as raw PCM audio."""
+ """
+ Get a flow stream of all tracks in the queue as raw PCM audio.
+
+ yields chunks of exactly 1 second of audio in the given pcm_format.
+ """
# ruff: noqa: PLR0915
assert pcm_format.content_type.is_pcm()
queue_track = None
buffer = b""
# handle incoming audio chunks
first_chunk_received = False
+ buffer_filled = False
async for chunk in self.get_queue_item_stream(
queue_track,
pcm_format=pcm_format,
del chunk
if len(buffer) < req_buffer_size:
# buffer is not full enough, move on
+ # yield control to event loop to prevent blocking pipe writes
+ # use 10ms delay to ensure I/O operations can complete
+ await asyncio.sleep(0.01)
continue
+ if not buffer_filled and last_fadeout_part:
+ buffer_filled = True
+
#### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
if last_fadeout_part and last_streamdetails:
# perform crossfade
last_play_log_entry.seconds_streamed += (
crossfade_part_len / 2 / pcm_sample_size
)
- # send crossfade_part (as one big chunk)
+ # yield crossfade_part - buffered_generator will rechunk to 1-second
yield crossfade_part
del crossfade_part
# also write the leftover bytes from the crossfade action
self.mass.create_task(music_prov.on_streamed(streamdetails))
# Periodic GC task will handle memory cleanup every 15 minutes
- @use_buffer(30, 1)
+ @use_audio_buffer(buffer_size=30, min_buffer_before_yield=4)
async def get_queue_item_stream_with_smartfade(
self,
queue_item: QueueItem,
-"""Helper for adding buffering to async generators."""
+"""Helper for adding buffering to async audio generators."""
from __future__ import annotations
from functools import wraps
from typing import Any, Final, ParamSpec
-from music_assistant.helpers.util import close_async_generator, empty_queue
+from music_assistant_models.streamdetails import AudioFormat
+
+from music_assistant.helpers.util import close_async_generator
# Type variables for the buffered decorator
_P = ParamSpec("_P")
_ACTIVE_PRODUCER_TASKS: set[asyncio.Task[Any]] = set()
-async def buffered(
+async def buffered_audio(
generator: AsyncGenerator[bytes, None],
+ pcm_format: AudioFormat,
buffer_size: int = DEFAULT_BUFFER_SIZE,
min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
) -> AsyncGenerator[bytes, None]:
"""
- Add buffering to an async generator that yields bytes.
+ Add buffering to an async audio generator that yields PCM audio bytes.
+
+ This function uses a shared buffer with asyncio.Condition to decouple the producer
+ (reading from the stream) from the consumer (yielding to the client).
- This function uses an asyncio.Queue to decouple the producer (reading from the stream)
- from the consumer (yielding to the client). The producer runs in a separate task and
- fills the buffer, while the consumer yields from the buffer.
+ Ensures chunks yielded to the consumer are exactly 1 second of audio
+ (critical for sync timing calculations).
Args:
generator: The async generator to buffer
- buffer_size: Maximum number of chunks to buffer (default: 30)
+ pcm_format: AudioFormat - defines chunk size for 1-second audio chunks
+ buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
Example:
- async for chunk in buffered(my_generator(), buffer_size=100):
+ async for chunk in buffered_audio(my_generator(), pcm_format, buffer_size=100):
+ # Each chunk is exactly 1 second of audio
process(chunk)
"""
- buffer: asyncio.Queue[bytes | None] = asyncio.Queue(maxsize=buffer_size)
+ # Shared state between producer and consumer
+ data_buffer = bytearray() # Shared buffer for audio data
+ condition = asyncio.Condition() # Synchronization primitive
producer_error: Exception | None = None
- threshold_reached = asyncio.Event()
- cancelled = asyncio.Event()
+ producer_done = False
+ cancelled = False
+
+ # Calculate chunk size and buffer limits
+ chunk_size = pcm_format.pcm_sample_size # Size of 1 second of audio
+ max_buffer_bytes = buffer_size * chunk_size
if buffer_size <= 1:
# No buffering needed, yield directly
async def producer() -> None:
"""Read from the original generator and fill the buffer."""
- nonlocal producer_error
+ nonlocal producer_error, producer_done, cancelled
generator_consumed = False
try:
async for chunk in generator:
generator_consumed = True
- if cancelled.is_set():
- # Consumer has stopped, exit cleanly
+ if cancelled:
break
- await buffer.put(chunk)
- if not threshold_reached.is_set() and buffer.qsize() >= min_buffer_before_yield:
- threshold_reached.set()
- # Yield to event loop every chunk to prevent blocking
- await asyncio.sleep(0)
+
+ # Wait if buffer is too full
+ async with condition:
+ while len(data_buffer) >= max_buffer_bytes and not cancelled:
+ await condition.wait()
+
+ if cancelled:
+ break
+
+ # Append to shared buffer
+ data_buffer.extend(chunk)
+ # Notify consumer that data is available
+ condition.notify()
+
+ # Yield to event loop to prevent blocking
+ # Use 10ms delay to ensure I/O operations (pipe writes) can complete
+ await asyncio.sleep(0.01)
+
except Exception as err:
producer_error = err
if isinstance(err, asyncio.CancelledError):
raise
finally:
- threshold_reached.set()
# Clean up the generator if needed
if not generator_consumed:
await close_async_generator(generator)
- # Signal end of stream by putting None
- # We must wait for space in the queue if needed, otherwise the consumer may
- # hang waiting for data that will never come
- if not cancelled.is_set():
- await buffer.put(None)
+ # Signal end of stream
+ async with condition:
+ producer_done = True
+ condition.notify()
# Start the producer task
loop = asyncio.get_running_loop()
# Remove from set when done
producer_task.add_done_callback(_ACTIVE_PRODUCER_TASKS.discard)
+ # Calculate minimum buffer level before yielding
+ min_buffer_bytes = min_buffer_before_yield * chunk_size
+
try:
# Wait for initial buffer to fill
- await threshold_reached.wait()
+ async with condition:
+ while len(data_buffer) < min_buffer_bytes and not producer_done:
+ await condition.wait()
- # Consume from buffer and yield
+ # Consume from buffer and yield 1-second audio chunks
while True:
- data = await buffer.get()
- if data is None:
- # End of stream
- if producer_error:
- raise producer_error
- break
- yield data
+ async with condition:
+ # Wait for enough data or end of stream
+ while len(data_buffer) < chunk_size and not producer_done:
+ await condition.wait()
+
+ # Check if we're done
+ if len(data_buffer) < chunk_size and producer_done:
+ # Yield any remaining partial chunk
+ if data_buffer:
+ chunk = bytes(data_buffer)
+ data_buffer.clear()
+ condition.notify()
+ yield chunk
+ if producer_error:
+ raise producer_error
+ break
+
+ # Extract exactly 1 second of audio
+ chunk = bytes(data_buffer[:chunk_size])
+ del data_buffer[:chunk_size]
+
+ # Notify producer that space is available
+ condition.notify()
+
+ # Yield outside the lock to avoid holding it during I/O
+ yield chunk
finally:
# Signal the producer to stop
- cancelled.set()
- # Drain the queue to unblock the producer if it's waiting on put()
- empty_queue(buffer)
+ async with condition:
+ cancelled = True
+ condition.notify()
# Wait for the producer to finish cleanly with a timeout to prevent blocking
with contextlib.suppress(asyncio.CancelledError, RuntimeError, asyncio.TimeoutError):
await asyncio.wait_for(asyncio.shield(producer_task), timeout=1.0)
-def use_buffer(
+def use_audio_buffer(
+ pcm_format_arg: str = "pcm_format",
buffer_size: int = DEFAULT_BUFFER_SIZE,
min_buffer_before_yield: int = DEFAULT_MIN_BUFFER_BEFORE_YIELD,
) -> Callable[
Callable[_P, AsyncGenerator[bytes, None]],
]:
"""
- Add buffering to async generator functions that yield bytes (decorator).
+ Add buffering to async audio generator functions that yield PCM audio bytes (decorator).
- This decorator uses an asyncio.Queue to decouple the producer (reading from the stream)
- from the consumer (yielding to the client). The producer runs in a separate task and
- fills the buffer, while the consumer yields from the buffer.
+ This decorator uses a shared buffer with asyncio.Condition to decouple the producer
+ (reading from the stream) from the consumer (yielding to the client).
+
+ Ensures chunks yielded are exactly 1 second of audio (critical for sync timing).
Args:
- buffer_size: Maximum number of chunks to buffer (default: 30)
+ pcm_format_arg: Name of the argument containing AudioFormat (default: "pcm_format")
+ buffer_size: Maximum number of 1-second chunks to buffer (default: 30)
min_buffer_before_yield: Minimum chunks to buffer before starting to yield (default: 5)
Example:
- @use_buffer(buffer_size=100)
- async def my_stream() -> AsyncGenerator[bytes, None]:
+ @use_audio_buffer(pcm_format_arg="pcm_format", buffer_size=100)
+ async def my_stream(pcm_format: AudioFormat) -> AsyncGenerator[bytes, None]:
+ # Generator can yield variable-sized chunks
+ # Decorator ensures output is exactly 1-second chunks
...
"""
) -> Callable[_P, AsyncGenerator[bytes, None]]:
@wraps(func)
async def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> AsyncGenerator[bytes, None]:
- async for chunk in buffered(
+ # Extract pcm_format from function arguments
+ pcm_format = kwargs.get(pcm_format_arg)
+ if pcm_format is None:
+ msg = f"Audio buffer decorator requires '{pcm_format_arg}' argument"
+ raise ValueError(msg)
+
+ async for chunk in buffered_audio(
func(*args, **kwargs),
+ pcm_format=pcm_format,
buffer_size=buffer_size,
min_buffer_before_yield=min_buffer_before_yield,
):