From: Marcel van der Veldt Date: Sun, 2 Nov 2025 21:54:57 +0000 (+0100) Subject: Simplify named pipe approach X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=c534a0598377160dfd35357a2ce0f3687ce14a74;p=music-assistant-server.git Simplify named pipe approach --- diff --git a/music_assistant/helpers/named_pipe.py b/music_assistant/helpers/named_pipe.py index be0b394e..b5339826 100644 --- a/music_assistant/helpers/named_pipe.py +++ b/music_assistant/helpers/named_pipe.py @@ -6,7 +6,6 @@ import asyncio import os import time from contextlib import suppress -from types import TracebackType from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -25,45 +24,21 @@ class AsyncNamedPipeWriter: """ self._pipe_path = pipe_path self.logger = logger - self._fd: int | None = None @property def path(self) -> str: """Return the named pipe path.""" return self._pipe_path - @property - def is_open(self) -> bool: - """Return True if the pipe is currently open.""" - return self._fd is not None - async def create(self) -> None: """Create the named pipe (if it does not exist).""" def _create() -> None: with suppress(FileExistsError): - os.mkfifo(self._pipe_path, 0o600) + os.mkfifo(self._pipe_path) await asyncio.to_thread(_create) - async def open(self) -> None: - """Open the named pipe for writing (blocking operation runs in thread).""" - if self._fd is not None: - return # Already open - - def _open() -> int: - # Open pipe in BLOCKING write mode (simple approach) - # - open() blocks until the reader (cliraop) connects - # - write() blocks if pipe buffer is full - # Both operations run in thread pool via asyncio.to_thread(), - # so they won't block the event loop - return os.open(self._pipe_path, os.O_WRONLY) - - self._fd = await asyncio.to_thread(_open) - - if self.logger: - self.logger.debug("Pipe %s opened for writing", self._pipe_path) - async def write(self, data: bytes, log_slow_writes: bool = True) -> None: """Write data to the named pipe (blocking operation runs in thread). @@ -74,18 +49,11 @@ class AsyncNamedPipeWriter: Raises: RuntimeError: If pipe is not open """ - if self._fd is None: - raise RuntimeError(f"Pipe {self._pipe_path} is not open") - start_time = time.time() def _write() -> None: - assert self._fd is not None - # Write all data (may block if pipe buffer is full) - bytes_written = 0 - while bytes_written < len(data): - n = os.write(self._fd, data[bytes_written:]) - bytes_written += n + with open(self._pipe_path, "wb", buffering=0) as pipe_file: + pipe_file.write(data) # Run blocking write in thread pool await asyncio.to_thread(_write) @@ -101,23 +69,8 @@ class AsyncNamedPipeWriter: self._pipe_path, ) - async def close(self) -> None: - """Close the named pipe.""" - if self._fd is None: - return - - fd = self._fd - self._fd = None - - def _close() -> None: - with suppress(Exception): - os.close(fd) - - await asyncio.to_thread(_close) - async def remove(self) -> None: """Remove the named pipe.""" - await self.close() def _remove() -> None: with suppress(Exception): @@ -125,20 +78,6 @@ class AsyncNamedPipeWriter: await asyncio.to_thread(_remove) - async def __aenter__(self) -> AsyncNamedPipeWriter: - """Context manager entry.""" - await self.open() - return self - - async def __aexit__( - self, - exc_type: type[BaseException] | None, - exc_val: BaseException | None, - exc_tb: TracebackType | None, - ) -> None: - """Context manager exit.""" - await self.close() - def __str__(self) -> str: """Return string representation.""" return self._pipe_path diff --git a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 index 4279b317..c630463a 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 differ diff --git a/music_assistant/providers/airplay/protocols/_protocol.py b/music_assistant/providers/airplay/protocols/_protocol.py index cc72940f..93a57d00 100644 --- a/music_assistant/providers/airplay/protocols/_protocol.py +++ b/music_assistant/providers/airplay/protocols/_protocol.py @@ -2,7 +2,6 @@ from __future__ import annotations -import asyncio import time from abc import ABC, abstractmethod from random import randint @@ -65,7 +64,6 @@ class AirPlayProtocol(ABC): self.logger, ) # State tracking - self._started = asyncio.Event() self._stopped = False self._total_bytes_sent = 0 self._stream_bytes_sent = 0 @@ -73,12 +71,7 @@ class AirPlayProtocol(ABC): @property def running(self) -> bool: """Return boolean if this stream is running.""" - return ( - not self._stopped - and self._started.is_set() - and self._cli_proc is not None - and not self._cli_proc.closed - ) + return not self._stopped and self._cli_proc is not None and not self._cli_proc.closed @abstractmethod async def start(self, start_ntp: int) -> None: @@ -119,11 +112,6 @@ class AirPlayProtocol(ABC): if not self.commands_pipe: return - if not self.commands_pipe.is_open: - await self.commands_pipe.open() - - await self._started.wait() - self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) self.player.last_command_sent = time.time() await self.commands_pipe.write(command.encode("utf-8")) diff --git a/music_assistant/providers/airplay/protocols/raop.py b/music_assistant/providers/airplay/protocols/raop.py index 24bf3a49..5be509d5 100644 --- a/music_assistant/providers/airplay/protocols/raop.py +++ b/music_assistant/providers/airplay/protocols/raop.py @@ -39,16 +39,6 @@ class RaopStream(AirPlayProtocol): supports_pairing = True _stderr_reader_task: asyncio.Task[None] | None = None - @property - def running(self) -> bool: - """Return boolean if this stream is running.""" - return ( - not self._stopped - and self._started.is_set() - and self._cli_proc is not None - and not self._cli_proc.closed - ) - async def start(self, start_ntp: int) -> None: """Initialize CLIRaop process for a player.""" assert self.player.discovery_info is not None # for type checker @@ -122,7 +112,6 @@ class RaopStream(AirPlayProtocol): self.player.logger.debug(line) if "connected to " in line: self.player.logger.info("AirPlay device connected. Starting playback.") - self._started.set() break if "Cannot connect to AirPlay device" in line: raise PlayerCommandFailed("Cannot connect to AirPlay device") diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index 31b9ea6f..f7118b36 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -61,7 +61,7 @@ class AirPlayStreamSession: # because we reuse an existing stream session for new play_media requests, # we need to track when the last stream was started self.last_stream_started: float = 0.0 - self._clients_ready_event = asyncio.Event() + self._clients_ready = asyncio.Event() async def start(self) -> None: """Initialize stream session for all players.""" @@ -98,13 +98,13 @@ class AirPlayStreamSession: # create the named pipes await airplay_player.stream.audio_pipe.create() await airplay_player.stream.commands_pipe.create() - # start the stream - await airplay_player.stream.start(self.start_ntp) # start the (player-specific) ffmpeg process # note that ffmpeg will open the named pipe for writing await self._start_client_ffmpeg(airplay_player) - # open the command pipe for writing - await airplay_player.stream.commands_pipe.open() + + # start the stream + await airplay_player.stream.start(self.start_ntp) + # repeat sending the volume level to the player because some players seem # to ignore it the first time # https://github.com/music-assistant/support/issues/3330 @@ -114,7 +114,7 @@ class AirPlayStreamSession: for _airplay_player in self.sync_clients: tm.create_task(_start_client(_airplay_player)) # All clients started - self._clients_ready_event.set() + self._clients_ready.set() async def stop(self) -> None: """Stop playback and cleanup.""" @@ -186,13 +186,13 @@ class AirPlayStreamSession: # Link stream session to player stream airplay_player.stream.session = self + # create the named pipes + await airplay_player.stream.audio_pipe.create() + await airplay_player.stream.commands_pipe.create() # Snapshot seconds_streamed inside lock to prevent race conditions # Keep lock held during stream.start() to ensure player doesn't miss any chunks async with self._lock: - # (re)start the player specific ffmpeg process - await self._start_client_ffmpeg(airplay_player) - # Calculate skip_seconds based on how many chunks have been sent skip_seconds = self.seconds_streamed # Start the stream at compensated NTP timestamp @@ -207,6 +207,8 @@ class AirPlayStreamSession: if airplay_player not in self.sync_clients: self.sync_clients.append(airplay_player) + # start the player specific ffmpeg process + await self._start_client_ffmpeg(airplay_player) await airplay_player.stream.start(start_ntp) async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None: @@ -251,7 +253,7 @@ class AirPlayStreamSession: time.time() - self.start_time, ) # wait until the clients are ready to receive audio - await asyncio.wait_for(self._clients_ready_event.wait(), timeout=10) + await asyncio.wait_for(self._clients_ready.wait(), timeout=10) async with self._lock: sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running] if not sync_clients: @@ -370,6 +372,8 @@ class AirPlayStreamSession: # and outputs in the correct format for the player stream # to the named pipe associated with the player's stream if ffmpeg := self._player_ffmpeg.get(player_id): + if ffmpeg.closed: + raise RuntimeError(f"FFMpeg process for player {player_id} is closed") await ffmpeg.write(chunk) stream_write_start = time.time()