player_config["provider"] = prov.instance_id
changed = True
+ # Migrate AirPlay legacy credentials (ap_credentials) to protocol-specific keys
+ # The old key was used for both RAOP and AirPlay, now we have separate keys
+ for player_id, player_config in self._data.get(CONF_PLAYERS, {}).items():
+ if player_config.get("provider") != "airplay":
+ continue
+ if not (values := player_config.get("values")):
+ continue
+ if "ap_credentials" not in values:
+ continue
+ # Migrate to raop_credentials (RAOP is the default/fallback protocol)
+ # The new code will use the correct key based on the protocol
+ old_creds = values.pop("ap_credentials")
+ if old_creds and "raop_credentials" not in values:
+ values["raop_credentials"] = old_creds
+ LOGGER.info("Migrated AirPlay credentials for player %s", player_id)
+ changed = True
+
if changed:
await self._async_save()
from __future__ import annotations
import asyncio
+import errno as errno_module
+import logging
import os
-import stat
+import time
from contextlib import suppress
from pathlib import Path
+_LOGGER = logging.getLogger("named_pipe")
+
class AsyncNamedPipeWriter:
- """Simple async writer for named pipes using thread pool for blocking I/O."""
+ """Async writer for named pipes."""
def __init__(self, pipe_path: str) -> None:
- """Initialize named pipe writer.
-
- Args:
- pipe_path: Path to the named pipe
- """
+ """Initialize named pipe writer."""
self._pipe_path = pipe_path
+ self._write_fd: int | None = None
@property
def path(self) -> str:
return self._pipe_path
async def create(self) -> None:
- """Create the named pipe (if it does not exist)."""
+ """Create the named pipe."""
def _create() -> None:
- try:
- os.mkfifo(self._pipe_path)
- except FileExistsError:
- # Check if existing file is actually a named pipe
- file_stat = os.stat(self._pipe_path)
- if not stat.S_ISFIFO(file_stat.st_mode):
- # Not a FIFO - remove and recreate
- Path(self._pipe_path).unlink()
- os.mkfifo(self._pipe_path)
+ pipe_path = Path(self._pipe_path)
+ if pipe_path.exists():
+ pipe_path.unlink()
+ os.mkfifo(self._pipe_path)
await asyncio.to_thread(_create)
+ def _ensure_write_fd(self) -> bool:
+ """Ensure we have a write fd open. Returns True if successful."""
+ if self._write_fd is not None:
+ return True
+ if not Path(self._pipe_path).exists():
+ return False
+ # Retry opening until reader is available (up to 1s)
+ for _ in range(20):
+ try:
+ self._write_fd = os.open(self._pipe_path, os.O_WRONLY | os.O_NONBLOCK)
+ return True
+ except OSError as e:
+ if e.errno in (errno_module.ENXIO, errno_module.ENOENT):
+ time.sleep(0.05)
+ continue
+ raise
+ _LOGGER.warning("Could not open pipe %s: no reader after retries", self._pipe_path)
+ return False
+
async def write(self, data: bytes) -> None:
- """Write data to the named pipe (blocking operation runs in thread)."""
+ """Write data to the named pipe."""
def _write() -> None:
- with open(self._pipe_path, "wb") as pipe_file:
- pipe_file.write(data)
+ if not self._ensure_write_fd():
+ return
+ try:
+ assert self._write_fd is not None
+ os.write(self._write_fd, data)
+ except OSError as e:
+ if e.errno == errno_module.EPIPE:
+ # Reader closed, reset fd for next attempt
+ if self._write_fd is not None:
+ with suppress(Exception):
+ os.close(self._write_fd)
+ self._write_fd = None
+ else:
+ raise
- # Run blocking write in thread pool
await asyncio.to_thread(_write)
async def remove(self) -> None:
- """Remove the named pipe."""
-
- def _remove() -> None:
+ """Close write fd and remove the pipe."""
+ if self._write_fd is not None:
with suppress(Exception):
- Path(self._pipe_path).unlink()
-
- await asyncio.to_thread(_remove)
+ os.close(self._write_fd)
+ self._write_fd = None
+ pipe_path = Path(self._pipe_path)
+ if pipe_path.exists():
+ with suppress(Exception):
+ pipe_path.unlink()
def __str__(self) -> str:
"""Return string representation."""
await self._stdin_feeder_task
# close stdin to signal we're done sending data
- await asyncio.wait_for(self._stdin_lock.acquire(), 10)
+ with suppress(TimeoutError, asyncio.CancelledError):
+ await asyncio.wait_for(self._stdin_lock.acquire(), 5)
if self.proc.stdin and not self.proc.stdin.is_closing():
self.proc.stdin.close()
elif not self.proc.stdin and self.proc.returncode is None:
self.proc.send_signal(SIGINT)
# ensure we have no more readers active and stdout is drained
- await asyncio.wait_for(self._stdout_lock.acquire(), 10)
+ with suppress(TimeoutError, asyncio.CancelledError):
+ await asyncio.wait_for(self._stdout_lock.acquire(), 5)
if self.proc.stdout and not self.proc.stdout.at_eof():
with suppress(Exception):
await self.proc.stdout.read(-1)
# if we have a stderr task active, allow it to finish
if self._stderr_reader_task:
- await asyncio.wait_for(self._stderr_reader_task, 10)
+ with suppress(TimeoutError, asyncio.CancelledError):
+ await asyncio.wait_for(self._stderr_reader_task, 5)
elif self.proc.stderr and not self.proc.stderr.at_eof():
- await asyncio.wait_for(self._stderr_lock.acquire(), 10)
+ with suppress(TimeoutError, asyncio.CancelledError):
+ await asyncio.wait_for(self._stderr_lock.acquire(), 5)
# drain stderr
with suppress(Exception):
await self.proc.stderr.read(-1)
# make sure the process is really cleaned up.
# especially with pipes this can cause deadlocks if not properly guarded
# we need to ensure stdout and stderr are flushed and stdin closed
+ pid = self.proc.pid
+ terminate_attempts = 0
while self.returncode is None:
try:
# use communicate to flush all pipe buffers
- await asyncio.wait_for(self.proc.communicate(), 5)
+ await asyncio.wait_for(self.proc.communicate(), 2)
except TimeoutError:
+ terminate_attempts += 1
self.logger.debug(
- "Process %s with PID %s did not stop in time. Sending terminate...",
+ "Process %s with PID %s did not stop in time (attempt %d). Sending SIGKILL...",
self.name,
- self.proc.pid,
+ pid,
+ terminate_attempts,
)
- with suppress(ProcessLookupError):
- self.proc.terminate()
+ # Use os.kill for more direct signal delivery
+ with suppress(ProcessLookupError, OSError):
+ os.kill(pid, 9) # SIGKILL = 9
+ # Give up after 5 attempts - process may be zombie
+ if terminate_attempts >= 5:
+ self.logger.warning(
+ "Process %s (PID %s) did not terminate after %d SIGKILL attempts",
+ self.name,
+ pid,
+ terminate_attempts,
+ )
+ break
self.logger.log(
VERBOSE_LOG_LEVEL,
"Process %s with PID %s stopped with returncode %s",
self.returncode,
)
+ async def kill(self) -> None:
+ """
+ Immediately kill the process with SIGKILL.
+
+ Use this for forceful termination when the process doesn't respond to
+ normal termination signals. Unlike close(), this doesn't attempt graceful
+ shutdown - it immediately sends SIGKILL.
+ """
+ self._close_called = True
+ if not self.proc or self.returncode is not None:
+ return
+
+ pid = self.proc.pid
+
+ # Cancel stdin feeder task if any
+ if self._stdin_feeder_task and not self._stdin_feeder_task.done():
+ self._stdin_feeder_task.cancel()
+ with suppress(asyncio.CancelledError, Exception):
+ await self._stdin_feeder_task
+
+ # Cancel stderr reader task if any
+ if self._stderr_reader_task and not self._stderr_reader_task.done():
+ self._stderr_reader_task.cancel()
+ with suppress(asyncio.CancelledError, Exception):
+ await self._stderr_reader_task
+
+ # Close all pipes first to prevent any I/O blocking
+ # This helps processes stuck on blocked I/O to receive signals
+ if self.proc.stdin and not self.proc.stdin.is_closing():
+ self.proc.stdin.close()
+ if self.proc.stdout:
+ self.proc.stdout.feed_eof()
+ if self.proc.stderr:
+ self.proc.stderr.feed_eof()
+
+ # Send SIGKILL immediately using os.kill for more direct signal delivery
+ self.logger.debug("Killing process %s with PID %s", self.name, pid)
+ with suppress(ProcessLookupError, OSError):
+ os.kill(pid, 9) # SIGKILL = 9
+
+ # Wait for process to actually terminate
+ try:
+ await asyncio.wait_for(self.proc.wait(), 2)
+ except TimeoutError:
+ # Try one more time with os.kill
+ with suppress(ProcessLookupError, OSError):
+ os.kill(pid, 9)
+ try:
+ await asyncio.wait_for(self.proc.wait(), 2)
+ except TimeoutError:
+ self.logger.warning(
+ "Process %s with PID %s did not terminate after SIGKILL - may be zombie",
+ self.name,
+ pid,
+ )
+
+ self.logger.log(
+ VERBOSE_LOG_LEVEL,
+ "Process %s with PID %s killed with returncode %s",
+ self.name,
+ pid,
+ self.returncode,
+ )
+
async def wait(self) -> int:
"""Wait for the process and return the returncode."""
if self._returncode is None:
"""
# if the player is grouped/synced, use the active source of the group/parent player
if parent_player_id := (self.active_group or self.synced_to):
- if parent_player := self.mass.players.get(parent_player_id):
+ if parent_player_id != self.player_id and (
+ parent_player := self.mass.players.get(parent_player_id)
+ ):
return parent_player.active_source
for plugin_source in self.mass.players.get_plugin_sources():
if plugin_source.in_use_by == self.player_id:
)
# if the player is grouped/synced, use the current_media of the group/parent player
if parent_player_id := (self.active_group or self.synced_to):
- if parent_player := self.mass.players.get(parent_player_id):
+ if parent_player_id != self.player_id and (
+ parent_player := self.mass.players.get(parent_player_id)
+ ):
return parent_player.current_media
# if a pluginsource is currently active, return those details
if (
from __future__ import annotations
-from enum import Enum
+from enum import IntEnum
from typing import Final
from music_assistant_models.enums import ContentType
DOMAIN = "airplay"
-class StreamingProtocol(Enum):
+class StreamingProtocol(IntEnum):
"""AirPlay streaming protocol versions."""
RAOP = 1 # AirPlay 1 (RAOP)
RAOP_DISCOVERY_TYPE: Final[str] = "_raop._tcp.local."
DACP_DISCOVERY_TYPE: Final[str] = "_dacp._tcp.local."
-AIRPLAY_PRELOAD_SECONDS: Final[int] = (
- 5 # Number of seconds (in PCM) to preload before throttling back
-)
-AIRPLAY_PROCESS_SPAWN_TIME_MS: Final[int] = (
- 200 # Time in ms to allow AirPlay CLI processes to spawn and initialise
-)
AIRPLAY_OUTPUT_BUFFER_DURATION_MS: Final[int] = (
2000 # Read ahead buffer for cliraop. Output buffer duration for cliap2.
)
AIRPLAY2_MIN_LOG_LEVEL: Final[int] = 3 # Min loglevel to ensure stderr output contains what we need
AIRPLAY2_CONNECT_TIME_MS: Final[int] = 2500 # Time in ms to allow AirPlay2 device to connect
+RAOP_CONNECT_TIME_MS: Final[int] = 1000 # Time in ms to allow RAOP device to connect
+
+# Per-protocol credential storage keys
+CONF_RAOP_CREDENTIALS: Final[str] = "raop_credentials"
+CONF_AIRPLAY_CREDENTIALS: Final[str] = "airplay_credentials"
+
+# Legacy credential key (for migration)
CONF_AP_CREDENTIALS: Final[str] = "ap_credentials"
-CONF_MRP_CREDENTIALS: Final[str] = "mrp_credentials"
-CONF_ACTION_START_PAIRING: Final[str] = "start_ap_pairing"
-CONF_ACTION_FINISH_PAIRING: Final[str] = "finish_ap_pairing"
-CONF_ACTION_START_MRP_PAIRING: Final[str] = "start_mrp_pairing"
-CONF_ACTION_FINISH_MRP_PAIRING: Final[str] = "finish_mrp_pairing"
+
+# Pairing action keys
+CONF_ACTION_START_PAIRING: Final[str] = "start_pairing"
+CONF_ACTION_FINISH_PAIRING: Final[str] = "finish_pairing"
+CONF_ACTION_RESET_PAIRING: Final[str] = "reset_pairing"
CONF_PAIRING_PIN: Final[str] = "pairing_pin"
-CONF_MRP_PAIRING_PIN: Final[str] = "mrp_pairing_pin"
CONF_ENABLE_LATE_JOIN: Final[str] = "enable_late_join"
BACKOFF_TIME_LOWER_LIMIT: Final[int] = 15 # seconds
("Sonos", "Move 2"),
("Sonos", "Roam 2"),
("Sonos", "Arc Ultra"),
- # Samsung has been repeatedly being reported as having issues with AirPlay 1/raop
+ # Samsung has been repeatedly being reported as having issues with AirPlay (raop and AP2)
("Samsung", "*"),
)
AIRPLAY_2_DEFAULT_MODELS = (
# Models that are known to work better with AirPlay 2 protocol instead of RAOP
+ # These use the translated/friendly model names from get_model_info()
("Ubiquiti Inc.", "*"),
("Juke Audio", "*"),
)
def is_broken_airplay_model(manufacturer: str, model: str) -> bool:
- """Check if a model is known to have broken RAOP support."""
+ """Check if a model is known to have broken AirPlay support."""
for broken_manufacturer, broken_model in BROKEN_AIRPLAY_MODELS:
if broken_manufacturer in (manufacturer, "*") and broken_model in (model, "*"):
return True
return (ntp_seconds << 32) | ntp_fraction
+def player_id_to_mac_address(player_id: str) -> str:
+ """Convert a player_id to a MAC address-like string."""
+ # the player_id is the mac address prefixed with "ap"
+ hex_str = player_id.replace("ap", "").upper()
+ return ":".join(hex_str[i : i + 2] for i in range(0, 12, 2))
+
+
+def generate_active_remote_id(mac_address: str) -> str:
+ """
+ Generate an Active-Remote ID for DACP communication.
+
+ The Active-Remote ID is used to match DACP callbacks from devices to the
+ correct stream. This function generates a consistent ID based on the
+ player_id (=macaddress, =device id), converted to uint32).
+
+ :return: Active-Remote ID as decimal string.
+ """
+ # Convert MAC address format to uint32
+ # Remove colons: "AA:BB:CC:DD:EE:FF" -> "AABBCCDDEEFF"
+ hex_str = mac_address.replace(":", "").upper()
+ # Parse as uint64 and truncate to uint32 (lower 32 bits)
+ device_id_u64 = int(hex_str, 16)
+ device_id_u32 = device_id_u64 & 0xFFFFFFFF
+ return str(device_id_u32)
+
+
def add_seconds_to_ntp(ntp_timestamp: int, seconds: float) -> int:
"""
Add seconds to an NTP timestamp.
"[libraop (RAOP)](https://github.com/music-assistant/libraop)",
"[OwnTone (AirPlay2)](https://github.com/OwnTone)"
],
- "requirements": [],
+ "requirements": ["srptools>=1.0.0"],
"documentation": "https://music-assistant.io/player-support/airplay/",
"multi_instance": false,
"builtin": false,
--- /dev/null
+"""Native pairing implementations for AirPlay devices.
+
+This module provides pairing support for:
+- AirPlay 2 (HAP - HomeKit Accessory Protocol) - for Apple TV 4+, HomePod, Mac
+- RAOP (AirPlay 1 legacy pairing) - for older devices
+
+Both implementations produce credentials compatible with cliap2/cliraop.
+"""
+
+from __future__ import annotations
+
+import binascii
+import hashlib
+import logging
+import os
+import plistlib
+import uuid
+
+import aiohttp
+from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
+from cryptography.hazmat.primitives.kdf.hkdf import HKDF
+from music_assistant_models.errors import PlayerCommandFailed
+from srptools import SRPClientSession, SRPContext
+
+from .constants import StreamingProtocol
+
+# ============================================================================
+# Common utilities
+# ============================================================================
+
+
+def hkdf_derive(
+ input_key: bytes,
+ salt: bytes,
+ info: bytes,
+ length: int = 32,
+) -> bytes:
+ """Derive key using HKDF-SHA512.
+
+ :param input_key: Input keying material.
+ :param salt: Salt value.
+ :param info: Context info.
+ :param length: Output key length.
+ :return: Derived key bytes.
+ """
+ hkdf = HKDF(
+ algorithm=hashes.SHA512(),
+ length=length,
+ salt=salt,
+ info=info,
+ )
+ return hkdf.derive(input_key)
+
+
+# ============================================================================
+# TLV encoding/decoding for HAP
+# ============================================================================
+
+# TLV types for HAP pairing
+TLV_METHOD = 0x00
+TLV_IDENTIFIER = 0x01
+TLV_SALT = 0x02
+TLV_PUBLIC_KEY = 0x03
+TLV_PROOF = 0x04
+TLV_ENCRYPTED_DATA = 0x05
+TLV_STATE = 0x06
+TLV_ERROR = 0x07
+TLV_SIGNATURE = 0x0A
+
+
+def tlv_encode(items: list[tuple[int, bytes]]) -> bytes:
+ """Encode items into TLV format.
+
+ :param items: List of (type, value) tuples.
+ :return: TLV-encoded bytes.
+ """
+ result = bytearray()
+ for tlv_type, value in items:
+ offset = 0
+ while offset < len(value):
+ chunk = value[offset : offset + 255]
+ result.append(tlv_type)
+ result.append(len(chunk))
+ result.extend(chunk)
+ offset += 255
+ if len(value) == 0:
+ result.append(tlv_type)
+ result.append(0)
+ return bytes(result)
+
+
+def tlv_decode(data: bytes) -> dict[int, bytes]:
+ """Decode TLV format into dictionary.
+
+ :param data: TLV-encoded bytes.
+ :return: Dictionary mapping type to concatenated value.
+ """
+ result: dict[int, bytearray] = {}
+ offset = 0
+ while offset < len(data):
+ tlv_type = data[offset]
+ length = data[offset + 1]
+ value = data[offset + 2 : offset + 2 + length]
+ if tlv_type in result:
+ result[tlv_type].extend(value)
+ else:
+ result[tlv_type] = bytearray(value)
+ offset += 2 + length
+ return {k: bytes(v) for k, v in result.items()}
+
+
+# ============================================================================
+# HAP Pairing constants (for AirPlay 2)
+# ============================================================================
+
+# SRP 3072-bit prime for HAP (hex string format for srptools)
+HAP_SRP_PRIME_3072 = (
+ "FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74"
+ "020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F1437"
+ "4FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED"
+ "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF05"
+ "98DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB"
+ "9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B"
+ "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF695581718"
+ "3995497CEA956AE515D2261898FA051015728E5A8AAAC42DAD33170D04507A33"
+ "A85521ABDF1CBA64ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7"
+ "ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6BF12FFA06D98A0864"
+ "D87602733EC86A64521F2B18177B200CBBE117577A615D6C770988C0BAD946E2"
+ "08E24FA074E5AB3143DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF"
+)
+HAP_SRP_GENERATOR = "5"
+
+
+# ============================================================================
+# RAOP Pairing constants (for AirPlay 1 legacy)
+# ============================================================================
+
+# SRP 2048-bit prime for RAOP (hex string format for srptools)
+RAOP_SRP_PRIME_2048 = (
+ "AC6BDB41324A9A9BF166DE5E1389582FAF72B6651987EE07FC319294"
+ "3DB56050A37329CBB4A099ED8193E0757767A13DD52312AB4B03310D"
+ "CD7F48A9DA04FD50E8083969EDB767B0CF6095179A163AB3661A05FB"
+ "D5FAAAE82918A9962F0B93B855F97993EC975EEAA80D740ADBF4FF74"
+ "7359D041D5C33EA71D281E446B14773BCA97B43A23FB801676BD207A"
+ "436C6481F1D2B9078717461A5B9D32E688F87748544523B524B0D57D"
+ "5EA77A2775D2ECFA032CFBDBF52FB3786160279004E57AE6AF874E73"
+ "03CE53299CCC041C7BC308D82A5698F3A8D0C38271AE35F8E9DBFBB6"
+ "94B5C803D89F7AE435DE236D525F54759B65E372FCD68EF20FA7111F"
+ "9E4AFF73"
+)
+RAOP_SRP_GENERATOR = "02" # RFC5054-2048bit uses generator 2
+
+
+# ============================================================================
+# Base Pairing class
+# ============================================================================
+
+
+class AirPlayPairing:
+ """Base class for AirPlay pairing.
+
+ Handles both HAP (AirPlay 2) and RAOP (AirPlay 1) pairing protocols.
+ """
+
+ def __init__(
+ self,
+ address: str,
+ name: str,
+ protocol: StreamingProtocol,
+ logger: logging.Logger,
+ port: int | None = None,
+ device_id: str | None = None,
+ ) -> None:
+ """Initialize AirPlay pairing.
+
+ :param address: IP address of the device.
+ :param name: Display name of the device.
+ :param protocol: Streaming protocol (RAOP or AIRPLAY2).
+ :param logger: Logger instance.
+ :param port: Port number (default: 7000 for AirPlay 2, 5000 for RAOP).
+ :param device_id: Device identifier (DACP ID) - must match what cliap2 uses.
+ """
+ self.address = address
+ self.name = name
+ self.protocol = protocol
+ self.logger = logger
+ self.port = port or (7000 if protocol == StreamingProtocol.AIRPLAY2 else 5000)
+
+ # HTTP session
+ self._session: aiohttp.ClientSession | None = None
+ self._base_url: str = f"http://{address}:{self.port}"
+
+ # Common state
+ self._is_pairing: bool = False
+ self._srp_context: SRPContext | None = None
+ self._srp_session: SRPClientSession | None = None
+ self._session_key: bytes | None = None
+
+ # Client identifier (device_id) handling depends on protocol:
+ # - HAP (AirPlay 2): Uses DACP ID as string identifier (must match cliap2 pair-verify)
+ # - RAOP: Uses 8 random bytes (not the DACP ID) - credentials are self-contained
+ if protocol == StreamingProtocol.AIRPLAY2:
+ # For HAP, use DACP ID as the identifier (must match pair-verify)
+ if device_id:
+ self._client_id: bytes = device_id.encode()
+ else:
+ self._client_id = str(uuid.uuid4()).encode()
+ else:
+ # For RAOP, generate 8 random bytes for client_id
+ # The credentials format is client_id_hex:auth_secret_hex
+ self._client_id = os.urandom(8)
+
+ # Ed25519 keypair
+ self._client_private_key: Ed25519PrivateKey | None = None
+ self._client_public_key: bytes | None = None
+
+ # Server's public key
+ self._server_public_key: bytes | None = None
+
+ @property
+ def is_pairing(self) -> bool:
+ """Return True if a pairing session is in progress."""
+ return self._is_pairing
+
+ @property
+ def device_provides_pin(self) -> bool:
+ """Return True if the device displays the PIN."""
+ return True # Both HAP and RAOP display PIN on device
+
+ @property
+ def protocol_name(self) -> str:
+ """Return human-readable protocol name."""
+ if self.protocol == StreamingProtocol.RAOP:
+ return "RAOP (AirPlay 1)"
+ return "AirPlay"
+
+ async def start_pairing(self) -> bool:
+ """Start the pairing process.
+
+ :return: True if device provides PIN (always True for AirPlay).
+ :raises PlayerCommandFailed: If device connection fails.
+ """
+ self.logger.info(
+ "Starting %s pairing with %s at %s:%d",
+ self.protocol_name,
+ self.name,
+ self.address,
+ self.port,
+ )
+
+ # Generate Ed25519 keypair
+ self._client_private_key = Ed25519PrivateKey.generate()
+ self._client_public_key = self._client_private_key.public_key().public_bytes(
+ encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw,
+ )
+
+ # Create HTTP session
+ self._session = aiohttp.ClientSession()
+
+ try:
+ # Request PIN to be shown on device
+ async with self._session.post(
+ f"{self._base_url}/pair-pin-start",
+ timeout=aiohttp.ClientTimeout(total=10),
+ ) as resp:
+ if resp.status != 200:
+ raise PlayerCommandFailed(f"Failed to start pairing: HTTP {resp.status}")
+
+ self._is_pairing = True
+ self.logger.info("Device %s is displaying PIN", self.name)
+
+ # SRP context will be created in finish_pairing when we have the PIN
+ return True
+
+ except aiohttp.ClientError as err:
+ await self.close()
+ raise PlayerCommandFailed(f"Connection failed: {err}") from err
+
+ async def finish_pairing(self, pin: str) -> str:
+ """Complete pairing with the provided PIN.
+
+ :param pin: 4-digit PIN from device screen.
+ :return: Credentials string for cliap2/cliraop.
+ :raises PlayerCommandFailed: If pairing fails.
+ """
+ if not self._session:
+ raise PlayerCommandFailed("Pairing not started")
+
+ try:
+ if self.protocol == StreamingProtocol.AIRPLAY2:
+ return await self._finish_hap_pairing(pin)
+ return await self._finish_raop_pairing(pin)
+ except PlayerCommandFailed:
+ raise
+ except Exception as err:
+ self.logger.exception("Pairing failed")
+ raise PlayerCommandFailed(f"Pairing failed: {err}") from err
+ finally:
+ await self.close()
+
+ # ========================================================================
+ # HAP (AirPlay 2) pairing implementation
+ # ========================================================================
+
+ async def _finish_hap_pairing(self, pin: str) -> str:
+ """Complete HAP pairing for AirPlay 2.
+
+ :param pin: 4-digit PIN.
+ :return: Credentials (192 hex chars).
+ """
+ if not self._session:
+ raise PlayerCommandFailed("Pairing not started")
+
+ self.logger.info("Completing HAP pairing with PIN")
+
+ # HAP headers required for pair-setup
+ hap_headers = {
+ "Content-Type": "application/octet-stream",
+ "X-Apple-HKP": "3",
+ }
+
+ # M1: Send method request (state=1, method=0 for pair-setup)
+ m1_data = tlv_encode(
+ [
+ (TLV_METHOD, bytes([0x00])),
+ (TLV_STATE, bytes([0x01])),
+ ]
+ )
+
+ async with self._session.post(
+ f"{self._base_url}/pair-setup",
+ data=m1_data,
+ headers=hap_headers,
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ raise PlayerCommandFailed(f"M1 failed: HTTP {resp.status}")
+ m2_data = await resp.read()
+
+ # Parse M2
+ m2 = tlv_decode(m2_data)
+ if TLV_ERROR in m2:
+ raise PlayerCommandFailed(f"Device error in M2: {m2[TLV_ERROR].hex()}")
+
+ salt = m2.get(TLV_SALT)
+ server_pk_srp = m2.get(TLV_PUBLIC_KEY)
+ if not salt or not server_pk_srp:
+ raise PlayerCommandFailed("Invalid M2: missing salt or public key")
+
+ # M3: SRP authentication - create context with password
+ # PIN is passed directly as string (not "Pair-Setup:PIN")
+ # Note: pyatv doesn't specify bits_random, uses default
+ self._srp_context = SRPContext(
+ username="Pair-Setup",
+ password=pin,
+ prime=HAP_SRP_PRIME_3072,
+ generator=HAP_SRP_GENERATOR,
+ hash_func=hashlib.sha512,
+ )
+ # Pass Ed25519 private key bytes as the SRP "a" value (random private exponent)
+ # This is what pyatv does - use the client's Ed25519 private key as the SRP private value
+ if not self._client_private_key:
+ raise PlayerCommandFailed("Client private key not initialized")
+ auth_private = self._client_private_key.private_bytes(
+ encoding=serialization.Encoding.Raw,
+ format=serialization.PrivateFormat.Raw,
+ encryption_algorithm=serialization.NoEncryption(),
+ )
+ self._srp_session = SRPClientSession(
+ self._srp_context, binascii.hexlify(auth_private).decode()
+ )
+
+ # Process with server's public key and salt (as hex strings)
+ self._srp_session.process(server_pk_srp.hex(), salt.hex())
+
+ # Get client's public key and proof
+ client_pk_srp = bytes.fromhex(self._srp_session.public)
+ client_proof = bytes.fromhex(self._srp_session.key_proof.decode("ascii"))
+
+ m3_data = tlv_encode(
+ [
+ (TLV_STATE, bytes([0x03])),
+ (TLV_PUBLIC_KEY, client_pk_srp),
+ (TLV_PROOF, client_proof),
+ ]
+ )
+
+ async with self._session.post(
+ f"{self._base_url}/pair-setup",
+ data=m3_data,
+ headers=hap_headers,
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ raise PlayerCommandFailed(f"M3 failed: HTTP {resp.status}")
+ m4_data = await resp.read()
+
+ # Parse M4
+ m4 = tlv_decode(m4_data)
+ if TLV_ERROR in m4:
+ raise PlayerCommandFailed(f"Device error in M4: {m4[TLV_ERROR].hex()}")
+
+ server_proof = m4.get(TLV_PROOF)
+ if not server_proof:
+ raise PlayerCommandFailed("Invalid M4: missing proof")
+
+ # Verify server proof
+ if not self._srp_session.verify_proof(server_proof.hex().encode("ascii")):
+ raise PlayerCommandFailed("Server proof verification failed")
+
+ # Get session key
+ self._session_key = bytes.fromhex(self._srp_session.key.decode("ascii"))
+
+ # M5: Send encrypted client info
+ await self._send_hap_m5()
+
+ # Generate credentials
+ return self._generate_hap_credentials()
+
+ async def _send_hap_m5(self) -> None:
+ """Send M5 with encrypted client info and receive M6."""
+ if (
+ not self._session_key
+ or not self._client_private_key
+ or not self._client_public_key
+ or not self._session
+ ):
+ raise PlayerCommandFailed("Invalid state for M5")
+
+ # HAP headers required for pair-setup
+ hap_headers = {
+ "Content-Type": "application/octet-stream",
+ "X-Apple-HKP": "3",
+ }
+
+ # Derive keys
+ enc_key = hkdf_derive(
+ self._session_key,
+ b"Pair-Setup-Encrypt-Salt",
+ b"Pair-Setup-Encrypt-Info",
+ 32,
+ )
+ sign_key = hkdf_derive(
+ self._session_key,
+ b"Pair-Setup-Controller-Sign-Salt",
+ b"Pair-Setup-Controller-Sign-Info",
+ 32,
+ )
+
+ # Sign device info
+ device_info = sign_key + self._client_id + self._client_public_key
+ signature = self._client_private_key.sign(device_info)
+
+ # Create and encrypt inner TLV
+ inner_tlv = tlv_encode(
+ [
+ (TLV_IDENTIFIER, self._client_id),
+ (TLV_PUBLIC_KEY, self._client_public_key),
+ (TLV_SIGNATURE, signature),
+ ]
+ )
+
+ cipher = ChaCha20Poly1305(enc_key)
+ # Nonce format: 4 zero bytes + 8-byte message identifier = 12 bytes
+ nonce = b"\x00\x00\x00\x00PS-Msg05"
+ encrypted = cipher.encrypt(nonce, inner_tlv, None)
+
+ # Send M5
+ m5_data = tlv_encode(
+ [
+ (TLV_STATE, bytes([0x05])),
+ (TLV_ENCRYPTED_DATA, encrypted),
+ ]
+ )
+
+ async with self._session.post(
+ f"{self._base_url}/pair-setup",
+ data=m5_data,
+ headers=hap_headers,
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ raise PlayerCommandFailed(f"M5 failed: HTTP {resp.status}")
+ m6_data = await resp.read()
+
+ # Parse M6
+ m6 = tlv_decode(m6_data)
+ if TLV_ERROR in m6:
+ raise PlayerCommandFailed(f"Device error in M6: {m6[TLV_ERROR].hex()}")
+
+ encrypted_data = m6.get(TLV_ENCRYPTED_DATA)
+ if not encrypted_data:
+ raise PlayerCommandFailed("Invalid M6: missing encrypted data")
+
+ # Decrypt M6
+ # Nonce format: 4 zero bytes + 8-byte message identifier = 12 bytes
+ nonce = b"\x00\x00\x00\x00PS-Msg06"
+ decrypted = cipher.decrypt(nonce, encrypted_data, None)
+
+ # Extract server's public key
+ inner = tlv_decode(decrypted)
+ self._server_public_key = inner.get(TLV_PUBLIC_KEY)
+ if not self._server_public_key:
+ raise PlayerCommandFailed("Invalid M6: missing server public key")
+
+ def _generate_hap_credentials(self) -> str:
+ """Generate HAP credentials for cliap2.
+
+ Format: client_private_key(128 hex) + server_public_key(64 hex) = 192 hex chars
+
+ :return: Credentials string.
+ """
+ if (
+ not self._client_private_key
+ or not self._server_public_key
+ or not self._client_public_key
+ ):
+ raise PlayerCommandFailed("Missing keys for credential generation")
+
+ # Get raw private key (32 bytes seed)
+ private_key_bytes = self._client_private_key.private_bytes(
+ encoding=serialization.Encoding.Raw,
+ format=serialization.PrivateFormat.Raw,
+ encryption_algorithm=serialization.NoEncryption(),
+ )
+
+ # Expand to 64-byte Ed25519 secret key format (seed + public_key)
+ if len(private_key_bytes) == 32:
+ private_key_bytes = private_key_bytes + self._client_public_key
+
+ if len(private_key_bytes) != 64 or len(self._server_public_key) != 32:
+ raise PlayerCommandFailed("Invalid key lengths")
+
+ return binascii.hexlify(private_key_bytes).decode("ascii") + binascii.hexlify(
+ self._server_public_key
+ ).decode("ascii")
+
+ # ========================================================================
+ # RAOP (AirPlay 1 legacy) pairing implementation
+ # ========================================================================
+
+ def _compute_raop_premaster_secret(
+ self,
+ user_id: str,
+ password: str,
+ salt: bytes,
+ client_private: bytes,
+ client_public: bytes,
+ server_public: bytes,
+ ) -> bytes:
+ """Compute RAOP SRP premaster secret S.
+
+ S = (B - k*v)^(a + u*x) mod N
+
+ :param user_id: Username (hex-encoded client_id).
+ :param password: PIN code.
+ :param salt: Salt from server.
+ :param client_private: Client private key (a) as bytes.
+ :param client_public: Client public key (A) as bytes.
+ :param server_public: Server public key (B) as bytes.
+ :return: Premaster secret S as bytes (padded to N length).
+ """
+ # Convert values to integers
+ n_bytes = bytes.fromhex(RAOP_SRP_PRIME_2048)
+ n_len = len(n_bytes)
+ n = int.from_bytes(n_bytes, "big")
+ g = int.from_bytes(bytes.fromhex(RAOP_SRP_GENERATOR), "big")
+
+ a = int.from_bytes(client_private, "big")
+ b_pub = int.from_bytes(server_public, "big")
+
+ # x = H(s | H(I : P))
+ inner_hash = hashlib.sha1(f"{user_id}:{password}".encode()).digest()
+ x = int.from_bytes(hashlib.sha1(salt + inner_hash).digest(), "big")
+
+ # k = H(N | PAD(g))
+ g_padded = bytes.fromhex(RAOP_SRP_GENERATOR).rjust(n_len, b"\x00")
+ k = int.from_bytes(hashlib.sha1(n_bytes + g_padded).digest(), "big")
+
+ # u = H(PAD(A) | PAD(B))
+ a_padded = client_public.rjust(n_len, b"\x00")
+ b_padded = server_public.rjust(n_len, b"\x00")
+ u = int.from_bytes(hashlib.sha1(a_padded + b_padded).digest(), "big")
+
+ # v = g^x mod N
+ v = pow(g, x, n)
+
+ # S = (B - k*v)^(a + u*x) mod N
+ s_int = pow(b_pub - k * v, a + u * x, n)
+
+ # Convert to bytes and pad to N length
+ s_bytes = s_int.to_bytes((s_int.bit_length() + 7) // 8, "big")
+ return s_bytes.rjust(n_len, b"\x00")
+
+ def _compute_raop_session_key(self, premaster_secret: bytes) -> bytes:
+ r"""Compute RAOP session key K from premaster secret S.
+
+ K = SHA1(S | \x00\x00\x00\x00) | SHA1(S | \x00\x00\x00\x01)
+
+ This produces a 40-byte key (two SHA1 hashes concatenated).
+
+ :param premaster_secret: The SRP premaster secret S.
+ :return: 40-byte session key K.
+ """
+ k1 = hashlib.sha1(premaster_secret + b"\x00\x00\x00\x00").digest()
+ k2 = hashlib.sha1(premaster_secret + b"\x00\x00\x00\x01").digest()
+ return k1 + k2
+
+ def _compute_raop_m1(
+ self, user_id: str, salt: bytes, client_pk: bytes, server_pk: bytes, session_key: bytes
+ ) -> bytes:
+ """Compute RAOP SRP M1 proof with padding for A and B (but not g).
+
+ M1 = H(H(N) XOR H(g) | H(I) | s | PAD(A) | PAD(B) | K)
+
+ Note: g is NOT padded, but A and B ARE padded to N length.
+ K is 40 bytes (from _compute_raop_session_key).
+
+ :param user_id: Username (hex-encoded client_id).
+ :param salt: Salt bytes from server.
+ :param client_pk: Client public key (A).
+ :param server_pk: Server public key (B).
+ :param session_key: Session key (K) - 40 bytes.
+ :return: M1 proof bytes (20 bytes for SHA-1).
+ """
+ n_bytes = bytes.fromhex(RAOP_SRP_PRIME_2048)
+ n_len = len(n_bytes)
+ g_bytes = bytes.fromhex(RAOP_SRP_GENERATOR)
+
+ # H(N) XOR H(g) - g is NOT padded
+ h_n = hashlib.sha1(n_bytes).digest()
+ h_g = hashlib.sha1(g_bytes).digest()
+ h_n_xor_h_g = bytes(a ^ b for a, b in zip(h_n, h_g, strict=True))
+
+ # H(I) - hash of username
+ h_i = hashlib.sha1(user_id.encode("ascii")).digest()
+
+ # PAD A and B to N length
+ a_padded = client_pk.rjust(n_len, b"\x00")
+ b_padded = server_pk.rjust(n_len, b"\x00")
+
+ # M1 = H(H(N) XOR H(g) | H(I) | s | PAD(A) | PAD(B) | K)
+ m1_data = h_n_xor_h_g + h_i + salt + a_padded + b_padded + session_key
+ return hashlib.sha1(m1_data).digest()
+
+ def _compute_raop_client_public(self, auth_secret: bytes) -> bytes:
+ """Compute RAOP SRP client public key A = g^a mod N.
+
+ :param auth_secret: 32-byte random secret (used as SRP private key a).
+ :return: Client public key A as bytes.
+ """
+ n_bytes = bytes.fromhex(RAOP_SRP_PRIME_2048)
+ n = int.from_bytes(n_bytes, "big")
+ g = int.from_bytes(bytes.fromhex(RAOP_SRP_GENERATOR), "big")
+ a = int.from_bytes(auth_secret, "big")
+ a_pub = pow(g, a, n)
+ return a_pub.to_bytes((a_pub.bit_length() + 7) // 8, "big")
+
+ async def _finish_raop_pairing(self, pin: str) -> str:
+ """Complete RAOP pairing for AirPlay 1.
+
+ :param pin: 4-digit PIN.
+ :return: Credentials (client_id:auth_secret format).
+ """
+ if not self._session:
+ raise PlayerCommandFailed("Pairing not started")
+
+ self.logger.info("Completing RAOP pairing with PIN")
+
+ # Generate 32-byte auth secret
+ auth_secret = os.urandom(32)
+
+ # Derive Ed25519 public key from auth secret
+ # For RAOP, we use the auth_secret as the Ed25519 seed
+ from cryptography.hazmat.primitives.asymmetric.ed25519 import ( # noqa: PLC0415
+ Ed25519PrivateKey as Ed25519Key,
+ )
+
+ auth_private_key = Ed25519Key.from_private_bytes(auth_secret)
+ auth_public_key = auth_private_key.public_key().public_bytes(
+ encoding=serialization.Encoding.Raw,
+ format=serialization.PublicFormat.Raw,
+ )
+
+ # Step 1: Send device ID and method
+ user_id = self._client_id.hex().upper()
+ step1_plist = {
+ "method": "pin",
+ "user": user_id,
+ }
+
+ async with self._session.post(
+ f"{self._base_url}/pair-setup-pin",
+ data=plistlib.dumps(step1_plist, fmt=plistlib.FMT_BINARY),
+ headers={"Content-Type": "application/x-apple-binary-plist"},
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ raise PlayerCommandFailed(f"RAOP step 1 failed: HTTP {resp.status}")
+ step1_response = plistlib.loads(await resp.read())
+
+ # Get salt and server public key
+ salt, server_pk = step1_response.get("salt"), step1_response.get("pk")
+ if not salt or not server_pk:
+ raise PlayerCommandFailed("Invalid RAOP step 1 response")
+
+ # Step 2: SRP authentication
+ # Apple uses a custom K formula: K = SHA1(S|0000) | SHA1(S|0001) (40 bytes)
+ client_pk = self._compute_raop_client_public(auth_secret)
+ premaster_secret = self._compute_raop_premaster_secret(
+ user_id, pin, salt, auth_secret, client_pk, server_pk
+ )
+ session_key = self._compute_raop_session_key(premaster_secret)
+ client_proof = self._compute_raop_m1(user_id, salt, client_pk, server_pk, session_key)
+
+ step2_plist = {
+ "pk": client_pk,
+ "proof": client_proof,
+ }
+
+ async with self._session.post(
+ f"{self._base_url}/pair-setup-pin",
+ data=plistlib.dumps(step2_plist, fmt=plistlib.FMT_BINARY),
+ headers={"Content-Type": "application/x-apple-binary-plist"},
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ raise PlayerCommandFailed(f"RAOP step 2 failed: HTTP {resp.status}")
+ step2_response = plistlib.loads(await resp.read())
+
+ # Verify server proof M2 exists (verification optional)
+ server_proof = step2_response.get("proof")
+ if not server_proof:
+ raise PlayerCommandFailed("RAOP server did not return proof")
+ self._session_key = session_key
+
+ # Step 3: Encrypt and send auth public key using AES-GCM
+ # Derive AES key and IV from session key K (40 bytes)
+ aes_key = hashlib.sha512(b"Pair-Setup-AES-Key" + session_key).digest()[:16]
+ aes_iv = bytearray(hashlib.sha512(b"Pair-Setup-AES-IV" + session_key).digest()[:16])
+ aes_iv[-1] = (aes_iv[-1] + 1) % 256 # Increment last byte
+
+ # Encrypt auth public key with AES-GCM
+ cipher = Cipher(algorithms.AES(aes_key), modes.GCM(bytes(aes_iv)))
+ encryptor = cipher.encryptor()
+ encrypted_pk = encryptor.update(auth_public_key) + encryptor.finalize()
+ tag = encryptor.tag
+
+ step3_plist = {
+ "epk": encrypted_pk,
+ "authTag": tag,
+ }
+
+ async with self._session.post(
+ f"{self._base_url}/pair-setup-pin",
+ data=plistlib.dumps(step3_plist, fmt=plistlib.FMT_BINARY),
+ headers={"Content-Type": "application/x-apple-binary-plist"},
+ timeout=aiohttp.ClientTimeout(total=30),
+ ) as resp:
+ if resp.status != 200:
+ raise PlayerCommandFailed(f"RAOP step 3 failed: HTTP {resp.status}")
+
+ # Return credentials in cliraop format: client_id:auth_secret
+ return f"{self._client_id.hex()}:{auth_secret.hex()}"
+
+ # ========================================================================
+ # Cleanup
+ # ========================================================================
+
+ async def close(self) -> None:
+ """Clean up resources."""
+ self._is_pairing = False
+ if self._session:
+ await self._session.close()
+ self._session = None
+ self._srp_context = None
+ self._srp_session = None
+ self._session_key = None
from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType
-from propcache import under_cached_property as cached_property
from music_assistant.constants import (
CONF_ENTRY_DEPRECATED_EQ_BASS,
AIRPLAY_FLOW_PCM_FORMAT,
CACHE_CATEGORY_PREV_VOLUME,
CONF_ACTION_FINISH_PAIRING,
+ CONF_ACTION_RESET_PAIRING,
CONF_ACTION_START_PAIRING,
+ CONF_AIRPLAY_CREDENTIALS,
CONF_AIRPLAY_PROTOCOL,
CONF_ALAC_ENCODE,
- CONF_AP_CREDENTIALS,
CONF_ENCRYPTION,
CONF_IGNORE_VOLUME,
CONF_PAIRING_PIN,
CONF_PASSWORD,
+ CONF_RAOP_CREDENTIALS,
FALLBACK_VOLUME,
RAOP_DISCOVERY_TYPE,
StreamingProtocol,
get_primary_ip_address_from_zeroconf,
is_airplay2_preferred_model,
is_broken_airplay_model,
+ player_id_to_mac_address,
)
from .stream_session import AirPlayStreamSession
if TYPE_CHECKING:
from zeroconf.asyncio import AsyncServiceInfo
+ from .pairing import AirPlayPairing
+ from .protocols._protocol import AirPlayProtocol
from .protocols.airplay2 import AirPlay2Stream
from .protocols.raop import RaopStream
from .provider import AirPlayProvider
self.stream: RaopStream | AirPlay2Stream | None = None
self.last_command_sent = 0.0
self._lock = asyncio.Lock()
+ self._active_pairing: AirPlayPairing | None = None
+ self._transitioning = False # Set during stream replacement to ignore stale DACP messages
# Set (static) player attributes
self._attr_type = PlayerType.PLAYER
self._attr_name = display_name
model=model,
manufacturer=manufacturer,
ip_address=address,
+ mac_address=player_id_to_mac_address(player_id),
)
self._attr_supported_features = {
PlayerFeature.PAUSE,
self._attr_can_group_with = {provider.instance_id}
self._attr_enabled_by_default = not is_broken_airplay_model(manufacturer, model)
- @cached_property
+ @property
def protocol(self) -> StreamingProtocol:
- """Get the streaming protocol to use for this player."""
- protocol_value = self.config.get_value(CONF_AIRPLAY_PROTOCOL)
- # Convert integer value to StreamingProtocol enum
- if protocol_value == 2:
- return StreamingProtocol.AIRPLAY2
- return StreamingProtocol.RAOP
+ """Get the streaming protocol to use/prefer for this player."""
+ preferred_option = cast("int", self.config.get_value(CONF_AIRPLAY_PROTOCOL))
+ return self._get_protocol_for_config_value(preferred_option)
@property
def available(self) -> bool:
"""Return if the player is currently available."""
if self._requires_pairing():
- # check if we have credentials stored
- credentials = self.config.get_value(CONF_AP_CREDENTIALS)
- if not credentials:
+ # check if we have credentials stored for the current protocol
+ creds_key = self._get_credentials_key(self.protocol)
+ if not self.config.get_value(creds_key):
return False
return super().available
if not self.stream or not self.stream.session:
return super().corrected_elapsed_time or 0.0
session = self.stream.session
- elapsed = time.time() - session.last_stream_started - session.total_pause_time
+ elapsed = time.time() - session.start_time - session.total_pause_time
if session.last_paused is not None:
current_pause = time.time() - session.last_paused
elapsed -= current_pause
key=CONF_AIRPLAY_PROTOCOL,
type=ConfigEntryType.INTEGER,
required=False,
- label="AirPlay version to use for streaming",
+ label="AirPlay protocol version to use for streaming",
description="AirPlay version 1 protocol uses RAOP.\n"
"AirPlay version 2 is an extension of RAOP.\n"
"Some newer devices do not fully support RAOP and "
- "will only work with AirPlay version 2.",
+ "will only work with AirPlay version 2, "
+ "while older devices may only support RAOP.\n\n"
+ "In most cases the default automatic selection will work fine.",
category="airplay",
options=[
- ConfigValueOption("AirPlay 1 (RAOP)", StreamingProtocol.RAOP.value),
- ConfigValueOption("AirPlay 2", StreamingProtocol.AIRPLAY2.value),
+ ConfigValueOption("Automatically select", 0),
+ ConfigValueOption("Prefer AirPlay 1 (RAOP)", StreamingProtocol.RAOP.value),
+ ConfigValueOption("Prefer AirPlay 2", StreamingProtocol.AIRPLAY2.value),
],
- default_value=StreamingProtocol.AIRPLAY2.value
- if is_airplay2_preferred_model(
- self.device_info.manufacturer, self.device_info.model
- )
- else StreamingProtocol.RAOP.value,
+ default_value=0,
),
ConfigEntry(
key=CONF_ENCRYPTION,
category="airplay",
depends_on=CONF_AIRPLAY_PROTOCOL,
depends_on_value=StreamingProtocol.RAOP.value,
+ hidden=self.protocol != StreamingProtocol.RAOP,
),
ConfigEntry(
key=CONF_ALAC_ENCODE,
category="airplay",
depends_on=CONF_AIRPLAY_PROTOCOL,
depends_on_value=StreamingProtocol.RAOP.value,
+ hidden=self.protocol != StreamingProtocol.RAOP,
),
CONF_ENTRY_SYNC_ADJUST,
ConfigEntry(
"Enable this option to ignore these reports."
),
category="airplay",
+ # TODO: remove depends_on when DACP support is added for AirPlay2
depends_on=CONF_AIRPLAY_PROTOCOL,
depends_on_value=StreamingProtocol.RAOP.value,
+ hidden=self.protocol != StreamingProtocol.RAOP,
),
]
# Mac devices (including iMac, MacBook, Mac mini, Mac Pro, Mac Studio)
return model.startswith(("Mac", "iMac"))
+ def _get_credentials_key(self, protocol: StreamingProtocol) -> str:
+ """Get the config key for credentials for given protocol."""
+ if protocol == StreamingProtocol.RAOP:
+ return CONF_RAOP_CREDENTIALS
+ return CONF_AIRPLAY_CREDENTIALS
+
+ def _get_protocol_for_config_value(self, config_option: int) -> StreamingProtocol:
+ if config_option == StreamingProtocol.AIRPLAY2 and self.airplay_discovery_info:
+ return StreamingProtocol.AIRPLAY2
+ if config_option == StreamingProtocol.RAOP and self.raop_discovery_info:
+ return StreamingProtocol.RAOP
+ # automatic selection
+ if self.airplay_discovery_info and is_airplay2_preferred_model(
+ self.device_info.manufacturer, self.device_info.model
+ ):
+ return StreamingProtocol.AIRPLAY2
+ return StreamingProtocol.RAOP
+
def _get_pairing_config_entries(
self, values: dict[str, ConfigValueType] | None
) -> list[ConfigEntry]:
- """Return pairing config entries for Apple TV and macOS devices.
+ """
+ Return pairing config entries for Apple TV and macOS devices.
- Uses cliraop for AirPlay/RAOP pairing.
+ Uses native pairing for both AirPlay 2 (HAP) and RAOP protocols.
"""
entries: list[ConfigEntry] = []
- # Check if we have credentials stored
- if values and (creds := values.get(CONF_AP_CREDENTIALS)):
- credentials = str(creds)
+ # Determine protocol name for UI
+ conf_protocol: int = 0
+ if values and (val := values.get(CONF_AIRPLAY_PROTOCOL)):
+ conf_protocol = cast("int", val)
else:
- credentials = str(self.config.get_value(CONF_AP_CREDENTIALS) or "")
- has_credentials = bool(credentials)
-
- if not has_credentials:
- # Show pairing instructions and start button
- if not self.stream and self.protocol == StreamingProtocol.RAOP:
- # ensure we have a stream instance to track pairing state
- from .protocols.raop import RaopStream # noqa: PLC0415
-
- self.stream = RaopStream(self)
- elif not self.stream and self.protocol == StreamingProtocol.AIRPLAY2:
- # ensure we have a stream instance to track pairing state
- from .protocols.airplay2 import AirPlay2Stream # noqa: PLC0415
-
- self.stream = AirPlay2Stream(self)
- if self.stream and not self.stream.supports_pairing:
- # TEMP until ap2 pairing is implemented
- return [
- ConfigEntry(
- key="pairing_unsupported",
- type=ConfigEntryType.ALERT,
- label=(
- "This device requires pairing but it is not supported "
- "by the current Music Assistant AirPlay implementation."
- ),
- )
- ]
+ conf_protocol = cast("int", self.config.get_value(CONF_AIRPLAY_PROTOCOL, 0) or 0)
+ protocol = self._get_protocol_for_config_value(conf_protocol)
+ protocol_name = "RAOP" if protocol == StreamingProtocol.RAOP else "AirPlay"
+ protocol_key = (
+ CONF_RAOP_CREDENTIALS
+ if protocol == StreamingProtocol.RAOP
+ else CONF_AIRPLAY_CREDENTIALS
+ )
+ has_creds_for_current_protocol = (
+ values.get(protocol_key) if values else self.config.get_value(protocol_key)
+ )
+ if not has_creds_for_current_protocol:
# If pairing was started, show PIN entry
- if self.stream and self.stream.is_pairing:
+ if self._active_pairing and self._active_pairing.is_pairing:
entries.append(
ConfigEntry(
key=CONF_PAIRING_PIN,
ConfigEntry(
key=CONF_ACTION_FINISH_PAIRING,
type=ConfigEntryType.ACTION,
- label="Complete the pairing process with the PIN",
+ label=f"Complete {protocol_name} pairing with the PIN",
action=CONF_ACTION_FINISH_PAIRING,
)
)
else:
+ # Show pairing instructions and start button
entries.append(
ConfigEntry(
key="pairing_instructions",
type=ConfigEntryType.LABEL,
label=(
- "This device requires pairing before it can be used. "
+ f"This device requires {protocol_name} pairing before it can be used. "
"Click the button below to start the pairing process."
),
)
ConfigEntry(
key=CONF_ACTION_START_PAIRING,
type=ConfigEntryType.ACTION,
- label="Start the AirPlay pairing process",
+ label=f"Start {protocol_name} pairing",
action=CONF_ACTION_START_PAIRING,
)
)
ConfigEntry(
key="pairing_status",
type=ConfigEntryType.LABEL,
- label="Device is paired and ready to use.",
+ label=f"Device is paired ({protocol_name}) and ready to use.",
+ )
+ )
+ # Add reset pairing button
+ entries.append(
+ ConfigEntry(
+ key=CONF_ACTION_RESET_PAIRING,
+ type=ConfigEntryType.ACTION,
+ label=f"Reset {protocol_name} pairing",
+ action=CONF_ACTION_RESET_PAIRING,
)
)
# Store credentials (hidden from UI)
- entries.append(
- ConfigEntry(
- key=CONF_AP_CREDENTIALS,
- type=ConfigEntryType.SECURE_STRING,
- label="AirPlay Credentials",
- default_value=credentials,
- value=credentials,
- required=False,
- hidden=True,
+ for protocol in (StreamingProtocol.RAOP, StreamingProtocol.AIRPLAY2):
+ conf_key = self._get_credentials_key(protocol)
+ entries.append(
+ ConfigEntry(
+ key=conf_key,
+ type=ConfigEntryType.SECURE_STRING,
+ label=conf_key,
+ default_value=None,
+ value=values.get(conf_key) if values else None,
+ required=False,
+ hidden=True,
+ )
)
- )
-
return entries
async def _handle_pairing_action(
self, action: str, values: dict[str, ConfigValueType] | None
) -> None:
- """Handle pairing actions using the configured protocol."""
- if not self.stream and self.protocol == StreamingProtocol.RAOP:
- # ensure we have a stream instance to track pairing state
- from .protocols.raop import RaopStream # noqa: PLC0415
+ """
+ Handle pairing actions.
- self.stream = RaopStream(self)
- elif not self.stream and self.protocol == StreamingProtocol.AIRPLAY2:
- # ensure we have a stream instance to track pairing state
- from .protocols.airplay2 import AirPlay2Stream # noqa: PLC0415
+ Uses native pairing for both AirPlay 2 (HAP) and RAOP protocols.
+ Both produce credentials compatible with cliap2/cliraop respectively.
+ """
+ conf_protocol: int = 0
+ if values and (val := values.get(CONF_AIRPLAY_PROTOCOL)):
+ conf_protocol = cast("int", val)
+ else:
+ conf_protocol = cast("int", self.config.get_value(CONF_AIRPLAY_PROTOCOL, 0) or 0)
+ protocol = self._get_protocol_for_config_value(conf_protocol)
+ protocol_name = "RAOP" if protocol == StreamingProtocol.RAOP else "AirPlay"
- self.stream = AirPlay2Stream(self)
if action == CONF_ACTION_START_PAIRING:
- if self.stream and self.stream.is_pairing:
+ if self._active_pairing and self._active_pairing.is_pairing:
self.logger.warning("Pairing process already in progress for %s", self.display_name)
return
- self.logger.info("Started AirPlay pairing for %s", self.display_name)
- if self.stream:
- await self.stream.start_pairing()
+
+ self.logger.info("Starting %s pairing for %s", protocol_name, self.display_name)
+
+ from .pairing import AirPlayPairing # noqa: PLC0415
+
+ # Determine port based on protocol
+ # Note: For Apple devices, pairing always happens on the AirPlay port (7000)
+ # even when streaming will use RAOP. The RAOP port (5000) is only for streaming.
+ port: int | None = None
+ if self.airplay_discovery_info:
+ port = self.airplay_discovery_info.port or 7000
+ elif self.raop_discovery_info:
+ # Fallback for devices without AirPlay service
+ port = self.raop_discovery_info.port or 5000
+ # Get the DACP ID from the provider - must match what cliap2 uses
+ provider = cast("AirPlayProvider", self.provider)
+ device_id = provider.dacp_id
+
+ self._active_pairing = AirPlayPairing(
+ address=self.address,
+ name=self.display_name,
+ protocol=protocol,
+ logger=self.logger,
+ port=port,
+ device_id=device_id,
+ )
+ await self._active_pairing.start_pairing()
elif action == CONF_ACTION_FINISH_PAIRING:
if not values:
- # guard
return
pin = values.get(CONF_PAIRING_PIN)
self.logger.warning("No PIN provided for pairing")
return
- if self.stream:
- credentials = await self.stream.finish_pairing(pin=str(pin))
- else:
+ if not self._active_pairing:
+ self.logger.warning("No active pairing session for %s", self.display_name)
return
- values[CONF_AP_CREDENTIALS] = credentials
+ credentials = await self._active_pairing.finish_pairing(pin=str(pin))
+ self._active_pairing = None
- self.logger.info(
- "Finished AirPlay pairing for %s",
- self.display_name,
- )
+ # Store credentials with the protocol-specific key
+ cred_key = self._get_credentials_key(protocol)
+ values[cred_key] = credentials
+
+ self.logger.info("Finished %s pairing for %s", protocol_name, self.display_name)
+
+ elif action == CONF_ACTION_RESET_PAIRING:
+ cred_key = self._get_credentials_key(protocol)
+ self.logger.info("Resetting %s pairing for %s", protocol_name, self.display_name)
+ if values is not None:
+ values[cred_key] = None
async def stop(self) -> None:
"""Send STOP command to player."""
"""Send PLAY (unpause) command to player."""
async with self._lock:
if self.stream and self.stream.running:
- await self.stream.send_cli_command("ACTION=PLAY\n")
+ await self.stream.send_cli_command("ACTION=PLAY")
async def pause(self) -> None:
"""Send PAUSE command to player."""
async with self._lock:
if not self.stream or not self.stream.running:
return
- await self.stream.send_cli_command("ACTION=PAUSE\n")
+ await self.stream.send_cli_command("ACTION=PAUSE")
async def play_media(self, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
raise RuntimeError("Player is synced")
self._attr_current_media = media
+ # Always stop any existing stream
+ if self.stream and self.stream.running and self.stream.session:
+ # Set transitioning flag to ignore stale DACP messages (like prevent-playback)
+ self._transitioning = True
+ # Force stop the session (to speed up stopping)
+ await self.stream.session.stop(force=True)
+ self.stream = None
+
# select audio source
audio_source = self.mass.streams.get_stream(media, AIRPLAY_FLOW_PCM_FORMAT)
- # if an existing stream session is running, we could replace it with the new stream
- if self.stream and self.stream.running:
- # check if we need to replace the stream
- if self.stream.prevent_playback:
- # player is in prevent playback mode, we need to stop the stream
- await self.stop()
- elif self.stream.session:
- await self.stream.session.replace_stream(audio_source)
- return
-
# setup StreamSession for player (and its sync childs if any)
sync_clients = self._get_sync_clients()
provider = cast("AirPlayProvider", self.provider)
stream_session = AirPlayStreamSession(provider, sync_clients, AIRPLAY_FLOW_PCM_FORMAT)
await stream_session.start(audio_source)
+ self._transitioning = False
async def volume_set(self, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
if self.stream and self.stream.running:
- await self.stream.send_cli_command(f"VOLUME={volume_level}\n")
+ await self.stream.send_cli_command(f"VOLUME={volume_level}")
self._attr_volume_level = volume_level
self.update_state()
# store last state in cache
self.update_state()
def set_state_from_stream(
- self, state: PlaybackState | None = None, elapsed_time: float | None = None
+ self,
+ state: PlaybackState | None = None,
+ elapsed_time: float | None = None,
+ stream: AirPlayProtocol | None = None,
) -> None:
- """Set the playback state from stream (RAOP or AirPlay2)."""
+ """Set the playback state from stream (RAOP or AirPlay2).
+
+ :param state: New playback state (or None to keep current).
+ :param elapsed_time: New elapsed time (or None to keep current).
+ :param stream: The stream instance sending this update (for validation).
+ """
+ # Ignore state updates from old/stale streams
+ if stream is not None and stream != self.stream:
+ return
+
if state is not None:
prev_state = self._attr_playback_state
self._attr_playback_state = state
if self.stream.running and self.stream.session:
self.mass.create_task(self.stream.session.stop())
self.stream = None
+ if self._active_pairing:
+ await self._active_pairing.close()
+ self._active_pairing = None
def _get_sync_clients(self) -> list[AirPlayPlayer]:
"""Get all sync clients for a player."""
from __future__ import annotations
+import asyncio
import time
from abc import ABC, abstractmethod
-from random import randint
from typing import TYPE_CHECKING
from music_assistant_models.enums import PlaybackState
-from music_assistant.constants import VERBOSE_LOG_LEVEL
from music_assistant.helpers.named_pipe import AsyncNamedPipeWriter
from music_assistant.providers.airplay.constants import AIRPLAY_PCM_FORMAT
+from music_assistant.providers.airplay.helpers import generate_active_remote_id
if TYPE_CHECKING:
from music_assistant_models.player import PlayerMedia
# the pcm audio format used for streaming to this protocol
pcm_format = AIRPLAY_PCM_FORMAT
- supports_pairing = False # whether this protocol supports pairing
- is_pairing: bool = False # whether this protocol instance is in pairing mode
def __init__(
self,
self.mass = player.provider.mass
self.player = player
self.logger = player.provider.logger.getChild(f"protocol.{self.__class__.__name__}")
- # Generate unique ID to prevent race conditions with named pipes
- self.active_remote_id: str = str(randint(1000, 8000))
+ mac_address = self.player.device_info.mac_address or self.player.player_id
+ self.active_remote_id: str = generate_active_remote_id(mac_address)
self.prevent_playback: bool = False
self._cli_proc: AsyncProcess | None = None
- self.audio_pipe = AsyncNamedPipeWriter(
- f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio", # noqa: S108
- )
self.commands_pipe = AsyncNamedPipeWriter(
f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd", # noqa: S108
)
- # State tracking
self._stopped = False
self._total_bytes_sent = 0
self._stream_bytes_sent = 0
+ self._connected = asyncio.Event()
@property
def running(self) -> bool:
@abstractmethod
async def start(self, start_ntp: int) -> None:
- """Initialize streaming process for the player.
+ """Start the CLI process.
- Args:
- start_ntp: NTP timestamp to start streaming
+ :param start_ntp: NTP timestamp to start streaming.
"""
- async def stop(self) -> None:
- """Stop playback and cleanup."""
- # Send stop command before setting _stopped flag
- await self.send_cli_command("ACTION=STOP")
- self._stopped = True
-
- # Close the CLI process (wait for it to terminate)
- if self._cli_proc and not self._cli_proc.closed:
- await self._cli_proc.close()
+ async def wait_for_connection(self) -> None:
+ """Wait for device connection to be established."""
+ if not self._cli_proc:
+ return
+ await asyncio.wait_for(self._connected.wait(), timeout=10)
+ # repeat sending the volume level to the player because some players seem
+ # to ignore it the first time
+ # https://github.com/music-assistant/support/issues/3330
+ self.mass.call_later(2, self.send_cli_command(f"VOLUME={self.player.volume_level}"))
- self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
+ async def stop(self, force: bool = False) -> None:
+ """
+ Stop playback and cleanup.
- # Cleanup named pipes
- await self.audio_pipe.remove()
+ :param force: If True, immediately kill the process without graceful shutdown.
+ """
+ # always send stop command first
+ await self.send_cli_command("ACTION=STOP")
+ if self._cli_proc:
+ await self._cli_proc.write_eof()
+ self._stopped = True
await self.commands_pipe.remove()
-
- async def start_pairing(self) -> None:
- """Start pairing process for this protocol (if supported)."""
- raise NotImplementedError("Pairing not implemented for this protocol")
-
- async def finish_pairing(self, pin: str) -> str:
- """Finish pairing process with given PIN (if supported)."""
- raise NotImplementedError("Pairing not implemented for this protocol")
+ if force:
+ if self._cli_proc and not self._cli_proc.closed:
+ await self._cli_proc.kill()
+ elif self._cli_proc and not self._cli_proc.closed:
+ await self._cli_proc.close()
+ if not force:
+ self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLI binary."""
return
if not self.commands_pipe:
return
-
- self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
self.player.last_command_sent = time.time()
+ if not command.endswith("\n"):
+ command += "\n"
await self.commands_pipe.write(command.encode("utf-8"))
async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
await self.send_cli_command(cmd)
# get image
if metadata.image_url:
- await self.send_cli_command(f"ARTWORK={metadata.image_url}\n")
+ await self.send_cli_command(f"ARTWORK={metadata.image_url}")
if progress is not None:
- await self.send_cli_command(f"PROGRESS={progress}\n")
+ await self.send_cli_command(f"PROGRESS={progress}")
import asyncio
import logging
+from typing import TYPE_CHECKING, cast
from music_assistant_models.enums import PlaybackState
-from music_assistant_models.errors import PlayerCommandFailed
from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
from music_assistant.helpers.process import AsyncProcess
from music_assistant.providers.airplay.constants import (
AIRPLAY2_MIN_LOG_LEVEL,
+ CONF_AIRPLAY_CREDENTIALS,
)
from music_assistant.providers.airplay.helpers import get_cli_binary
from ._protocol import AirPlayProtocol
+if TYPE_CHECKING:
+ from music_assistant.providers.airplay.provider import AirPlayProvider
+
class AirPlay2Stream(AirPlayProtocol):
"""
AirPlay 2 Audio Streamer.
- Python is not suitable for realtime audio streaming so we do the actual streaming
- of audio using a small executable written in C based on owntones to do
- the actual timestamped playback. It reads pcm audio from a named pipe
- and we can send some interactive commands using another named pipe.
+ Uses cliap2 (C executable based on owntone) for timestamped playback.
+ Audio is fed via stdin, commands via a named pipe.
"""
@property
mass_level = 5
return max(mass_level, AIRPLAY2_MIN_LOG_LEVEL)
- async def start(self, start_ntp: int, skip: int = 0) -> None:
- """Initialize CLI process for a player."""
+ async def start(self, start_ntp: int) -> None:
+ """Start cliap2 process."""
cli_binary = await get_cli_binary(self.player.protocol)
assert self.player.airplay_discovery_info is not None
for key, value in self.player.airplay_discovery_info.decoded_properties.items():
txt_kv += f'"{key}={value}" '
- # Note: skip parameter is accepted for API compatibility with base class
- # but is not currently used by the cliap2 binary (AirPlay2 handles late joiners differently)
-
# cliap2 is the binary that handles the actual streaming to the player
# this binary leverages from the AirPlay2 support in owntones
# https://github.com/music-assistant/cliairplay
+ # Get AirPlay credentials if available (for Apple devices that require pairing)
+ airplay_credentials: str | None = None
+ if creds := self.player.config.get_value(CONF_AIRPLAY_CREDENTIALS):
+ airplay_credentials = str(creds)
+
+ # Get the provider's DACP ID for remote control callbacks
+ prov = cast("AirPlayProvider", self.prov)
+
cli_args = [
cli_binary,
"--name",
str(self.player.volume_level),
"--loglevel",
str(self._cli_loglevel),
+ "--dacp_id",
+ prov.dacp_id,
+ "--active_remote",
+ self.active_remote_id,
"--pipe",
- self.audio_pipe.path,
+ "-", # Use stdin for audio input
"--command_pipe",
self.commands_pipe.path,
]
+ # Add credentials for authenticated AirPlay devices (Apple TV, HomePod, etc.)
+ # Native HAP pairing format: 192 hex chars = client_private_key(128) + server_public_key(64)
+ if airplay_credentials:
+ if len(airplay_credentials) == 192:
+ cli_args += ["--auth", airplay_credentials]
+ else:
+ self.player.logger.warning(
+ "Invalid credentials length: %d (expected 192)",
+ len(airplay_credentials),
+ )
+
self.player.logger.debug(
"Starting cliap2 process for player %s with args: %s",
player_id,
cli_args,
)
- self._cli_proc = AsyncProcess(cli_args, stdin=False, stderr=True, name="cliap2")
+ self._cli_proc = AsyncProcess(cli_args, stdin=True, stderr=True, name="cliap2")
await self._cli_proc.start()
- # read up to first num_lines lines of stderr to get the initial status
- num_lines: int = 50
- if self.prov.logger.level > logging.INFO:
- num_lines *= 10
- for _ in range(num_lines):
- line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore").strip()
- self.player.logger.debug(line)
- if f"airplay: Adding AirPlay device '{self.player.display_name}'" in line:
- self.player.logger.info("AirPlay device connected. Starting playback.")
- break
- if f"The AirPlay 2 device '{self.player.display_name}' failed" in line:
- raise PlayerCommandFailed("Cannot connect to AirPlay device")
# start reading the stderr of the cliap2 process from another task
self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))
if not self._cli_proc:
return
async for line in self._cli_proc.iter_stderr():
+ if self._stopped:
+ break
+ if "player: event_play_start()" in line:
+ # successfully connected
+ self._connected.set()
if "Pause at" in line:
- player.set_state_from_stream(state=PlaybackState.PAUSED)
- if "Restarted at" in line:
- player.set_state_from_stream(state=PlaybackState.PLAYING)
- if "Starting at" in line:
+ player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
+ elif "Restarted at" in line:
+ player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
+ elif "Starting at" in line:
# streaming has started
- player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0)
+ player.set_state_from_stream(
+ state=PlaybackState.PLAYING, elapsed_time=0, stream=self
+ )
if "put delay detected" in line:
if "resetting all outputs" in line:
logger.error("High packet loss detected, restarting playback...")
logger.info(line)
elif "[ WARN]" in line:
logger.warning(line)
+ elif "[DEBUG]" in line and "mass_timer_cb" in line:
+ # mass_timer_cb is very spammy, reduce it to verbose
+ logger.log(VERBOSE_LOG_LEVEL, line)
elif "[DEBUG]" in line:
logger.debug(line)
elif "[ SPAM]" in line:
# ensure we're cleaned up afterwards (this also logs the returncode)
if not self._stopped:
self._stopped = True
- self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
+ self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
from typing import TYPE_CHECKING, cast
from music_assistant_models.enums import PlaybackState
-from music_assistant_models.errors import PlayerCommandFailed
from music_assistant.constants import VERBOSE_LOG_LEVEL
from music_assistant.helpers.process import AsyncProcess
from music_assistant.providers.airplay.constants import (
AIRPLAY_OUTPUT_BUFFER_DURATION_MS,
CONF_ALAC_ENCODE,
- CONF_AP_CREDENTIALS,
CONF_ENCRYPTION,
CONF_PASSWORD,
+ CONF_RAOP_CREDENTIALS,
)
from music_assistant.providers.airplay.helpers import get_cli_binary
and we can send some interactive commands using a named pipe.
"""
- supports_pairing = True
-
async def start(self, start_ntp: int) -> None:
- """Initialize CLIRaop process for a player."""
+ """Start CLIRaop process."""
assert self.player.raop_discovery_info is not None # for type checker
cli_binary = await get_cli_binary(self.player.protocol)
extra_args: list[str] = []
player_id, CONF_PASSWORD
):
extra_args += ["-password", str(device_password)]
- # Add AirPlay credentials from pairing if available (for Apple devices)
- if ap_credentials := self.player.config.get_value(CONF_AP_CREDENTIALS):
- extra_args += ["-secret", str(ap_credentials)]
+ # Add RAOP credentials from pairing if available (for Apple devices)
+ if raop_credentials := self.player.config.get_value(CONF_RAOP_CREDENTIALS):
+ # Credentials format is "client_id:auth_secret", cliraop expects just auth_secret
+ creds_str = str(raop_credentials)
+ auth_secret = creds_str.split(":", 1)[1] if ":" in creds_str else creds_str
+ extra_args += ["-secret", auth_secret]
if self.prov.logger.isEnabledFor(logging.DEBUG):
extra_args += ["-debug", "5"]
elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
extra_args += ["-debug", "10"]
- # cliraop is the binary that handles the actual raop streaming to the player
- # this is a slightly modified version of philippe44's libraop
- # https://github.com/music-assistant/libraop
- # we use this intermediate binary to do the actual streaming because attempts to do
- # so using pure python (e.g. pyatv) were not successful due to the realtime nature
-
cliraop_args = [
cli_binary,
"-ntpstart",
"-udn",
self.player.raop_discovery_info.name,
self.player.address,
- self.audio_pipe.path,
+ "-", # Use stdin for audio input
]
self.player.logger.debug(
"Starting cliraop process for player %s with args: %s",
)
self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
await self._cli_proc.start()
-
- # read up to first 50 lines of stderr to get the initial status
- for _ in range(50):
- line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
- self.player.logger.debug(line)
- if "connected to " in line:
- self.player.logger.info("AirPlay device connected. Starting playback.")
- break
- if "Cannot connect to AirPlay device" in line:
- raise PlayerCommandFailed("Cannot connect to AirPlay device")
-
- # start reading the stderr of the cliraop process from another task
+ # start reading the stderr of the cliap2 process from another task
self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))
- # repeat sending the volume level to the player because some players seem
- # to ignore it the first time
- # https://github.com/music-assistant/support/issues/3330
- self.mass.call_later(1, self.send_cli_command(f"VOLUME={self.player.volume_level}\n"))
-
- async def start_pairing(self) -> None:
- """Start pairing process for this protocol (if supported)."""
- assert self.player.raop_discovery_info is not None # for type checker
- cli_binary = await get_cli_binary(self.player.protocol)
-
- cliraop_args = [
- cli_binary,
- "-pair",
- "-if",
- self.mass.streams.bind_ip,
- "-port",
- str(self.player.raop_discovery_info.port),
- "-udn",
- self.player.raop_discovery_info.name,
- self.player.address,
- ]
- self.player.logger.debug(
- "Starting PAIRING with cliraop process for player %s with args: %s",
- self.player.player_id,
- cliraop_args,
- )
- self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
- await self._cli_proc.start()
- # read up to first 10 lines of stderr to get the initial status
- for _ in range(10):
- line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
- self.player.logger.debug(line)
- if "enter PIN code displayed on " in line:
- self.is_pairing = True
- return
- await self._cli_proc.close()
- raise PlayerCommandFailed("Pairing failed")
-
- async def finish_pairing(self, pin: str) -> str:
- """Finish pairing process with given PIN (if supported)."""
- if not self.is_pairing:
- await self.start_pairing()
- if not self._cli_proc or self._cli_proc.closed:
- raise PlayerCommandFailed("Pairing process not started")
-
- self.is_pairing = False
- _, _stderr = await self._cli_proc.communicate(input=f"{pin}\n".encode(), timeout=10)
- for line in _stderr.decode().splitlines():
- self.player.logger.debug(line)
- for error in ("device did not respond", "can't authentify", "pin failed"):
- if error in line.lower():
- raise PlayerCommandFailed(f"Pairing failed: {error}")
- if "secret is " in line:
- return line.split("secret is ")[1].strip()
- raise PlayerCommandFailed(f"Pairing failed: {_stderr.decode().strip()}")
async def _stderr_reader(self) -> None:
"""Monitor stderr for the running CLIRaop process."""
if not self._cli_proc:
return
async for line in self._cli_proc.iter_stderr():
+ if self._stopped:
+ break
+ if "connected to " in line:
+ self._connected.set()
+ # successfully connected - playback will/can start
if "set pause" in line or "Pause at" in line:
- player.set_state_from_stream(state=PlaybackState.PAUSED)
- if "Restarted at" in line or "restarting w/ pause" in line:
- player.set_state_from_stream(state=PlaybackState.PLAYING)
- if "restarting w/o pause" in line:
+ player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
+ elif "Restarted at" in line or "restarting w/ pause" in line:
+ player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
+ elif "restarting w/o pause" in line:
# streaming has started
- player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0)
+ player.set_state_from_stream(
+ state=PlaybackState.PLAYING, elapsed_time=0, stream=self
+ )
if "lost packet out of backlog" in line:
lost_packets += 1
if lost_packets == 100:
logger.debug("CLIRaop stderr reader ended")
if not self._stopped:
self._stopped = True
- self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
+ self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
import asyncio
import socket
-from random import randrange
from typing import cast
from music_assistant_models.enums import PlaybackState
"""Handle async initialization of the provider."""
# register DACP zeroconf service
dacp_port = await select_free_port(39831, 49831)
- self.dacp_id = dacp_id = f"{randrange(2**64):X}"
+ # Use first 16 hex chars of server_id as a persistent DACP ID
+ # This ensures the DACP ID remains the same across restarts, which is required
+ # for AirPlay 2 (HAP) pair-verify to work with previously paired devices
+ self.dacp_id = dacp_id = self.mass.server_id[:16].upper()
self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
self._dacp_server = await asyncio.start_server(
self._handle_dacp_request, "0.0.0.0", dacp_port
player.update_volume_from_device(volume)
elif "device-prevent-playback=1" in path:
# device switched to another source (or is powered off)
- if stream := player.stream:
+ # Ignore during stream transition (stale message from old CLI process)
+ if player._transitioning:
+ self.logger.debug("Ignoring prevent-playback during stream transition")
+ elif stream := player.stream:
stream.prevent_playback = True
if stream.session:
self.mass.create_task(stream.session.remove_client(player))
from contextlib import suppress
from typing import TYPE_CHECKING
+from music_assistant_models.errors import PlayerCommandFailed
+
from music_assistant.constants import CONF_SYNC_ADJUST
from music_assistant.helpers.audio import get_player_filter_params
from music_assistant.helpers.ffmpeg import FFMpeg
-from music_assistant.helpers.util import TaskManager
from music_assistant.providers.airplay.helpers import ntp_to_unix_time, unix_time_to_ntp
from .constants import (
AIRPLAY2_CONNECT_TIME_MS,
- AIRPLAY_OUTPUT_BUFFER_DURATION_MS,
- AIRPLAY_PRELOAD_SECONDS,
- AIRPLAY_PROCESS_SPAWN_TIME_MS,
CONF_ENABLE_LATE_JOIN,
ENABLE_LATE_JOIN_DEFAULT,
+ RAOP_CONNECT_TIME_MS,
StreamingProtocol,
)
from .protocols.airplay2 import AirPlay2Stream
) -> None:
"""Initialize AirPlayStreamSession.
- Args:
- airplay_provider: The AirPlay provider instance
- sync_clients: List of AirPlay players to stream to
- pcm_format: PCM format of the input stream
- audio_source: Async generator yielding audio chunks
+ :param airplay_provider: The AirPlay provider instance.
+ :param sync_clients: List of AirPlay players to stream to.
+ :param pcm_format: PCM format of the input stream.
"""
assert sync_clients
self.prov = airplay_provider
self._lock = asyncio.Lock()
self.start_ntp: int = 0
self.start_time: float = 0.0
- self.wait_start: float = 0.0 # in seconds
- self.seconds_streamed: float = 0 # Total seconds sent to session
- # because we reuse an existing stream session for new play_media requests,
- # we need to track when the last stream was started
- self.last_stream_started: float = 0.0
+ self.wait_start: float = 0.0
+ self.seconds_streamed: float = 0
self.total_pause_time: float = 0.0
self.last_paused: float | None = None
- self._clients_ready = asyncio.Event()
self._first_chunk_received = asyncio.Event()
async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Initialize stream session for all players."""
- self.prov.logger.debug(
- "Starting stream session with %d clients",
- len(self.sync_clients),
- )
- # Prepare all clients
- # this will create the stream objects, named pipes and start ffmpeg
- async with TaskManager(self.mass) as tm:
- for _airplay_player in self.sync_clients:
- tm.create_task(self._prepare_client(_airplay_player))
-
- # Start audio source streamer task
- # this will read from the audio source and distribute
- # to all player-specific ffmpeg processes
- # we start this task early because some streams (especially radio)
- # may need more time to buffer - this way we ensure we have audio ready
- # when the players should start playing
- self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
- # wait until the first chunk is received before starting clients
- await self._first_chunk_received.wait()
-
- # Start all clients
- # Get current NTP timestamp and calculate wait time
cur_time = time.time()
- # AirPlay2 clients need around 2500ms to establish connection and start playback
- # The also have a fixed 2000ms output buffer. We will not be able to respect the
- # ntpstart time unless we cater for all these time delays.
- # RAOP clients need less due to less RTSP exchanges and different packet buffer
- # handling
- # Plus we need to cater for process spawn and initialisation time
- wait_start = (
- AIRPLAY2_CONNECT_TIME_MS
- + AIRPLAY_OUTPUT_BUFFER_DURATION_MS
- + AIRPLAY_PROCESS_SPAWN_TIME_MS
- + (250 * len(self.sync_clients))
- ) # in milliseconds
+ has_airplay2_client = any(
+ p.protocol == StreamingProtocol.AIRPLAY2 for p in self.sync_clients
+ )
+ wait_start = AIRPLAY2_CONNECT_TIME_MS if has_airplay2_client else RAOP_CONNECT_TIME_MS
wait_start_seconds = wait_start / 1000
- self.wait_start = wait_start_seconds # in seconds
+ self.wait_start = wait_start_seconds
self.start_time = cur_time + wait_start_seconds
- self.last_stream_started = self.start_time
self.start_ntp = unix_time_to_ntp(self.start_time)
+ await asyncio.gather(*[self._start_client(p, self.start_ntp) for p in self.sync_clients])
+ self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
+ try:
+ await asyncio.gather(
+ *[p.stream.wait_for_connection() for p in self.sync_clients if p.stream]
+ )
+ except Exception:
+ # playback failed to start, cleanup
+ await self.stop()
+ raise PlayerCommandFailed("Playback failed to start")
- async with TaskManager(self.mass) as tm:
- for _airplay_player in self.sync_clients:
- tm.create_task(self._start_client(_airplay_player, self.start_ntp))
-
- # All clients started
- self._clients_ready.set()
-
- async def stop(self) -> None:
+ async def stop(self, force: bool = False) -> None:
"""Stop playback and cleanup."""
- await asyncio.gather(
- *[self.remove_client(x) for x in self.sync_clients],
- return_exceptions=True,
- )
if self._audio_source_task and not self._audio_source_task.done():
self._audio_source_task.cancel()
with suppress(asyncio.CancelledError):
await self._audio_source_task
+ if force:
+ await asyncio.gather(
+ *[self.stop_client(x, force=True) for x in self.sync_clients],
+ )
+ self.sync_clients = []
+ else:
+ await asyncio.gather(
+ *[self.remove_client(x) for x in self.sync_clients],
+ )
async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
"""Remove a sync client from the session."""
if airplay_player not in self.sync_clients:
return
self.sync_clients.remove(airplay_player)
- if airplay_player.stream and airplay_player.stream.session == self:
- await airplay_player.stream.stop()
- if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
- await ffmpeg.close()
+ await self.stop_client(airplay_player)
# If this was the last client, stop the session
if not self.sync_clients:
await self.stop()
return
+ async def stop_client(self, airplay_player: AirPlayPlayer, force: bool = False) -> None:
+ """
+ Stop a client's stream and ffmpeg.
+
+ :param airplay_player: The player to stop.
+ :param force: If True, kill CLI process immediately.
+ """
+ ffmpeg = self._player_ffmpeg.pop(airplay_player.player_id, None)
+ if force:
+ if ffmpeg and not ffmpeg.closed:
+ await ffmpeg.kill()
+ if airplay_player.stream and airplay_player.stream.session == self:
+ await airplay_player.stream.stop(force=True)
+ else:
+ if ffmpeg and not ffmpeg.closed:
+ await ffmpeg.close()
+ if airplay_player.stream and airplay_player.stream.session == self:
+ await airplay_player.stream.stop()
+
async def add_client(self, airplay_player: AirPlayPlayer) -> None:
"""Add a sync client to the session as a late joiner.
CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
)
if not allow_late_join:
- # Late joining is not allowed - restart the session for all players
- await self.stop() # we need to stop the current session to add a new client
- # this could potentially be called by multiple players at the exact same time
- # so we debounce the resync a bit here with a timer
+ await self.stop()
if sync_leader.current_media:
self.mass.call_later(
0.5,
)
return
- # Prepare the new client for streaming
- await self._prepare_client(airplay_player)
-
- # Snapshot seconds_streamed inside lock to prevent race conditions
- # Keep lock held during stream.start() to ensure player doesn't miss any chunks
async with self._lock:
- # Calculate skip_seconds based on how many chunks have been sent
skip_seconds = self.seconds_streamed
- # Start the stream at compensated NTP timestamp
start_at = self.start_time + skip_seconds
start_ntp = unix_time_to_ntp(start_at)
- self.prov.logger.debug(
- "Adding late joiner %s to session, playback starts %.3fs from now",
- airplay_player.player_id,
- start_at - time.time(),
- )
- # Add player to sync clients list
if airplay_player not in self.sync_clients:
self.sync_clients.append(airplay_player)
await self._start_client(airplay_player, start_ntp)
-
- async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
- """Replace the audio source of the stream."""
- self._first_chunk_received.clear()
- new_audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
- await self._first_chunk_received.wait()
- async with self._lock:
- # Cancel the current audio source task
- assert self._audio_source_task # for type checker
- old_audio_source_task = self._audio_source_task
- old_audio_source_task.cancel()
- self._audio_source_task = new_audio_source_task
- self.last_stream_started = time.time() + self.wait_start
- self.total_pause_time = 0.0
- self.last_paused = None
- for sync_client in self.sync_clients:
- sync_client.set_state_from_stream(state=None, elapsed_time=0)
- # ensure we cleanly wait for the old audio source task to finish
- with suppress(asyncio.CancelledError):
- await old_audio_source_task
+ if airplay_player.stream:
+ await airplay_player.stream.wait_for_connection()
async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
"""Stream audio to all players."""
pcm_sample_size = self.pcm_format.pcm_sample_size
- stream_start_time = time.time()
- first_chunk_received = False
- # each chunk is exactly one second of audio data based on the pcm format.
- async for chunk in audio_source:
- if first_chunk_received is False:
- first_chunk_received = True
- self.prov.logger.debug(
- "First audio chunk received after %.3fs",
- time.time() - stream_start_time,
- )
- self._first_chunk_received.set()
- # Wait until all clients are ready
- await self._clients_ready.wait()
- # Send chunk to all players
- async with self._lock:
- sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
- if not sync_clients:
- self.prov.logger.debug(
- "Audio streamer exiting: No running clients left in session"
- )
- return
-
- # Write chunk to all players
- write_tasks = [
- self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream
- ]
- results = await asyncio.gather(*write_tasks, return_exceptions=True)
-
- # Check for write errors or timeouts
- players_to_remove: list[AirPlayPlayer] = []
- for i, result in enumerate(results):
- if i >= len(sync_clients):
- continue
- player = sync_clients[i]
-
- if isinstance(result, asyncio.TimeoutError):
- self.prov.logger.warning(
- "Removing player %s from session: stopped reading data (write timeout)",
- player.player_id,
- )
- players_to_remove.append(player)
- elif isinstance(result, Exception):
- self.prov.logger.warning(
- "Removing player %s from session due to write error: %s",
- player.player_id,
- result,
- )
- players_to_remove.append(player)
-
- # Remove failed/timed-out players from sync group
- for player in players_to_remove:
- self.mass.create_task(self.remove_client(player))
-
- # Update chunk counter (each chunk is exactly one second of audio)
- chunk_seconds = len(chunk) / pcm_sample_size
- self.seconds_streamed += chunk_seconds
-
- # Entire stream consumed: send EOF
- self.prov.logger.debug("Audio source stream exhausted")
+ watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
+ try:
+ async for chunk in audio_source:
+ if not self._first_chunk_received.is_set():
+ watchdog_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await watchdog_task
+ self._first_chunk_received.set()
+
+ if not self.sync_clients:
+ break
+
+ await self._write_chunk_to_all_players(chunk)
+ self.seconds_streamed += len(chunk) / pcm_sample_size
+ finally:
+ if not watchdog_task.done():
+ watchdog_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await watchdog_task
async with self._lock:
await asyncio.gather(
*[
return_exceptions=True,
)
- async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
- """
- Write audio chunk to a specific player.
+ async def _silence_watchdog(self, pcm_sample_size: int) -> None:
+ """Insert silence if audio source is slow to deliver first chunk."""
+ grace_period = 0.2
+ max_silence_padding = 5.0
+ silence_inserted = 0.0
+
+ await asyncio.sleep(grace_period)
+ while not self._first_chunk_received.is_set() and silence_inserted < max_silence_padding:
+ silence_duration = 0.1
+ silence_bytes = int(pcm_sample_size * silence_duration)
+ silence_chunk = bytes(silence_bytes)
+ await self._write_chunk_to_all_players(silence_chunk)
+ self.seconds_streamed += silence_duration
+ silence_inserted += silence_duration
+ await asyncio.sleep(0.05)
+
+ if silence_inserted > 0:
+ self.prov.logger.warning(
+ "Inserted %.1fs silence padding while waiting for audio source",
+ silence_inserted,
+ )
- each chunk is (in general) one second of audio data based on the pcm format.
- For late joiners, compensates for chunks sent between join time and actual chunk delivery.
- Blocks (async) until the data has been written.
- """
+ async def _write_chunk_to_all_players(self, chunk: bytes) -> None:
+ """Write a chunk to all connected players."""
+ async with self._lock:
+ sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
+ if not sync_clients:
+ return
+
+ # Write chunk to all players
+ write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream]
+ results = await asyncio.gather(*write_tasks, return_exceptions=True)
+
+ # Check for write errors or timeouts
+ players_to_remove: list[AirPlayPlayer] = []
+ for i, result in enumerate(results):
+ if i >= len(sync_clients):
+ continue
+ player = sync_clients[i]
+
+ if isinstance(result, TimeoutError):
+ self.prov.logger.warning(
+ "Removing player %s from session: stopped reading data (write timeout)",
+ player.player_id,
+ )
+ players_to_remove.append(player)
+ elif isinstance(result, Exception):
+ self.prov.logger.warning(
+ "Removing player %s from session due to write error: %s",
+ player.player_id,
+ result,
+ )
+ players_to_remove.append(player)
+
+ for player in players_to_remove:
+ self.mass.create_task(self.remove_client(player))
+
+ async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
+ """Write audio chunk to a player's ffmpeg process."""
player_id = airplay_player.player_id
- # we write the chunk to the player's ffmpeg process which
- # applies any player-specific filters (e.g. volume, dsp, etc)
- # and outputs in the correct format for the player stream
- # to the named pipe associated with the player's stream
if ffmpeg := self._player_ffmpeg.get(player_id):
if ffmpeg.closed:
return
- # Use a 35 second timeout - if the write takes longer, the player
- # has stopped reading data and we're in a deadlock situation
- # 35 seconds is a little bit above out pause timeout (30s) to allow for some margin
await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)
async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
"""Write EOF to a specific player."""
- # cleanup any associated FFMpeg instance first
if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
await ffmpeg.write_eof()
await ffmpeg.wait_with_timeout(30)
- del ffmpeg
+ if airplay_player.stream and airplay_player.stream._cli_proc:
+ await airplay_player.stream._cli_proc.write_eof()
- async def _prepare_client(self, airplay_player: AirPlayPlayer) -> None:
- """Prepare stream for a single client."""
- # Stop existing stream if running
+ async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
+ """Start CLI process and ffmpeg for a single client."""
if airplay_player.stream and airplay_player.stream.running:
await airplay_player.stream.stop()
- # Create appropriate stream type based on protocol
if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
airplay_player.stream = AirPlay2Stream(airplay_player)
else:
airplay_player.stream = RaopStream(airplay_player)
- # Link stream session to player stream
airplay_player.stream.session = self
- # create the named pipes
- await airplay_player.stream.audio_pipe.create()
- await airplay_player.stream.commands_pipe.create()
- # start the (player-specific) ffmpeg process
- # note that ffmpeg will open the named pipe for writing
- await self._start_client_ffmpeg(airplay_player)
- await asyncio.sleep(0.05) # allow ffmpeg to open the pipe properly
-
- async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
- """Start stream for a single client."""
sync_adjust = airplay_player.config.get_value(CONF_SYNC_ADJUST, 0)
assert isinstance(sync_adjust, int)
if sync_adjust != 0:
- # apply sync adjustment
- start_ntp += sync_adjust * 1000 # sync_adjust is in seconds, NTP in milliseconds
start_ntp = unix_time_to_ntp(ntp_to_unix_time(start_ntp) + (sync_adjust / 1000))
- # start the stream
- assert airplay_player.stream # for type checker
await airplay_player.stream.start(start_ntp)
-
- async def _start_client_ffmpeg(self, airplay_player: AirPlayPlayer) -> None:
- """Start or restart the player's ffmpeg stream."""
- # Clean up any existing FFmpeg instance for this player
+ # Start ffmpeg to feed audio to CLI stdin
if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
await ffmpeg.close()
- del ffmpeg
- assert airplay_player.stream # for type checker
- # Create the FFMpeg instance per player which accepts our PCM audio
- # applies any player-specific filters (e.g. volume, dsp, etc)
- # and outputs in the correct format for the player stream
- # to the named pipe associated with the player's stream
filter_params = get_player_filter_params(
self.mass,
airplay_player.player_id,
self.pcm_format,
airplay_player.stream.pcm_format,
)
+ cli_proc = airplay_player.stream._cli_proc
+ assert cli_proc
+ assert cli_proc.proc
+ assert cli_proc.proc.stdin
+ stdin_transport = cli_proc.proc.stdin.transport
+ audio_output: str | int = stdin_transport.get_extra_info("pipe").fileno()
ffmpeg = FFMpeg(
audio_input="-",
input_format=self.pcm_format,
output_format=airplay_player.stream.pcm_format,
filter_params=filter_params,
- audio_output=airplay_player.stream.audio_pipe.path,
- extra_input_args=[
- "-y",
- "-readrate",
- "1",
- "-readrate_initial_burst",
- f"{AIRPLAY_PRELOAD_SECONDS}",
- ],
+ audio_output=audio_output,
)
await ffmpeg.start()
self._player_ffmpeg[airplay_player.player_id] = ffmpeg
snapcast==2.3.7
soco==0.30.12
soundcloudpy==0.1.4
+srptools>=1.0.0
sxm==0.2.8
unidecode==1.4.0
uv>=0.8.0