Simplify named pipe approach
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 21:54:57 +0000 (22:54 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 2 Nov 2025 21:54:57 +0000 (22:54 +0100)
music_assistant/helpers/named_pipe.py
music_assistant/providers/airplay/bin/cliraop-macos-arm64
music_assistant/providers/airplay/protocols/_protocol.py
music_assistant/providers/airplay/protocols/raop.py
music_assistant/providers/airplay/stream_session.py

index be0b394edc1d14e285d9c66c608ced50ad59562b..b533982682c071034c61b61ee3c7dd4971cf1057 100644 (file)
@@ -6,7 +6,6 @@ import asyncio
 import os
 import time
 from contextlib import suppress
-from types import TracebackType
 from typing import TYPE_CHECKING
 
 if TYPE_CHECKING:
@@ -25,45 +24,21 @@ class AsyncNamedPipeWriter:
         """
         self._pipe_path = pipe_path
         self.logger = logger
-        self._fd: int | None = None
 
     @property
     def path(self) -> str:
         """Return the named pipe path."""
         return self._pipe_path
 
-    @property
-    def is_open(self) -> bool:
-        """Return True if the pipe is currently open."""
-        return self._fd is not None
-
     async def create(self) -> None:
         """Create the named pipe (if it does not exist)."""
 
         def _create() -> None:
             with suppress(FileExistsError):
-                os.mkfifo(self._pipe_path, 0o600)
+                os.mkfifo(self._pipe_path)
 
         await asyncio.to_thread(_create)
 
-    async def open(self) -> None:
-        """Open the named pipe for writing (blocking operation runs in thread)."""
-        if self._fd is not None:
-            return  # Already open
-
-        def _open() -> int:
-            # Open pipe in BLOCKING write mode (simple approach)
-            # - open() blocks until the reader (cliraop) connects
-            # - write() blocks if pipe buffer is full
-            # Both operations run in thread pool via asyncio.to_thread(),
-            # so they won't block the event loop
-            return os.open(self._pipe_path, os.O_WRONLY)
-
-        self._fd = await asyncio.to_thread(_open)
-
-        if self.logger:
-            self.logger.debug("Pipe %s opened for writing", self._pipe_path)
-
     async def write(self, data: bytes, log_slow_writes: bool = True) -> None:
         """Write data to the named pipe (blocking operation runs in thread).
 
@@ -74,18 +49,11 @@ class AsyncNamedPipeWriter:
         Raises:
             RuntimeError: If pipe is not open
         """
-        if self._fd is None:
-            raise RuntimeError(f"Pipe {self._pipe_path} is not open")
-
         start_time = time.time()
 
         def _write() -> None:
-            assert self._fd is not None
-            # Write all data (may block if pipe buffer is full)
-            bytes_written = 0
-            while bytes_written < len(data):
-                n = os.write(self._fd, data[bytes_written:])
-                bytes_written += n
+            with open(self._pipe_path, "wb", buffering=0) as pipe_file:
+                pipe_file.write(data)
 
         # Run blocking write in thread pool
         await asyncio.to_thread(_write)
@@ -101,23 +69,8 @@ class AsyncNamedPipeWriter:
                     self._pipe_path,
                 )
 
-    async def close(self) -> None:
-        """Close the named pipe."""
-        if self._fd is None:
-            return
-
-        fd = self._fd
-        self._fd = None
-
-        def _close() -> None:
-            with suppress(Exception):
-                os.close(fd)
-
-        await asyncio.to_thread(_close)
-
     async def remove(self) -> None:
         """Remove the named pipe."""
-        await self.close()
 
         def _remove() -> None:
             with suppress(Exception):
@@ -125,20 +78,6 @@ class AsyncNamedPipeWriter:
 
         await asyncio.to_thread(_remove)
 
-    async def __aenter__(self) -> AsyncNamedPipeWriter:
-        """Context manager entry."""
-        await self.open()
-        return self
-
-    async def __aexit__(
-        self,
-        exc_type: type[BaseException] | None,
-        exc_val: BaseException | None,
-        exc_tb: TracebackType | None,
-    ) -> None:
-        """Context manager exit."""
-        await self.close()
-
     def __str__(self) -> str:
         """Return string representation."""
         return self._pipe_path
index 4279b317cc47f182286282d9eb05d8fc352b396f..c630463abe163fa00b84bd9df4b6160de2cb3378 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 differ
index cc72940f24dd4bad126b8fcc1f5f654b195fe4e6..93a57d003e8ae489baa402d708a83455805ad691 100644 (file)
@@ -2,7 +2,6 @@
 
 from __future__ import annotations
 
-import asyncio
 import time
 from abc import ABC, abstractmethod
 from random import randint
@@ -65,7 +64,6 @@ class AirPlayProtocol(ABC):
             self.logger,
         )
         # State tracking
-        self._started = asyncio.Event()
         self._stopped = False
         self._total_bytes_sent = 0
         self._stream_bytes_sent = 0
@@ -73,12 +71,7 @@ class AirPlayProtocol(ABC):
     @property
     def running(self) -> bool:
         """Return boolean if this stream is running."""
-        return (
-            not self._stopped
-            and self._started.is_set()
-            and self._cli_proc is not None
-            and not self._cli_proc.closed
-        )
+        return not self._stopped and self._cli_proc is not None and not self._cli_proc.closed
 
     @abstractmethod
     async def start(self, start_ntp: int) -> None:
@@ -119,11 +112,6 @@ class AirPlayProtocol(ABC):
         if not self.commands_pipe:
             return
 
-        if not self.commands_pipe.is_open:
-            await self.commands_pipe.open()
-
-        await self._started.wait()
-
         self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
         self.player.last_command_sent = time.time()
         await self.commands_pipe.write(command.encode("utf-8"))
index 24bf3a49396d845cb6b3052506f20841efd90265..5be509d5090f02475b2da44552a251224b9c08bc 100644 (file)
@@ -39,16 +39,6 @@ class RaopStream(AirPlayProtocol):
     supports_pairing = True
     _stderr_reader_task: asyncio.Task[None] | None = None
 
-    @property
-    def running(self) -> bool:
-        """Return boolean if this stream is running."""
-        return (
-            not self._stopped
-            and self._started.is_set()
-            and self._cli_proc is not None
-            and not self._cli_proc.closed
-        )
-
     async def start(self, start_ntp: int) -> None:
         """Initialize CLIRaop process for a player."""
         assert self.player.discovery_info is not None  # for type checker
@@ -122,7 +112,6 @@ class RaopStream(AirPlayProtocol):
             self.player.logger.debug(line)
             if "connected to " in line:
                 self.player.logger.info("AirPlay device connected. Starting playback.")
-                self._started.set()
                 break
             if "Cannot connect to AirPlay device" in line:
                 raise PlayerCommandFailed("Cannot connect to AirPlay device")
index 31b9ea6f855fdd8bb42797c389821dc0d8c4f595..f7118b3632b7d855e433f898ebddefa825365f65 100644 (file)
@@ -61,7 +61,7 @@ class AirPlayStreamSession:
         # because we reuse an existing stream session for new play_media requests,
         # we need to track when the last stream was started
         self.last_stream_started: float = 0.0
-        self._clients_ready_event = asyncio.Event()
+        self._clients_ready = asyncio.Event()
 
     async def start(self) -> None:
         """Initialize stream session for all players."""
@@ -98,13 +98,13 @@ class AirPlayStreamSession:
             # create the named pipes
             await airplay_player.stream.audio_pipe.create()
             await airplay_player.stream.commands_pipe.create()
-            # start the stream
-            await airplay_player.stream.start(self.start_ntp)
             # start the (player-specific) ffmpeg process
             # note that ffmpeg will open the named pipe for writing
             await self._start_client_ffmpeg(airplay_player)
-            # open the command pipe for writing
-            await airplay_player.stream.commands_pipe.open()
+
+            # start the stream
+            await airplay_player.stream.start(self.start_ntp)
+
             # repeat sending the volume level to the player because some players seem
             # to ignore it the first time
             # https://github.com/music-assistant/support/issues/3330
@@ -114,7 +114,7 @@ class AirPlayStreamSession:
             for _airplay_player in self.sync_clients:
                 tm.create_task(_start_client(_airplay_player))
         # All clients started
-        self._clients_ready_event.set()
+        self._clients_ready.set()
 
     async def stop(self) -> None:
         """Stop playback and cleanup."""
@@ -186,13 +186,13 @@ class AirPlayStreamSession:
 
         # Link stream session to player stream
         airplay_player.stream.session = self
+        # create the named pipes
+        await airplay_player.stream.audio_pipe.create()
+        await airplay_player.stream.commands_pipe.create()
 
         # Snapshot seconds_streamed inside lock to prevent race conditions
         # Keep lock held during stream.start() to ensure player doesn't miss any chunks
         async with self._lock:
-            # (re)start the player specific ffmpeg process
-            await self._start_client_ffmpeg(airplay_player)
-
             # Calculate skip_seconds based on how many chunks have been sent
             skip_seconds = self.seconds_streamed
             # Start the stream at compensated NTP timestamp
@@ -207,6 +207,8 @@ class AirPlayStreamSession:
             if airplay_player not in self.sync_clients:
                 self.sync_clients.append(airplay_player)
 
+            # start the player specific ffmpeg process
+            await self._start_client_ffmpeg(airplay_player)
             await airplay_player.stream.start(start_ntp)
 
     async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
@@ -251,7 +253,7 @@ class AirPlayStreamSession:
                         time.time() - self.start_time,
                     )
                     # wait until the clients are ready to receive audio
-                    await asyncio.wait_for(self._clients_ready_event.wait(), timeout=10)
+                    await asyncio.wait_for(self._clients_ready.wait(), timeout=10)
                 async with self._lock:
                     sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
                     if not sync_clients:
@@ -370,6 +372,8 @@ class AirPlayStreamSession:
         # and outputs in the correct format for the player stream
         # to the named pipe associated with the player's stream
         if ffmpeg := self._player_ffmpeg.get(player_id):
+            if ffmpeg.closed:
+                raise RuntimeError(f"FFMpeg process for player {player_id} is closed")
             await ffmpeg.write(chunk)
 
         stream_write_start = time.time()