return True
return self._buffer_fill_task is not None and self._buffer_fill_task.cancelled()
+ @staticmethod
+ def _cleanup_chunks(chunks: deque[bytes]) -> None:
+ """Clear chunks and run garbage collection (runs in executor)."""
+ chunks.clear()
+ gc.collect()
+
@property
def chunk_size_bytes(self) -> int:
"""Return the size in bytes of one second of PCM audio."""
with suppress(asyncio.CancelledError):
await self._inactivity_task
async with self._lock:
- self._chunks.clear()
+ # Replace the deque instead of clearing it to avoid blocking
+ # Clearing a large deque can take >100ms
+ old_chunks = self._chunks
+ self._chunks = deque()
self._discarded_chunks = 0
self._eof_received = False
self._cancelled = True # Mark buffer as cancelled
self._data_available.notify_all()
self._space_available.notify_all()
- # Run garbage collection in background to reclaim memory from large buffers
- # Don't await it to avoid blocking during task cancellation
+ # Clear the old deque and run garbage collection in background
+ # to avoid blocking the event loop
loop = asyncio.get_running_loop()
- loop.run_in_executor(None, gc.collect)
+ loop.run_in_executor(None, self._cleanup_chunks, old_chunks)
async def set_eof(self) -> None:
"""Signal that no more data will be added to the buffer."""