From: Marcel van der Veldt Date: Fri, 31 Oct 2025 01:16:05 +0000 (+0100) Subject: add AsyncNamedPipe helper X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=8d9507f60bbaebb8a58ff15d19cd595badbb4dfb;p=music-assistant-server.git add AsyncNamedPipe helper --- diff --git a/music_assistant/helpers/namedpipe.py b/music_assistant/helpers/namedpipe.py new file mode 100644 index 00000000..58ee4aa6 --- /dev/null +++ b/music_assistant/helpers/namedpipe.py @@ -0,0 +1,189 @@ +"""Helper for async-friendly named pipe operations.""" + +from __future__ import annotations + +import asyncio +import fcntl +import os +from contextlib import suppress +from types import TracebackType +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from logging import Logger + + +class AsyncNamedPipeWriter: + """Async writer for named pipes (FIFOs) with non-blocking I/O. + + Handles opening named pipes in non-blocking mode and writing data + asynchronously using the event loop without consuming thread pool resources. + Automatically uses optimal write chunk size based on actual pipe buffer size. + """ + + # Default pipe buffer sizes (platform-specific) + DEFAULT_PIPE_SIZE_MACOS = 16384 # macOS default: 16KB + DEFAULT_PIPE_SIZE_LINUX = 65536 # Linux default: 64KB + + # Target pipe buffer size (512KB for ~1+ seconds of 44.1kHz stereo audio buffering) + TARGET_PIPE_SIZE = 524288 + + def __init__(self, pipe_path: str, logger: Logger | None = None) -> None: + """Initialize async named pipe writer. + + Args: + pipe_path: Path to the named pipe + logger: Optional logger for debug/error messages + """ + self.pipe_path = pipe_path + self.logger = logger + self._fd: int | None = None + self._pipe_buffer_size: int = self.DEFAULT_PIPE_SIZE_MACOS + self._write_chunk_size: int = 8192 # Conservative default (8KB) + + @property + def is_open(self) -> bool: + """Return True if the pipe is currently open.""" + return self._fd is not None + + async def open(self, increase_buffer: bool = True) -> None: + """Open the named pipe in non-blocking mode for writing. + + Args: + increase_buffer: Whether to attempt increasing the pipe buffer size + """ + if self._fd is not None: + return # Already open + + def _open() -> tuple[int, int]: + # Open pipe in non-blocking binary write mode + fd = os.open(self.pipe_path, os.O_WRONLY | os.O_NONBLOCK) + + actual_size = self._pipe_buffer_size # Default + + if increase_buffer: + try: + # macOS/Linux: query current pipe buffer size + if hasattr(fcntl, "F_GETPIPE_SZ"): + current_size = fcntl.fcntl(fd, fcntl.F_GETPIPE_SZ) + actual_size = current_size + if self.logger: + self.logger.debug( + "Pipe %s buffer size: %d bytes", self.pipe_path, current_size + ) + + # Linux only: try to set larger size if F_SETPIPE_SZ exists + if hasattr(fcntl, "F_SETPIPE_SZ"): + try: + fcntl.fcntl(fd, fcntl.F_SETPIPE_SZ, self.TARGET_PIPE_SIZE) + # Verify the new size + actual_size = fcntl.fcntl(fd, fcntl.F_GETPIPE_SZ) + if self.logger: + self.logger.info( + "Pipe %s buffer increased to %d bytes", + self.pipe_path, + actual_size, + ) + except OSError as e: + if self.logger: + self.logger.debug("Could not increase pipe buffer size: %s", e) + except (OSError, AttributeError) as e: + if self.logger: + self.logger.debug("Cannot query/adjust pipe buffer size: %s", e) + + return fd, actual_size + + self._fd, self._pipe_buffer_size = await asyncio.to_thread(_open) + + # Set write chunk size based on actual pipe buffer size + # Use 1/4 of pipe buffer to avoid filling it completely in one write + # This allows better flow control and prevents blocking + self._write_chunk_size = max(8192, self._pipe_buffer_size // 4) + + if self.logger: + self.logger.debug( + "Pipe %s opened: buffer=%d bytes, write_chunk_size=%d bytes", + self.pipe_path, + self._pipe_buffer_size, + self._write_chunk_size, + ) + + async def write(self, data: bytes, timeout_per_wait: float = 0.1) -> None: + """Write data to the named pipe asynchronously. + + Writes data in chunks sized according to the pipe's buffer capacity. + Uses non-blocking I/O with event loop (no thread pool consumption). + + Args: + data: Data to write to the pipe + timeout_per_wait: Timeout for each wait iteration (default: 100ms) + + Raises: + RuntimeError: If pipe is not open + TimeoutError: If pipe write is blocked for too long (>400 waits) + """ + if self._fd is None: + raise RuntimeError(f"Pipe {self.pipe_path} is not open") + + bytes_written = 0 + wait_count = 0 + + while bytes_written < len(data): + # Write up to write_chunk_size bytes at a time + to_write = min(self._write_chunk_size, len(data) - bytes_written) + try: + # Try non-blocking write + n = os.write(self._fd, data[bytes_written : bytes_written + to_write]) + bytes_written += n + wait_count = 0 # Reset wait counter on successful write + except BlockingIOError: + # Pipe buffer is full, wait until writable + wait_count += 1 + if wait_count > 400: # Too many waits (~40+ seconds at 100ms each) + raise TimeoutError( + f"Pipe write blocked after {wait_count} waits on {self.pipe_path}" + ) + + loop = asyncio.get_event_loop() + future: asyncio.Future[None] = loop.create_future() + assert self._fd is not None # Already checked at method entry + fd = self._fd # Capture fd for closure + + def on_writable( + _loop: asyncio.AbstractEventLoop = loop, + _future: asyncio.Future[None] = future, + _fd: int = fd, + ) -> None: + _loop.remove_writer(_fd) + if not _future.done(): + _future.set_result(None) + + loop.add_writer(fd, on_writable) + try: + await asyncio.wait_for(future, timeout=timeout_per_wait) + except TimeoutError: + loop.remove_writer(fd) + # Continue loop - will hit wait_count limit if truly stuck + + async def close(self) -> None: + """Close the named pipe.""" + if self._fd is None: + return + + with suppress(Exception): + await asyncio.to_thread(os.close, self._fd) + self._fd = None + + async def __aenter__(self) -> AsyncNamedPipeWriter: + """Context manager entry.""" + await self.open() + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + """Context manager exit.""" + await self.close()