import asyncio
import os
-import time
+import stat
from contextlib import suppress
-from typing import TYPE_CHECKING
-
-if TYPE_CHECKING:
- from logging import Logger
class AsyncNamedPipeWriter:
"""Simple async writer for named pipes using thread pool for blocking I/O."""
- def __init__(self, pipe_path: str, logger: Logger | None = None) -> None:
+ def __init__(self, pipe_path: str) -> None:
"""Initialize named pipe writer.
Args:
pipe_path: Path to the named pipe
- logger: Optional logger for debug/error messages
"""
self._pipe_path = pipe_path
- self.logger = logger
@property
def path(self) -> str:
"""Create the named pipe (if it does not exist)."""
def _create() -> None:
- with suppress(FileExistsError):
+ try:
os.mkfifo(self._pipe_path)
- # Should we handle the FileExistsError and check to make
- # sure the file is indeed a named pipe using os.stat()
- # and if it isn't then delete and re-create?
+ except FileExistsError:
+ # Check if existing file is actually a named pipe
+ file_stat = os.stat(self._pipe_path)
+ if not stat.S_ISFIFO(file_stat.st_mode):
+ # Not a FIFO - remove and recreate
+ os.remove(self._pipe_path)
+ os.mkfifo(self._pipe_path)
await asyncio.to_thread(_create)
- async def write(self, data: bytes, log_slow_writes: bool = True) -> None:
- """Write data to the named pipe (blocking operation runs in thread).
-
- Args:
- data: Data to write to the pipe
- log_slow_writes: Whether to log slow writes (>5s)
-
- Raises:
- RuntimeError: If pipe is not open
- """
- start_time = time.time()
+ async def write(self, data: bytes) -> None:
+ """Write data to the named pipe (blocking operation runs in thread)."""
def _write() -> None:
with open(self._pipe_path, "wb") as pipe_file:
# Run blocking write in thread pool
await asyncio.to_thread(_write)
- if log_slow_writes:
- elapsed = time.time() - start_time
- # Only log if it took more than 5 seconds (real stall)
- if elapsed > 5.0 and self.logger:
- self.logger.error(
- "!!! STALLED PIPE WRITE: Took %.3fs to write %d bytes to %s",
- elapsed,
- len(data),
- self._pipe_path,
- )
-
async def remove(self) -> None:
"""Remove the named pipe."""
from propcache import under_cached_property as cached_property
from music_assistant.constants import (
+ ATTR_ANNOUNCEMENT_IN_PROGRESS,
ATTR_FAKE_MUTE,
ATTR_FAKE_POWER,
ATTR_FAKE_VOLUME,
def __calculate_current_media(self) -> PlayerMedia | None:
"""Calculate the current media for the player."""
+ if self.extra_data.get(ATTR_ANNOUNCEMENT_IN_PROGRESS):
+ # if an announcement is in progress, return announcement details
+ return PlayerMedia(
+ uri="announcement",
+ media_type=MediaType.ANNOUNCEMENT,
+ title="ANNOUNCEMENT",
+ )
# if the player is grouped/synced, use the current_media of the group/parent player
if parent_player_id := (self.active_group or self.synced_to):
if parent_player := self.mass.players.get(parent_player_id):
self._cli_proc: AsyncProcess | None = None
self.audio_pipe = AsyncNamedPipeWriter(
f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio", # noqa: S108
- self.logger,
)
self.commands_pipe = AsyncNamedPipeWriter(
f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd", # noqa: S108
- self.logger,
)
# State tracking
self._stopped = False
logger.log(VERBOSE_LOG_LEVEL, line)
await asyncio.sleep(0) # Yield to event loop
- # ensure we're cleaned up afterwards (this also logs the returncode)
logger.debug("CLIRaop stderr reader ended")
if not self._stopped:
- await self.stop()
+ self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
await ffmpeg.write_eof()
await ffmpeg.wait_with_timeout(30)
del ffmpeg
- assert airplay_player.stream # for type checker
- # then stop the player stream
- await airplay_player.stream.stop()
async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
"""Send metadata to all players."""
# Initialize named pipe helpers
audio_pipe_path = f"/tmp/ma_airplay_audio_{self.instance_id}" # noqa: S108
metadata_pipe_path = f"/tmp/ma_airplay_metadata_{self.instance_id}" # noqa: S108
- self.audio_pipe = AsyncNamedPipeWriter(audio_pipe_path, self.logger)
- self.metadata_pipe = AsyncNamedPipeWriter(metadata_pipe_path, self.logger)
+ self.audio_pipe = AsyncNamedPipeWriter(audio_pipe_path)
+ self.metadata_pipe = AsyncNamedPipeWriter(metadata_pipe_path)
self.config_file = f"/tmp/ma_shairport_sync_{self.instance_id}.conf" # noqa: S108
# Use port 7000+ for AirPlay 2 compatibility
# Each instance gets a unique port: 7000, 7001, 7002, etc.
"""
self.logger.debug("Writing silence to audio pipe to unblock stream")
silence = b"\x00" * 176400 # 1 second of silence in PCM_S16LE stereo 44.1kHz
- await self.audio_pipe.write(silence, log_slow_writes=False)
+ await self.audio_pipe.write(silence)
def _process_shairport_log_line(self, line: str) -> None:
"""Process a log line from shairport-sync stderr.