import os
import time
from contextlib import suppress
-from types import TracebackType
from typing import TYPE_CHECKING
if TYPE_CHECKING:
"""
self._pipe_path = pipe_path
self.logger = logger
- self._fd: int | None = None
@property
def path(self) -> str:
"""Return the named pipe path."""
return self._pipe_path
- @property
- def is_open(self) -> bool:
- """Return True if the pipe is currently open."""
- return self._fd is not None
-
async def create(self) -> None:
"""Create the named pipe (if it does not exist)."""
def _create() -> None:
with suppress(FileExistsError):
- os.mkfifo(self._pipe_path, 0o600)
+ os.mkfifo(self._pipe_path)
await asyncio.to_thread(_create)
- async def open(self) -> None:
- """Open the named pipe for writing (blocking operation runs in thread)."""
- if self._fd is not None:
- return # Already open
-
- def _open() -> int:
- # Open pipe in BLOCKING write mode (simple approach)
- # - open() blocks until the reader (cliraop) connects
- # - write() blocks if pipe buffer is full
- # Both operations run in thread pool via asyncio.to_thread(),
- # so they won't block the event loop
- return os.open(self._pipe_path, os.O_WRONLY)
-
- self._fd = await asyncio.to_thread(_open)
-
- if self.logger:
- self.logger.debug("Pipe %s opened for writing", self._pipe_path)
-
async def write(self, data: bytes, log_slow_writes: bool = True) -> None:
"""Write data to the named pipe (blocking operation runs in thread).
Raises:
RuntimeError: If pipe is not open
"""
- if self._fd is None:
- raise RuntimeError(f"Pipe {self._pipe_path} is not open")
-
start_time = time.time()
def _write() -> None:
- assert self._fd is not None
- # Write all data (may block if pipe buffer is full)
- bytes_written = 0
- while bytes_written < len(data):
- n = os.write(self._fd, data[bytes_written:])
- bytes_written += n
+ with open(self._pipe_path, "wb", buffering=0) as pipe_file:
+ pipe_file.write(data)
# Run blocking write in thread pool
await asyncio.to_thread(_write)
self._pipe_path,
)
- async def close(self) -> None:
- """Close the named pipe."""
- if self._fd is None:
- return
-
- fd = self._fd
- self._fd = None
-
- def _close() -> None:
- with suppress(Exception):
- os.close(fd)
-
- await asyncio.to_thread(_close)
-
async def remove(self) -> None:
"""Remove the named pipe."""
- await self.close()
def _remove() -> None:
with suppress(Exception):
await asyncio.to_thread(_remove)
- async def __aenter__(self) -> AsyncNamedPipeWriter:
- """Context manager entry."""
- await self.open()
- return self
-
- async def __aexit__(
- self,
- exc_type: type[BaseException] | None,
- exc_val: BaseException | None,
- exc_tb: TracebackType | None,
- ) -> None:
- """Context manager exit."""
- await self.close()
-
def __str__(self) -> str:
"""Return string representation."""
return self._pipe_path
from __future__ import annotations
-import asyncio
import time
from abc import ABC, abstractmethod
from random import randint
self.logger,
)
# State tracking
- self._started = asyncio.Event()
self._stopped = False
self._total_bytes_sent = 0
self._stream_bytes_sent = 0
@property
def running(self) -> bool:
"""Return boolean if this stream is running."""
- return (
- not self._stopped
- and self._started.is_set()
- and self._cli_proc is not None
- and not self._cli_proc.closed
- )
+ return not self._stopped and self._cli_proc is not None and not self._cli_proc.closed
@abstractmethod
async def start(self, start_ntp: int) -> None:
if not self.commands_pipe:
return
- if not self.commands_pipe.is_open:
- await self.commands_pipe.open()
-
- await self._started.wait()
-
self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
self.player.last_command_sent = time.time()
await self.commands_pipe.write(command.encode("utf-8"))
supports_pairing = True
_stderr_reader_task: asyncio.Task[None] | None = None
- @property
- def running(self) -> bool:
- """Return boolean if this stream is running."""
- return (
- not self._stopped
- and self._started.is_set()
- and self._cli_proc is not None
- and not self._cli_proc.closed
- )
-
async def start(self, start_ntp: int) -> None:
"""Initialize CLIRaop process for a player."""
assert self.player.discovery_info is not None # for type checker
self.player.logger.debug(line)
if "connected to " in line:
self.player.logger.info("AirPlay device connected. Starting playback.")
- self._started.set()
break
if "Cannot connect to AirPlay device" in line:
raise PlayerCommandFailed("Cannot connect to AirPlay device")
# because we reuse an existing stream session for new play_media requests,
# we need to track when the last stream was started
self.last_stream_started: float = 0.0
- self._clients_ready_event = asyncio.Event()
+ self._clients_ready = asyncio.Event()
async def start(self) -> None:
"""Initialize stream session for all players."""
# create the named pipes
await airplay_player.stream.audio_pipe.create()
await airplay_player.stream.commands_pipe.create()
- # start the stream
- await airplay_player.stream.start(self.start_ntp)
# start the (player-specific) ffmpeg process
# note that ffmpeg will open the named pipe for writing
await self._start_client_ffmpeg(airplay_player)
- # open the command pipe for writing
- await airplay_player.stream.commands_pipe.open()
+
+ # start the stream
+ await airplay_player.stream.start(self.start_ntp)
+
# repeat sending the volume level to the player because some players seem
# to ignore it the first time
# https://github.com/music-assistant/support/issues/3330
for _airplay_player in self.sync_clients:
tm.create_task(_start_client(_airplay_player))
# All clients started
- self._clients_ready_event.set()
+ self._clients_ready.set()
async def stop(self) -> None:
"""Stop playback and cleanup."""
# Link stream session to player stream
airplay_player.stream.session = self
+ # create the named pipes
+ await airplay_player.stream.audio_pipe.create()
+ await airplay_player.stream.commands_pipe.create()
# Snapshot seconds_streamed inside lock to prevent race conditions
# Keep lock held during stream.start() to ensure player doesn't miss any chunks
async with self._lock:
- # (re)start the player specific ffmpeg process
- await self._start_client_ffmpeg(airplay_player)
-
# Calculate skip_seconds based on how many chunks have been sent
skip_seconds = self.seconds_streamed
# Start the stream at compensated NTP timestamp
if airplay_player not in self.sync_clients:
self.sync_clients.append(airplay_player)
+ # start the player specific ffmpeg process
+ await self._start_client_ffmpeg(airplay_player)
await airplay_player.stream.start(start_ntp)
async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
time.time() - self.start_time,
)
# wait until the clients are ready to receive audio
- await asyncio.wait_for(self._clients_ready_event.wait(), timeout=10)
+ await asyncio.wait_for(self._clients_ready.wait(), timeout=10)
async with self._lock:
sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
if not sync_clients:
# and outputs in the correct format for the player stream
# to the named pipe associated with the player's stream
if ffmpeg := self._player_ffmpeg.get(player_id):
+ if ffmpeg.closed:
+ raise RuntimeError(f"FFMpeg process for player {player_id} is closed")
await ffmpeg.write(chunk)
stream_write_start = time.time()