From: Marcel van der Veldt Date: Sat, 24 Jan 2026 00:26:40 +0000 (+0100) Subject: Fixes for the AirPlay provider (#3014) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=ac0d9c255ef08b4b679e8b3ed5f9fa2a77ad1bfa;p=music-assistant-server.git Fixes for the AirPlay provider (#3014) --- diff --git a/music_assistant/controllers/config.py b/music_assistant/controllers/config.py index 7a06fe7a..9a07583c 100644 --- a/music_assistant/controllers/config.py +++ b/music_assistant/controllers/config.py @@ -1384,6 +1384,23 @@ class ConfigController: player_config["provider"] = prov.instance_id changed = True + # Migrate AirPlay legacy credentials (ap_credentials) to protocol-specific keys + # The old key was used for both RAOP and AirPlay, now we have separate keys + for player_id, player_config in self._data.get(CONF_PLAYERS, {}).items(): + if player_config.get("provider") != "airplay": + continue + if not (values := player_config.get("values")): + continue + if "ap_credentials" not in values: + continue + # Migrate to raop_credentials (RAOP is the default/fallback protocol) + # The new code will use the correct key based on the protocol + old_creds = values.pop("ap_credentials") + if old_creds and "raop_credentials" not in values: + values["raop_credentials"] = old_creds + LOGGER.info("Migrated AirPlay credentials for player %s", player_id) + changed = True + if changed: await self._async_save() diff --git a/music_assistant/helpers/named_pipe.py b/music_assistant/helpers/named_pipe.py index ac26ddb5..380660f5 100644 --- a/music_assistant/helpers/named_pipe.py +++ b/music_assistant/helpers/named_pipe.py @@ -3,22 +3,23 @@ from __future__ import annotations import asyncio +import errno as errno_module +import logging import os -import stat +import time from contextlib import suppress from pathlib import Path +_LOGGER = logging.getLogger("named_pipe") + class AsyncNamedPipeWriter: - """Simple async writer for named pipes using thread pool for blocking I/O.""" + """Async writer for named pipes.""" def __init__(self, pipe_path: str) -> None: - """Initialize named pipe writer. - - Args: - pipe_path: Path to the named pipe - """ + """Initialize named pipe writer.""" self._pipe_path = pipe_path + self._write_fd: int | None = None @property def path(self) -> str: @@ -26,39 +27,66 @@ class AsyncNamedPipeWriter: return self._pipe_path async def create(self) -> None: - """Create the named pipe (if it does not exist).""" + """Create the named pipe.""" def _create() -> None: - try: - os.mkfifo(self._pipe_path) - except FileExistsError: - # Check if existing file is actually a named pipe - file_stat = os.stat(self._pipe_path) - if not stat.S_ISFIFO(file_stat.st_mode): - # Not a FIFO - remove and recreate - Path(self._pipe_path).unlink() - os.mkfifo(self._pipe_path) + pipe_path = Path(self._pipe_path) + if pipe_path.exists(): + pipe_path.unlink() + os.mkfifo(self._pipe_path) await asyncio.to_thread(_create) + def _ensure_write_fd(self) -> bool: + """Ensure we have a write fd open. Returns True if successful.""" + if self._write_fd is not None: + return True + if not Path(self._pipe_path).exists(): + return False + # Retry opening until reader is available (up to 1s) + for _ in range(20): + try: + self._write_fd = os.open(self._pipe_path, os.O_WRONLY | os.O_NONBLOCK) + return True + except OSError as e: + if e.errno in (errno_module.ENXIO, errno_module.ENOENT): + time.sleep(0.05) + continue + raise + _LOGGER.warning("Could not open pipe %s: no reader after retries", self._pipe_path) + return False + async def write(self, data: bytes) -> None: - """Write data to the named pipe (blocking operation runs in thread).""" + """Write data to the named pipe.""" def _write() -> None: - with open(self._pipe_path, "wb") as pipe_file: - pipe_file.write(data) + if not self._ensure_write_fd(): + return + try: + assert self._write_fd is not None + os.write(self._write_fd, data) + except OSError as e: + if e.errno == errno_module.EPIPE: + # Reader closed, reset fd for next attempt + if self._write_fd is not None: + with suppress(Exception): + os.close(self._write_fd) + self._write_fd = None + else: + raise - # Run blocking write in thread pool await asyncio.to_thread(_write) async def remove(self) -> None: - """Remove the named pipe.""" - - def _remove() -> None: + """Close write fd and remove the pipe.""" + if self._write_fd is not None: with suppress(Exception): - Path(self._pipe_path).unlink() - - await asyncio.to_thread(_remove) + os.close(self._write_fd) + self._write_fd = None + pipe_path = Path(self._pipe_path) + if pipe_path.exists(): + with suppress(Exception): + pipe_path.unlink() def __str__(self) -> str: """Return string representation.""" diff --git a/music_assistant/helpers/process.py b/music_assistant/helpers/process.py index 0fa6183b..1da3541f 100644 --- a/music_assistant/helpers/process.py +++ b/music_assistant/helpers/process.py @@ -266,22 +266,26 @@ class AsyncProcess: await self._stdin_feeder_task # close stdin to signal we're done sending data - await asyncio.wait_for(self._stdin_lock.acquire(), 10) + with suppress(TimeoutError, asyncio.CancelledError): + await asyncio.wait_for(self._stdin_lock.acquire(), 5) if self.proc.stdin and not self.proc.stdin.is_closing(): self.proc.stdin.close() elif not self.proc.stdin and self.proc.returncode is None: self.proc.send_signal(SIGINT) # ensure we have no more readers active and stdout is drained - await asyncio.wait_for(self._stdout_lock.acquire(), 10) + with suppress(TimeoutError, asyncio.CancelledError): + await asyncio.wait_for(self._stdout_lock.acquire(), 5) if self.proc.stdout and not self.proc.stdout.at_eof(): with suppress(Exception): await self.proc.stdout.read(-1) # if we have a stderr task active, allow it to finish if self._stderr_reader_task: - await asyncio.wait_for(self._stderr_reader_task, 10) + with suppress(TimeoutError, asyncio.CancelledError): + await asyncio.wait_for(self._stderr_reader_task, 5) elif self.proc.stderr and not self.proc.stderr.at_eof(): - await asyncio.wait_for(self._stderr_lock.acquire(), 10) + with suppress(TimeoutError, asyncio.CancelledError): + await asyncio.wait_for(self._stderr_lock.acquire(), 5) # drain stderr with suppress(Exception): await self.proc.stderr.read(-1) @@ -289,18 +293,32 @@ class AsyncProcess: # make sure the process is really cleaned up. # especially with pipes this can cause deadlocks if not properly guarded # we need to ensure stdout and stderr are flushed and stdin closed + pid = self.proc.pid + terminate_attempts = 0 while self.returncode is None: try: # use communicate to flush all pipe buffers - await asyncio.wait_for(self.proc.communicate(), 5) + await asyncio.wait_for(self.proc.communicate(), 2) except TimeoutError: + terminate_attempts += 1 self.logger.debug( - "Process %s with PID %s did not stop in time. Sending terminate...", + "Process %s with PID %s did not stop in time (attempt %d). Sending SIGKILL...", self.name, - self.proc.pid, + pid, + terminate_attempts, ) - with suppress(ProcessLookupError): - self.proc.terminate() + # Use os.kill for more direct signal delivery + with suppress(ProcessLookupError, OSError): + os.kill(pid, 9) # SIGKILL = 9 + # Give up after 5 attempts - process may be zombie + if terminate_attempts >= 5: + self.logger.warning( + "Process %s (PID %s) did not terminate after %d SIGKILL attempts", + self.name, + pid, + terminate_attempts, + ) + break self.logger.log( VERBOSE_LOG_LEVEL, "Process %s with PID %s stopped with returncode %s", @@ -309,6 +327,70 @@ class AsyncProcess: self.returncode, ) + async def kill(self) -> None: + """ + Immediately kill the process with SIGKILL. + + Use this for forceful termination when the process doesn't respond to + normal termination signals. Unlike close(), this doesn't attempt graceful + shutdown - it immediately sends SIGKILL. + """ + self._close_called = True + if not self.proc or self.returncode is not None: + return + + pid = self.proc.pid + + # Cancel stdin feeder task if any + if self._stdin_feeder_task and not self._stdin_feeder_task.done(): + self._stdin_feeder_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await self._stdin_feeder_task + + # Cancel stderr reader task if any + if self._stderr_reader_task and not self._stderr_reader_task.done(): + self._stderr_reader_task.cancel() + with suppress(asyncio.CancelledError, Exception): + await self._stderr_reader_task + + # Close all pipes first to prevent any I/O blocking + # This helps processes stuck on blocked I/O to receive signals + if self.proc.stdin and not self.proc.stdin.is_closing(): + self.proc.stdin.close() + if self.proc.stdout: + self.proc.stdout.feed_eof() + if self.proc.stderr: + self.proc.stderr.feed_eof() + + # Send SIGKILL immediately using os.kill for more direct signal delivery + self.logger.debug("Killing process %s with PID %s", self.name, pid) + with suppress(ProcessLookupError, OSError): + os.kill(pid, 9) # SIGKILL = 9 + + # Wait for process to actually terminate + try: + await asyncio.wait_for(self.proc.wait(), 2) + except TimeoutError: + # Try one more time with os.kill + with suppress(ProcessLookupError, OSError): + os.kill(pid, 9) + try: + await asyncio.wait_for(self.proc.wait(), 2) + except TimeoutError: + self.logger.warning( + "Process %s with PID %s did not terminate after SIGKILL - may be zombie", + self.name, + pid, + ) + + self.logger.log( + VERBOSE_LOG_LEVEL, + "Process %s with PID %s killed with returncode %s", + self.name, + pid, + self.returncode, + ) + async def wait(self) -> int: """Wait for the process and return the returncode.""" if self._returncode is None: diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index f9b334ac..8eefaccf 100644 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -846,7 +846,9 @@ class Player(ABC): """ # if the player is grouped/synced, use the active source of the group/parent player if parent_player_id := (self.active_group or self.synced_to): - if parent_player := self.mass.players.get(parent_player_id): + if parent_player_id != self.player_id and ( + parent_player := self.mass.players.get(parent_player_id) + ): return parent_player.active_source for plugin_source in self.mass.players.get_plugin_sources(): if plugin_source.in_use_by == self.player_id: @@ -1331,7 +1333,9 @@ class Player(ABC): ) # if the player is grouped/synced, use the current_media of the group/parent player if parent_player_id := (self.active_group or self.synced_to): - if parent_player := self.mass.players.get(parent_player_id): + if parent_player_id != self.player_id and ( + parent_player := self.mass.players.get(parent_player_id) + ): return parent_player.current_media # if a pluginsource is currently active, return those details if ( diff --git a/music_assistant/providers/airplay/bin/cliap2-linux-aarch64 b/music_assistant/providers/airplay/bin/cliap2-linux-aarch64 index 0e085c54..b5f487e3 100755 Binary files a/music_assistant/providers/airplay/bin/cliap2-linux-aarch64 and b/music_assistant/providers/airplay/bin/cliap2-linux-aarch64 differ diff --git a/music_assistant/providers/airplay/bin/cliap2-linux-x86_64 b/music_assistant/providers/airplay/bin/cliap2-linux-x86_64 index 7d95c1a6..0a6e222d 100755 Binary files a/music_assistant/providers/airplay/bin/cliap2-linux-x86_64 and b/music_assistant/providers/airplay/bin/cliap2-linux-x86_64 differ diff --git a/music_assistant/providers/airplay/bin/cliap2-macos-arm64 b/music_assistant/providers/airplay/bin/cliap2-macos-arm64 index 570b796f..e2a952dd 100755 Binary files a/music_assistant/providers/airplay/bin/cliap2-macos-arm64 and b/music_assistant/providers/airplay/bin/cliap2-macos-arm64 differ diff --git a/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 index f8367fe3..2b7b9a4f 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 differ diff --git a/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 index 29c4c1b0..af3ad678 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 differ diff --git a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 index 4fb8f387..0f74b1a1 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 differ diff --git a/music_assistant/providers/airplay/constants.py b/music_assistant/providers/airplay/constants.py index 08562028..a4d114d6 100644 --- a/music_assistant/providers/airplay/constants.py +++ b/music_assistant/providers/airplay/constants.py @@ -2,7 +2,7 @@ from __future__ import annotations -from enum import Enum +from enum import IntEnum from typing import Final from music_assistant_models.enums import ContentType @@ -13,7 +13,7 @@ from music_assistant.constants import INTERNAL_PCM_FORMAT DOMAIN = "airplay" -class StreamingProtocol(Enum): +class StreamingProtocol(IntEnum): """AirPlay streaming protocol versions.""" RAOP = 1 # AirPlay 1 (RAOP) @@ -34,25 +34,25 @@ AIRPLAY_DISCOVERY_TYPE: Final[str] = "_airplay._tcp.local." RAOP_DISCOVERY_TYPE: Final[str] = "_raop._tcp.local." DACP_DISCOVERY_TYPE: Final[str] = "_dacp._tcp.local." -AIRPLAY_PRELOAD_SECONDS: Final[int] = ( - 5 # Number of seconds (in PCM) to preload before throttling back -) -AIRPLAY_PROCESS_SPAWN_TIME_MS: Final[int] = ( - 200 # Time in ms to allow AirPlay CLI processes to spawn and initialise -) AIRPLAY_OUTPUT_BUFFER_DURATION_MS: Final[int] = ( 2000 # Read ahead buffer for cliraop. Output buffer duration for cliap2. ) AIRPLAY2_MIN_LOG_LEVEL: Final[int] = 3 # Min loglevel to ensure stderr output contains what we need AIRPLAY2_CONNECT_TIME_MS: Final[int] = 2500 # Time in ms to allow AirPlay2 device to connect +RAOP_CONNECT_TIME_MS: Final[int] = 1000 # Time in ms to allow RAOP device to connect + +# Per-protocol credential storage keys +CONF_RAOP_CREDENTIALS: Final[str] = "raop_credentials" +CONF_AIRPLAY_CREDENTIALS: Final[str] = "airplay_credentials" + +# Legacy credential key (for migration) CONF_AP_CREDENTIALS: Final[str] = "ap_credentials" -CONF_MRP_CREDENTIALS: Final[str] = "mrp_credentials" -CONF_ACTION_START_PAIRING: Final[str] = "start_ap_pairing" -CONF_ACTION_FINISH_PAIRING: Final[str] = "finish_ap_pairing" -CONF_ACTION_START_MRP_PAIRING: Final[str] = "start_mrp_pairing" -CONF_ACTION_FINISH_MRP_PAIRING: Final[str] = "finish_mrp_pairing" + +# Pairing action keys +CONF_ACTION_START_PAIRING: Final[str] = "start_pairing" +CONF_ACTION_FINISH_PAIRING: Final[str] = "finish_pairing" +CONF_ACTION_RESET_PAIRING: Final[str] = "reset_pairing" CONF_PAIRING_PIN: Final[str] = "pairing_pin" -CONF_MRP_PAIRING_PIN: Final[str] = "mrp_pairing_pin" CONF_ENABLE_LATE_JOIN: Final[str] = "enable_late_join" BACKOFF_TIME_LOWER_LIMIT: Final[int] = 15 # seconds @@ -80,12 +80,13 @@ BROKEN_AIRPLAY_MODELS = ( ("Sonos", "Move 2"), ("Sonos", "Roam 2"), ("Sonos", "Arc Ultra"), - # Samsung has been repeatedly being reported as having issues with AirPlay 1/raop + # Samsung has been repeatedly being reported as having issues with AirPlay (raop and AP2) ("Samsung", "*"), ) AIRPLAY_2_DEFAULT_MODELS = ( # Models that are known to work better with AirPlay 2 protocol instead of RAOP + # These use the translated/friendly model names from get_model_info() ("Ubiquiti Inc.", "*"), ("Juke Audio", "*"), ) diff --git a/music_assistant/providers/airplay/helpers.py b/music_assistant/providers/airplay/helpers.py index 68ec1da2..31b372f3 100644 --- a/music_assistant/providers/airplay/helpers.py +++ b/music_assistant/providers/airplay/helpers.py @@ -115,7 +115,7 @@ def get_primary_ip_address_from_zeroconf(discovery_info: AsyncServiceInfo) -> st def is_broken_airplay_model(manufacturer: str, model: str) -> bool: - """Check if a model is known to have broken RAOP support.""" + """Check if a model is known to have broken AirPlay support.""" for broken_manufacturer, broken_model in BROKEN_AIRPLAY_MODELS: if broken_manufacturer in (manufacturer, "*") and broken_model in (model, "*"): return True @@ -272,6 +272,32 @@ def unix_time_to_ntp(unix_timestamp: float) -> int: return (ntp_seconds << 32) | ntp_fraction +def player_id_to_mac_address(player_id: str) -> str: + """Convert a player_id to a MAC address-like string.""" + # the player_id is the mac address prefixed with "ap" + hex_str = player_id.replace("ap", "").upper() + return ":".join(hex_str[i : i + 2] for i in range(0, 12, 2)) + + +def generate_active_remote_id(mac_address: str) -> str: + """ + Generate an Active-Remote ID for DACP communication. + + The Active-Remote ID is used to match DACP callbacks from devices to the + correct stream. This function generates a consistent ID based on the + player_id (=macaddress, =device id), converted to uint32). + + :return: Active-Remote ID as decimal string. + """ + # Convert MAC address format to uint32 + # Remove colons: "AA:BB:CC:DD:EE:FF" -> "AABBCCDDEEFF" + hex_str = mac_address.replace(":", "").upper() + # Parse as uint64 and truncate to uint32 (lower 32 bits) + device_id_u64 = int(hex_str, 16) + device_id_u32 = device_id_u64 & 0xFFFFFFFF + return str(device_id_u32) + + def add_seconds_to_ntp(ntp_timestamp: int, seconds: float) -> int: """ Add seconds to an NTP timestamp. diff --git a/music_assistant/providers/airplay/manifest.json b/music_assistant/providers/airplay/manifest.json index 54686991..4129e84b 100644 --- a/music_assistant/providers/airplay/manifest.json +++ b/music_assistant/providers/airplay/manifest.json @@ -9,7 +9,7 @@ "[libraop (RAOP)](https://github.com/music-assistant/libraop)", "[OwnTone (AirPlay2)](https://github.com/OwnTone)" ], - "requirements": [], + "requirements": ["srptools>=1.0.0"], "documentation": "https://music-assistant.io/player-support/airplay/", "multi_instance": false, "builtin": false, diff --git a/music_assistant/providers/airplay/pairing.py b/music_assistant/providers/airplay/pairing.py new file mode 100644 index 00000000..de6112ce --- /dev/null +++ b/music_assistant/providers/airplay/pairing.py @@ -0,0 +1,782 @@ +"""Native pairing implementations for AirPlay devices. + +This module provides pairing support for: +- AirPlay 2 (HAP - HomeKit Accessory Protocol) - for Apple TV 4+, HomePod, Mac +- RAOP (AirPlay 1 legacy pairing) - for older devices + +Both implementations produce credentials compatible with cliap2/cliraop. +""" + +from __future__ import annotations + +import binascii +import hashlib +import logging +import os +import plistlib +import uuid + +import aiohttp +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 +from cryptography.hazmat.primitives.kdf.hkdf import HKDF +from music_assistant_models.errors import PlayerCommandFailed +from srptools import SRPClientSession, SRPContext + +from .constants import StreamingProtocol + +# ============================================================================ +# Common utilities +# ============================================================================ + + +def hkdf_derive( + input_key: bytes, + salt: bytes, + info: bytes, + length: int = 32, +) -> bytes: + """Derive key using HKDF-SHA512. + + :param input_key: Input keying material. + :param salt: Salt value. + :param info: Context info. + :param length: Output key length. + :return: Derived key bytes. + """ + hkdf = HKDF( + algorithm=hashes.SHA512(), + length=length, + salt=salt, + info=info, + ) + return hkdf.derive(input_key) + + +# ============================================================================ +# TLV encoding/decoding for HAP +# ============================================================================ + +# TLV types for HAP pairing +TLV_METHOD = 0x00 +TLV_IDENTIFIER = 0x01 +TLV_SALT = 0x02 +TLV_PUBLIC_KEY = 0x03 +TLV_PROOF = 0x04 +TLV_ENCRYPTED_DATA = 0x05 +TLV_STATE = 0x06 +TLV_ERROR = 0x07 +TLV_SIGNATURE = 0x0A + + +def tlv_encode(items: list[tuple[int, bytes]]) -> bytes: + """Encode items into TLV format. + + :param items: List of (type, value) tuples. + :return: TLV-encoded bytes. + """ + result = bytearray() + for tlv_type, value in items: + offset = 0 + while offset < len(value): + chunk = value[offset : offset + 255] + result.append(tlv_type) + result.append(len(chunk)) + result.extend(chunk) + offset += 255 + if len(value) == 0: + result.append(tlv_type) + result.append(0) + return bytes(result) + + +def tlv_decode(data: bytes) -> dict[int, bytes]: + """Decode TLV format into dictionary. + + :param data: TLV-encoded bytes. + :return: Dictionary mapping type to concatenated value. + """ + result: dict[int, bytearray] = {} + offset = 0 + while offset < len(data): + tlv_type = data[offset] + length = data[offset + 1] + value = data[offset + 2 : offset + 2 + length] + if tlv_type in result: + result[tlv_type].extend(value) + else: + result[tlv_type] = bytearray(value) + offset += 2 + length + return {k: bytes(v) for k, v in result.items()} + + +# ============================================================================ +# HAP Pairing constants (for AirPlay 2) +# ============================================================================ + +# SRP 3072-bit prime for HAP (hex string format for srptools) +HAP_SRP_PRIME_3072 = ( + "FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74" + "020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F1437" + "4FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED" + "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF05" + "98DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB" + "9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B" + "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF695581718" + "3995497CEA956AE515D2261898FA051015728E5A8AAAC42DAD33170D04507A33" + "A85521ABDF1CBA64ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7" + "ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6BF12FFA06D98A0864" + "D87602733EC86A64521F2B18177B200CBBE117577A615D6C770988C0BAD946E2" + "08E24FA074E5AB3143DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF" +) +HAP_SRP_GENERATOR = "5" + + +# ============================================================================ +# RAOP Pairing constants (for AirPlay 1 legacy) +# ============================================================================ + +# SRP 2048-bit prime for RAOP (hex string format for srptools) +RAOP_SRP_PRIME_2048 = ( + "AC6BDB41324A9A9BF166DE5E1389582FAF72B6651987EE07FC319294" + "3DB56050A37329CBB4A099ED8193E0757767A13DD52312AB4B03310D" + "CD7F48A9DA04FD50E8083969EDB767B0CF6095179A163AB3661A05FB" + "D5FAAAE82918A9962F0B93B855F97993EC975EEAA80D740ADBF4FF74" + "7359D041D5C33EA71D281E446B14773BCA97B43A23FB801676BD207A" + "436C6481F1D2B9078717461A5B9D32E688F87748544523B524B0D57D" + "5EA77A2775D2ECFA032CFBDBF52FB3786160279004E57AE6AF874E73" + "03CE53299CCC041C7BC308D82A5698F3A8D0C38271AE35F8E9DBFBB6" + "94B5C803D89F7AE435DE236D525F54759B65E372FCD68EF20FA7111F" + "9E4AFF73" +) +RAOP_SRP_GENERATOR = "02" # RFC5054-2048bit uses generator 2 + + +# ============================================================================ +# Base Pairing class +# ============================================================================ + + +class AirPlayPairing: + """Base class for AirPlay pairing. + + Handles both HAP (AirPlay 2) and RAOP (AirPlay 1) pairing protocols. + """ + + def __init__( + self, + address: str, + name: str, + protocol: StreamingProtocol, + logger: logging.Logger, + port: int | None = None, + device_id: str | None = None, + ) -> None: + """Initialize AirPlay pairing. + + :param address: IP address of the device. + :param name: Display name of the device. + :param protocol: Streaming protocol (RAOP or AIRPLAY2). + :param logger: Logger instance. + :param port: Port number (default: 7000 for AirPlay 2, 5000 for RAOP). + :param device_id: Device identifier (DACP ID) - must match what cliap2 uses. + """ + self.address = address + self.name = name + self.protocol = protocol + self.logger = logger + self.port = port or (7000 if protocol == StreamingProtocol.AIRPLAY2 else 5000) + + # HTTP session + self._session: aiohttp.ClientSession | None = None + self._base_url: str = f"http://{address}:{self.port}" + + # Common state + self._is_pairing: bool = False + self._srp_context: SRPContext | None = None + self._srp_session: SRPClientSession | None = None + self._session_key: bytes | None = None + + # Client identifier (device_id) handling depends on protocol: + # - HAP (AirPlay 2): Uses DACP ID as string identifier (must match cliap2 pair-verify) + # - RAOP: Uses 8 random bytes (not the DACP ID) - credentials are self-contained + if protocol == StreamingProtocol.AIRPLAY2: + # For HAP, use DACP ID as the identifier (must match pair-verify) + if device_id: + self._client_id: bytes = device_id.encode() + else: + self._client_id = str(uuid.uuid4()).encode() + else: + # For RAOP, generate 8 random bytes for client_id + # The credentials format is client_id_hex:auth_secret_hex + self._client_id = os.urandom(8) + + # Ed25519 keypair + self._client_private_key: Ed25519PrivateKey | None = None + self._client_public_key: bytes | None = None + + # Server's public key + self._server_public_key: bytes | None = None + + @property + def is_pairing(self) -> bool: + """Return True if a pairing session is in progress.""" + return self._is_pairing + + @property + def device_provides_pin(self) -> bool: + """Return True if the device displays the PIN.""" + return True # Both HAP and RAOP display PIN on device + + @property + def protocol_name(self) -> str: + """Return human-readable protocol name.""" + if self.protocol == StreamingProtocol.RAOP: + return "RAOP (AirPlay 1)" + return "AirPlay" + + async def start_pairing(self) -> bool: + """Start the pairing process. + + :return: True if device provides PIN (always True for AirPlay). + :raises PlayerCommandFailed: If device connection fails. + """ + self.logger.info( + "Starting %s pairing with %s at %s:%d", + self.protocol_name, + self.name, + self.address, + self.port, + ) + + # Generate Ed25519 keypair + self._client_private_key = Ed25519PrivateKey.generate() + self._client_public_key = self._client_private_key.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + + # Create HTTP session + self._session = aiohttp.ClientSession() + + try: + # Request PIN to be shown on device + async with self._session.post( + f"{self._base_url}/pair-pin-start", + timeout=aiohttp.ClientTimeout(total=10), + ) as resp: + if resp.status != 200: + raise PlayerCommandFailed(f"Failed to start pairing: HTTP {resp.status}") + + self._is_pairing = True + self.logger.info("Device %s is displaying PIN", self.name) + + # SRP context will be created in finish_pairing when we have the PIN + return True + + except aiohttp.ClientError as err: + await self.close() + raise PlayerCommandFailed(f"Connection failed: {err}") from err + + async def finish_pairing(self, pin: str) -> str: + """Complete pairing with the provided PIN. + + :param pin: 4-digit PIN from device screen. + :return: Credentials string for cliap2/cliraop. + :raises PlayerCommandFailed: If pairing fails. + """ + if not self._session: + raise PlayerCommandFailed("Pairing not started") + + try: + if self.protocol == StreamingProtocol.AIRPLAY2: + return await self._finish_hap_pairing(pin) + return await self._finish_raop_pairing(pin) + except PlayerCommandFailed: + raise + except Exception as err: + self.logger.exception("Pairing failed") + raise PlayerCommandFailed(f"Pairing failed: {err}") from err + finally: + await self.close() + + # ======================================================================== + # HAP (AirPlay 2) pairing implementation + # ======================================================================== + + async def _finish_hap_pairing(self, pin: str) -> str: + """Complete HAP pairing for AirPlay 2. + + :param pin: 4-digit PIN. + :return: Credentials (192 hex chars). + """ + if not self._session: + raise PlayerCommandFailed("Pairing not started") + + self.logger.info("Completing HAP pairing with PIN") + + # HAP headers required for pair-setup + hap_headers = { + "Content-Type": "application/octet-stream", + "X-Apple-HKP": "3", + } + + # M1: Send method request (state=1, method=0 for pair-setup) + m1_data = tlv_encode( + [ + (TLV_METHOD, bytes([0x00])), + (TLV_STATE, bytes([0x01])), + ] + ) + + async with self._session.post( + f"{self._base_url}/pair-setup", + data=m1_data, + headers=hap_headers, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + raise PlayerCommandFailed(f"M1 failed: HTTP {resp.status}") + m2_data = await resp.read() + + # Parse M2 + m2 = tlv_decode(m2_data) + if TLV_ERROR in m2: + raise PlayerCommandFailed(f"Device error in M2: {m2[TLV_ERROR].hex()}") + + salt = m2.get(TLV_SALT) + server_pk_srp = m2.get(TLV_PUBLIC_KEY) + if not salt or not server_pk_srp: + raise PlayerCommandFailed("Invalid M2: missing salt or public key") + + # M3: SRP authentication - create context with password + # PIN is passed directly as string (not "Pair-Setup:PIN") + # Note: pyatv doesn't specify bits_random, uses default + self._srp_context = SRPContext( + username="Pair-Setup", + password=pin, + prime=HAP_SRP_PRIME_3072, + generator=HAP_SRP_GENERATOR, + hash_func=hashlib.sha512, + ) + # Pass Ed25519 private key bytes as the SRP "a" value (random private exponent) + # This is what pyatv does - use the client's Ed25519 private key as the SRP private value + if not self._client_private_key: + raise PlayerCommandFailed("Client private key not initialized") + auth_private = self._client_private_key.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) + self._srp_session = SRPClientSession( + self._srp_context, binascii.hexlify(auth_private).decode() + ) + + # Process with server's public key and salt (as hex strings) + self._srp_session.process(server_pk_srp.hex(), salt.hex()) + + # Get client's public key and proof + client_pk_srp = bytes.fromhex(self._srp_session.public) + client_proof = bytes.fromhex(self._srp_session.key_proof.decode("ascii")) + + m3_data = tlv_encode( + [ + (TLV_STATE, bytes([0x03])), + (TLV_PUBLIC_KEY, client_pk_srp), + (TLV_PROOF, client_proof), + ] + ) + + async with self._session.post( + f"{self._base_url}/pair-setup", + data=m3_data, + headers=hap_headers, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + raise PlayerCommandFailed(f"M3 failed: HTTP {resp.status}") + m4_data = await resp.read() + + # Parse M4 + m4 = tlv_decode(m4_data) + if TLV_ERROR in m4: + raise PlayerCommandFailed(f"Device error in M4: {m4[TLV_ERROR].hex()}") + + server_proof = m4.get(TLV_PROOF) + if not server_proof: + raise PlayerCommandFailed("Invalid M4: missing proof") + + # Verify server proof + if not self._srp_session.verify_proof(server_proof.hex().encode("ascii")): + raise PlayerCommandFailed("Server proof verification failed") + + # Get session key + self._session_key = bytes.fromhex(self._srp_session.key.decode("ascii")) + + # M5: Send encrypted client info + await self._send_hap_m5() + + # Generate credentials + return self._generate_hap_credentials() + + async def _send_hap_m5(self) -> None: + """Send M5 with encrypted client info and receive M6.""" + if ( + not self._session_key + or not self._client_private_key + or not self._client_public_key + or not self._session + ): + raise PlayerCommandFailed("Invalid state for M5") + + # HAP headers required for pair-setup + hap_headers = { + "Content-Type": "application/octet-stream", + "X-Apple-HKP": "3", + } + + # Derive keys + enc_key = hkdf_derive( + self._session_key, + b"Pair-Setup-Encrypt-Salt", + b"Pair-Setup-Encrypt-Info", + 32, + ) + sign_key = hkdf_derive( + self._session_key, + b"Pair-Setup-Controller-Sign-Salt", + b"Pair-Setup-Controller-Sign-Info", + 32, + ) + + # Sign device info + device_info = sign_key + self._client_id + self._client_public_key + signature = self._client_private_key.sign(device_info) + + # Create and encrypt inner TLV + inner_tlv = tlv_encode( + [ + (TLV_IDENTIFIER, self._client_id), + (TLV_PUBLIC_KEY, self._client_public_key), + (TLV_SIGNATURE, signature), + ] + ) + + cipher = ChaCha20Poly1305(enc_key) + # Nonce format: 4 zero bytes + 8-byte message identifier = 12 bytes + nonce = b"\x00\x00\x00\x00PS-Msg05" + encrypted = cipher.encrypt(nonce, inner_tlv, None) + + # Send M5 + m5_data = tlv_encode( + [ + (TLV_STATE, bytes([0x05])), + (TLV_ENCRYPTED_DATA, encrypted), + ] + ) + + async with self._session.post( + f"{self._base_url}/pair-setup", + data=m5_data, + headers=hap_headers, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + raise PlayerCommandFailed(f"M5 failed: HTTP {resp.status}") + m6_data = await resp.read() + + # Parse M6 + m6 = tlv_decode(m6_data) + if TLV_ERROR in m6: + raise PlayerCommandFailed(f"Device error in M6: {m6[TLV_ERROR].hex()}") + + encrypted_data = m6.get(TLV_ENCRYPTED_DATA) + if not encrypted_data: + raise PlayerCommandFailed("Invalid M6: missing encrypted data") + + # Decrypt M6 + # Nonce format: 4 zero bytes + 8-byte message identifier = 12 bytes + nonce = b"\x00\x00\x00\x00PS-Msg06" + decrypted = cipher.decrypt(nonce, encrypted_data, None) + + # Extract server's public key + inner = tlv_decode(decrypted) + self._server_public_key = inner.get(TLV_PUBLIC_KEY) + if not self._server_public_key: + raise PlayerCommandFailed("Invalid M6: missing server public key") + + def _generate_hap_credentials(self) -> str: + """Generate HAP credentials for cliap2. + + Format: client_private_key(128 hex) + server_public_key(64 hex) = 192 hex chars + + :return: Credentials string. + """ + if ( + not self._client_private_key + or not self._server_public_key + or not self._client_public_key + ): + raise PlayerCommandFailed("Missing keys for credential generation") + + # Get raw private key (32 bytes seed) + private_key_bytes = self._client_private_key.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption(), + ) + + # Expand to 64-byte Ed25519 secret key format (seed + public_key) + if len(private_key_bytes) == 32: + private_key_bytes = private_key_bytes + self._client_public_key + + if len(private_key_bytes) != 64 or len(self._server_public_key) != 32: + raise PlayerCommandFailed("Invalid key lengths") + + return binascii.hexlify(private_key_bytes).decode("ascii") + binascii.hexlify( + self._server_public_key + ).decode("ascii") + + # ======================================================================== + # RAOP (AirPlay 1 legacy) pairing implementation + # ======================================================================== + + def _compute_raop_premaster_secret( + self, + user_id: str, + password: str, + salt: bytes, + client_private: bytes, + client_public: bytes, + server_public: bytes, + ) -> bytes: + """Compute RAOP SRP premaster secret S. + + S = (B - k*v)^(a + u*x) mod N + + :param user_id: Username (hex-encoded client_id). + :param password: PIN code. + :param salt: Salt from server. + :param client_private: Client private key (a) as bytes. + :param client_public: Client public key (A) as bytes. + :param server_public: Server public key (B) as bytes. + :return: Premaster secret S as bytes (padded to N length). + """ + # Convert values to integers + n_bytes = bytes.fromhex(RAOP_SRP_PRIME_2048) + n_len = len(n_bytes) + n = int.from_bytes(n_bytes, "big") + g = int.from_bytes(bytes.fromhex(RAOP_SRP_GENERATOR), "big") + + a = int.from_bytes(client_private, "big") + b_pub = int.from_bytes(server_public, "big") + + # x = H(s | H(I : P)) + inner_hash = hashlib.sha1(f"{user_id}:{password}".encode()).digest() + x = int.from_bytes(hashlib.sha1(salt + inner_hash).digest(), "big") + + # k = H(N | PAD(g)) + g_padded = bytes.fromhex(RAOP_SRP_GENERATOR).rjust(n_len, b"\x00") + k = int.from_bytes(hashlib.sha1(n_bytes + g_padded).digest(), "big") + + # u = H(PAD(A) | PAD(B)) + a_padded = client_public.rjust(n_len, b"\x00") + b_padded = server_public.rjust(n_len, b"\x00") + u = int.from_bytes(hashlib.sha1(a_padded + b_padded).digest(), "big") + + # v = g^x mod N + v = pow(g, x, n) + + # S = (B - k*v)^(a + u*x) mod N + s_int = pow(b_pub - k * v, a + u * x, n) + + # Convert to bytes and pad to N length + s_bytes = s_int.to_bytes((s_int.bit_length() + 7) // 8, "big") + return s_bytes.rjust(n_len, b"\x00") + + def _compute_raop_session_key(self, premaster_secret: bytes) -> bytes: + r"""Compute RAOP session key K from premaster secret S. + + K = SHA1(S | \x00\x00\x00\x00) | SHA1(S | \x00\x00\x00\x01) + + This produces a 40-byte key (two SHA1 hashes concatenated). + + :param premaster_secret: The SRP premaster secret S. + :return: 40-byte session key K. + """ + k1 = hashlib.sha1(premaster_secret + b"\x00\x00\x00\x00").digest() + k2 = hashlib.sha1(premaster_secret + b"\x00\x00\x00\x01").digest() + return k1 + k2 + + def _compute_raop_m1( + self, user_id: str, salt: bytes, client_pk: bytes, server_pk: bytes, session_key: bytes + ) -> bytes: + """Compute RAOP SRP M1 proof with padding for A and B (but not g). + + M1 = H(H(N) XOR H(g) | H(I) | s | PAD(A) | PAD(B) | K) + + Note: g is NOT padded, but A and B ARE padded to N length. + K is 40 bytes (from _compute_raop_session_key). + + :param user_id: Username (hex-encoded client_id). + :param salt: Salt bytes from server. + :param client_pk: Client public key (A). + :param server_pk: Server public key (B). + :param session_key: Session key (K) - 40 bytes. + :return: M1 proof bytes (20 bytes for SHA-1). + """ + n_bytes = bytes.fromhex(RAOP_SRP_PRIME_2048) + n_len = len(n_bytes) + g_bytes = bytes.fromhex(RAOP_SRP_GENERATOR) + + # H(N) XOR H(g) - g is NOT padded + h_n = hashlib.sha1(n_bytes).digest() + h_g = hashlib.sha1(g_bytes).digest() + h_n_xor_h_g = bytes(a ^ b for a, b in zip(h_n, h_g, strict=True)) + + # H(I) - hash of username + h_i = hashlib.sha1(user_id.encode("ascii")).digest() + + # PAD A and B to N length + a_padded = client_pk.rjust(n_len, b"\x00") + b_padded = server_pk.rjust(n_len, b"\x00") + + # M1 = H(H(N) XOR H(g) | H(I) | s | PAD(A) | PAD(B) | K) + m1_data = h_n_xor_h_g + h_i + salt + a_padded + b_padded + session_key + return hashlib.sha1(m1_data).digest() + + def _compute_raop_client_public(self, auth_secret: bytes) -> bytes: + """Compute RAOP SRP client public key A = g^a mod N. + + :param auth_secret: 32-byte random secret (used as SRP private key a). + :return: Client public key A as bytes. + """ + n_bytes = bytes.fromhex(RAOP_SRP_PRIME_2048) + n = int.from_bytes(n_bytes, "big") + g = int.from_bytes(bytes.fromhex(RAOP_SRP_GENERATOR), "big") + a = int.from_bytes(auth_secret, "big") + a_pub = pow(g, a, n) + return a_pub.to_bytes((a_pub.bit_length() + 7) // 8, "big") + + async def _finish_raop_pairing(self, pin: str) -> str: + """Complete RAOP pairing for AirPlay 1. + + :param pin: 4-digit PIN. + :return: Credentials (client_id:auth_secret format). + """ + if not self._session: + raise PlayerCommandFailed("Pairing not started") + + self.logger.info("Completing RAOP pairing with PIN") + + # Generate 32-byte auth secret + auth_secret = os.urandom(32) + + # Derive Ed25519 public key from auth secret + # For RAOP, we use the auth_secret as the Ed25519 seed + from cryptography.hazmat.primitives.asymmetric.ed25519 import ( # noqa: PLC0415 + Ed25519PrivateKey as Ed25519Key, + ) + + auth_private_key = Ed25519Key.from_private_bytes(auth_secret) + auth_public_key = auth_private_key.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + + # Step 1: Send device ID and method + user_id = self._client_id.hex().upper() + step1_plist = { + "method": "pin", + "user": user_id, + } + + async with self._session.post( + f"{self._base_url}/pair-setup-pin", + data=plistlib.dumps(step1_plist, fmt=plistlib.FMT_BINARY), + headers={"Content-Type": "application/x-apple-binary-plist"}, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + raise PlayerCommandFailed(f"RAOP step 1 failed: HTTP {resp.status}") + step1_response = plistlib.loads(await resp.read()) + + # Get salt and server public key + salt, server_pk = step1_response.get("salt"), step1_response.get("pk") + if not salt or not server_pk: + raise PlayerCommandFailed("Invalid RAOP step 1 response") + + # Step 2: SRP authentication + # Apple uses a custom K formula: K = SHA1(S|0000) | SHA1(S|0001) (40 bytes) + client_pk = self._compute_raop_client_public(auth_secret) + premaster_secret = self._compute_raop_premaster_secret( + user_id, pin, salt, auth_secret, client_pk, server_pk + ) + session_key = self._compute_raop_session_key(premaster_secret) + client_proof = self._compute_raop_m1(user_id, salt, client_pk, server_pk, session_key) + + step2_plist = { + "pk": client_pk, + "proof": client_proof, + } + + async with self._session.post( + f"{self._base_url}/pair-setup-pin", + data=plistlib.dumps(step2_plist, fmt=plistlib.FMT_BINARY), + headers={"Content-Type": "application/x-apple-binary-plist"}, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + raise PlayerCommandFailed(f"RAOP step 2 failed: HTTP {resp.status}") + step2_response = plistlib.loads(await resp.read()) + + # Verify server proof M2 exists (verification optional) + server_proof = step2_response.get("proof") + if not server_proof: + raise PlayerCommandFailed("RAOP server did not return proof") + self._session_key = session_key + + # Step 3: Encrypt and send auth public key using AES-GCM + # Derive AES key and IV from session key K (40 bytes) + aes_key = hashlib.sha512(b"Pair-Setup-AES-Key" + session_key).digest()[:16] + aes_iv = bytearray(hashlib.sha512(b"Pair-Setup-AES-IV" + session_key).digest()[:16]) + aes_iv[-1] = (aes_iv[-1] + 1) % 256 # Increment last byte + + # Encrypt auth public key with AES-GCM + cipher = Cipher(algorithms.AES(aes_key), modes.GCM(bytes(aes_iv))) + encryptor = cipher.encryptor() + encrypted_pk = encryptor.update(auth_public_key) + encryptor.finalize() + tag = encryptor.tag + + step3_plist = { + "epk": encrypted_pk, + "authTag": tag, + } + + async with self._session.post( + f"{self._base_url}/pair-setup-pin", + data=plistlib.dumps(step3_plist, fmt=plistlib.FMT_BINARY), + headers={"Content-Type": "application/x-apple-binary-plist"}, + timeout=aiohttp.ClientTimeout(total=30), + ) as resp: + if resp.status != 200: + raise PlayerCommandFailed(f"RAOP step 3 failed: HTTP {resp.status}") + + # Return credentials in cliraop format: client_id:auth_secret + return f"{self._client_id.hex()}:{auth_secret.hex()}" + + # ======================================================================== + # Cleanup + # ======================================================================== + + async def close(self) -> None: + """Clean up resources.""" + self._is_pairing = False + if self._session: + await self._session.close() + self._session = None + self._srp_context = None + self._srp_session = None + self._session_key = None diff --git a/music_assistant/providers/airplay/player.py b/music_assistant/providers/airplay/player.py index 28ee5a2f..a2beb401 100644 --- a/music_assistant/providers/airplay/player.py +++ b/music_assistant/providers/airplay/player.py @@ -8,7 +8,6 @@ from typing import TYPE_CHECKING, cast from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType -from propcache import under_cached_property as cached_property from music_assistant.constants import ( CONF_ENTRY_DEPRECATED_EQ_BASS, @@ -26,14 +25,16 @@ from .constants import ( AIRPLAY_FLOW_PCM_FORMAT, CACHE_CATEGORY_PREV_VOLUME, CONF_ACTION_FINISH_PAIRING, + CONF_ACTION_RESET_PAIRING, CONF_ACTION_START_PAIRING, + CONF_AIRPLAY_CREDENTIALS, CONF_AIRPLAY_PROTOCOL, CONF_ALAC_ENCODE, - CONF_AP_CREDENTIALS, CONF_ENCRYPTION, CONF_IGNORE_VOLUME, CONF_PAIRING_PIN, CONF_PASSWORD, + CONF_RAOP_CREDENTIALS, FALLBACK_VOLUME, RAOP_DISCOVERY_TYPE, StreamingProtocol, @@ -42,12 +43,15 @@ from .helpers import ( get_primary_ip_address_from_zeroconf, is_airplay2_preferred_model, is_broken_airplay_model, + player_id_to_mac_address, ) from .stream_session import AirPlayStreamSession if TYPE_CHECKING: from zeroconf.asyncio import AsyncServiceInfo + from .pairing import AirPlayPairing + from .protocols._protocol import AirPlayProtocol from .protocols.airplay2 import AirPlay2Stream from .protocols.raop import RaopStream from .provider import AirPlayProvider @@ -89,6 +93,8 @@ class AirPlayPlayer(Player): self.stream: RaopStream | AirPlay2Stream | None = None self.last_command_sent = 0.0 self._lock = asyncio.Lock() + self._active_pairing: AirPlayPairing | None = None + self._transitioning = False # Set during stream replacement to ignore stale DACP messages # Set (static) player attributes self._attr_type = PlayerType.PLAYER self._attr_name = display_name @@ -97,6 +103,7 @@ class AirPlayPlayer(Player): model=model, manufacturer=manufacturer, ip_address=address, + mac_address=player_id_to_mac_address(player_id), ) self._attr_supported_features = { PlayerFeature.PAUSE, @@ -108,22 +115,19 @@ class AirPlayPlayer(Player): self._attr_can_group_with = {provider.instance_id} self._attr_enabled_by_default = not is_broken_airplay_model(manufacturer, model) - @cached_property + @property def protocol(self) -> StreamingProtocol: - """Get the streaming protocol to use for this player.""" - protocol_value = self.config.get_value(CONF_AIRPLAY_PROTOCOL) - # Convert integer value to StreamingProtocol enum - if protocol_value == 2: - return StreamingProtocol.AIRPLAY2 - return StreamingProtocol.RAOP + """Get the streaming protocol to use/prefer for this player.""" + preferred_option = cast("int", self.config.get_value(CONF_AIRPLAY_PROTOCOL)) + return self._get_protocol_for_config_value(preferred_option) @property def available(self) -> bool: """Return if the player is currently available.""" if self._requires_pairing(): - # check if we have credentials stored - credentials = self.config.get_value(CONF_AP_CREDENTIALS) - if not credentials: + # check if we have credentials stored for the current protocol + creds_key = self._get_credentials_key(self.protocol) + if not self.config.get_value(creds_key): return False return super().available @@ -133,7 +137,7 @@ class AirPlayPlayer(Player): if not self.stream or not self.stream.session: return super().corrected_elapsed_time or 0.0 session = self.stream.session - elapsed = time.time() - session.last_stream_started - session.total_pause_time + elapsed = time.time() - session.start_time - session.total_pause_time if session.last_paused is not None: current_pause = time.time() - session.last_paused elapsed -= current_pause @@ -168,21 +172,20 @@ class AirPlayPlayer(Player): key=CONF_AIRPLAY_PROTOCOL, type=ConfigEntryType.INTEGER, required=False, - label="AirPlay version to use for streaming", + label="AirPlay protocol version to use for streaming", description="AirPlay version 1 protocol uses RAOP.\n" "AirPlay version 2 is an extension of RAOP.\n" "Some newer devices do not fully support RAOP and " - "will only work with AirPlay version 2.", + "will only work with AirPlay version 2, " + "while older devices may only support RAOP.\n\n" + "In most cases the default automatic selection will work fine.", category="airplay", options=[ - ConfigValueOption("AirPlay 1 (RAOP)", StreamingProtocol.RAOP.value), - ConfigValueOption("AirPlay 2", StreamingProtocol.AIRPLAY2.value), + ConfigValueOption("Automatically select", 0), + ConfigValueOption("Prefer AirPlay 1 (RAOP)", StreamingProtocol.RAOP.value), + ConfigValueOption("Prefer AirPlay 2", StreamingProtocol.AIRPLAY2.value), ], - default_value=StreamingProtocol.AIRPLAY2.value - if is_airplay2_preferred_model( - self.device_info.manufacturer, self.device_info.model - ) - else StreamingProtocol.RAOP.value, + default_value=0, ), ConfigEntry( key=CONF_ENCRYPTION, @@ -194,6 +197,7 @@ class AirPlayPlayer(Player): category="airplay", depends_on=CONF_AIRPLAY_PROTOCOL, depends_on_value=StreamingProtocol.RAOP.value, + hidden=self.protocol != StreamingProtocol.RAOP, ), ConfigEntry( key=CONF_ALAC_ENCODE, @@ -205,6 +209,7 @@ class AirPlayPlayer(Player): category="airplay", depends_on=CONF_AIRPLAY_PROTOCOL, depends_on_value=StreamingProtocol.RAOP.value, + hidden=self.protocol != StreamingProtocol.RAOP, ), CONF_ENTRY_SYNC_ADJUST, ConfigEntry( @@ -233,8 +238,10 @@ class AirPlayPlayer(Player): "Enable this option to ignore these reports." ), category="airplay", + # TODO: remove depends_on when DACP support is added for AirPlay2 depends_on=CONF_AIRPLAY_PROTOCOL, depends_on_value=StreamingProtocol.RAOP.value, + hidden=self.protocol != StreamingProtocol.RAOP, ), ] @@ -255,49 +262,54 @@ class AirPlayPlayer(Player): # Mac devices (including iMac, MacBook, Mac mini, Mac Pro, Mac Studio) return model.startswith(("Mac", "iMac")) + def _get_credentials_key(self, protocol: StreamingProtocol) -> str: + """Get the config key for credentials for given protocol.""" + if protocol == StreamingProtocol.RAOP: + return CONF_RAOP_CREDENTIALS + return CONF_AIRPLAY_CREDENTIALS + + def _get_protocol_for_config_value(self, config_option: int) -> StreamingProtocol: + if config_option == StreamingProtocol.AIRPLAY2 and self.airplay_discovery_info: + return StreamingProtocol.AIRPLAY2 + if config_option == StreamingProtocol.RAOP and self.raop_discovery_info: + return StreamingProtocol.RAOP + # automatic selection + if self.airplay_discovery_info and is_airplay2_preferred_model( + self.device_info.manufacturer, self.device_info.model + ): + return StreamingProtocol.AIRPLAY2 + return StreamingProtocol.RAOP + def _get_pairing_config_entries( self, values: dict[str, ConfigValueType] | None ) -> list[ConfigEntry]: - """Return pairing config entries for Apple TV and macOS devices. + """ + Return pairing config entries for Apple TV and macOS devices. - Uses cliraop for AirPlay/RAOP pairing. + Uses native pairing for both AirPlay 2 (HAP) and RAOP protocols. """ entries: list[ConfigEntry] = [] - # Check if we have credentials stored - if values and (creds := values.get(CONF_AP_CREDENTIALS)): - credentials = str(creds) + # Determine protocol name for UI + conf_protocol: int = 0 + if values and (val := values.get(CONF_AIRPLAY_PROTOCOL)): + conf_protocol = cast("int", val) else: - credentials = str(self.config.get_value(CONF_AP_CREDENTIALS) or "") - has_credentials = bool(credentials) - - if not has_credentials: - # Show pairing instructions and start button - if not self.stream and self.protocol == StreamingProtocol.RAOP: - # ensure we have a stream instance to track pairing state - from .protocols.raop import RaopStream # noqa: PLC0415 - - self.stream = RaopStream(self) - elif not self.stream and self.protocol == StreamingProtocol.AIRPLAY2: - # ensure we have a stream instance to track pairing state - from .protocols.airplay2 import AirPlay2Stream # noqa: PLC0415 - - self.stream = AirPlay2Stream(self) - if self.stream and not self.stream.supports_pairing: - # TEMP until ap2 pairing is implemented - return [ - ConfigEntry( - key="pairing_unsupported", - type=ConfigEntryType.ALERT, - label=( - "This device requires pairing but it is not supported " - "by the current Music Assistant AirPlay implementation." - ), - ) - ] + conf_protocol = cast("int", self.config.get_value(CONF_AIRPLAY_PROTOCOL, 0) or 0) + protocol = self._get_protocol_for_config_value(conf_protocol) + protocol_name = "RAOP" if protocol == StreamingProtocol.RAOP else "AirPlay" + protocol_key = ( + CONF_RAOP_CREDENTIALS + if protocol == StreamingProtocol.RAOP + else CONF_AIRPLAY_CREDENTIALS + ) + has_creds_for_current_protocol = ( + values.get(protocol_key) if values else self.config.get_value(protocol_key) + ) + if not has_creds_for_current_protocol: # If pairing was started, show PIN entry - if self.stream and self.stream.is_pairing: + if self._active_pairing and self._active_pairing.is_pairing: entries.append( ConfigEntry( key=CONF_PAIRING_PIN, @@ -310,17 +322,18 @@ class AirPlayPlayer(Player): ConfigEntry( key=CONF_ACTION_FINISH_PAIRING, type=ConfigEntryType.ACTION, - label="Complete the pairing process with the PIN", + label=f"Complete {protocol_name} pairing with the PIN", action=CONF_ACTION_FINISH_PAIRING, ) ) else: + # Show pairing instructions and start button entries.append( ConfigEntry( key="pairing_instructions", type=ConfigEntryType.LABEL, label=( - "This device requires pairing before it can be used. " + f"This device requires {protocol_name} pairing before it can be used. " "Click the button below to start the pairing process." ), ) @@ -329,7 +342,7 @@ class AirPlayPlayer(Player): ConfigEntry( key=CONF_ACTION_START_PAIRING, type=ConfigEntryType.ACTION, - label="Start the AirPlay pairing process", + label=f"Start {protocol_name} pairing", action=CONF_ACTION_START_PAIRING, ) ) @@ -339,50 +352,86 @@ class AirPlayPlayer(Player): ConfigEntry( key="pairing_status", type=ConfigEntryType.LABEL, - label="Device is paired and ready to use.", + label=f"Device is paired ({protocol_name}) and ready to use.", + ) + ) + # Add reset pairing button + entries.append( + ConfigEntry( + key=CONF_ACTION_RESET_PAIRING, + type=ConfigEntryType.ACTION, + label=f"Reset {protocol_name} pairing", + action=CONF_ACTION_RESET_PAIRING, ) ) # Store credentials (hidden from UI) - entries.append( - ConfigEntry( - key=CONF_AP_CREDENTIALS, - type=ConfigEntryType.SECURE_STRING, - label="AirPlay Credentials", - default_value=credentials, - value=credentials, - required=False, - hidden=True, + for protocol in (StreamingProtocol.RAOP, StreamingProtocol.AIRPLAY2): + conf_key = self._get_credentials_key(protocol) + entries.append( + ConfigEntry( + key=conf_key, + type=ConfigEntryType.SECURE_STRING, + label=conf_key, + default_value=None, + value=values.get(conf_key) if values else None, + required=False, + hidden=True, + ) ) - ) - return entries async def _handle_pairing_action( self, action: str, values: dict[str, ConfigValueType] | None ) -> None: - """Handle pairing actions using the configured protocol.""" - if not self.stream and self.protocol == StreamingProtocol.RAOP: - # ensure we have a stream instance to track pairing state - from .protocols.raop import RaopStream # noqa: PLC0415 + """ + Handle pairing actions. - self.stream = RaopStream(self) - elif not self.stream and self.protocol == StreamingProtocol.AIRPLAY2: - # ensure we have a stream instance to track pairing state - from .protocols.airplay2 import AirPlay2Stream # noqa: PLC0415 + Uses native pairing for both AirPlay 2 (HAP) and RAOP protocols. + Both produce credentials compatible with cliap2/cliraop respectively. + """ + conf_protocol: int = 0 + if values and (val := values.get(CONF_AIRPLAY_PROTOCOL)): + conf_protocol = cast("int", val) + else: + conf_protocol = cast("int", self.config.get_value(CONF_AIRPLAY_PROTOCOL, 0) or 0) + protocol = self._get_protocol_for_config_value(conf_protocol) + protocol_name = "RAOP" if protocol == StreamingProtocol.RAOP else "AirPlay" - self.stream = AirPlay2Stream(self) if action == CONF_ACTION_START_PAIRING: - if self.stream and self.stream.is_pairing: + if self._active_pairing and self._active_pairing.is_pairing: self.logger.warning("Pairing process already in progress for %s", self.display_name) return - self.logger.info("Started AirPlay pairing for %s", self.display_name) - if self.stream: - await self.stream.start_pairing() + + self.logger.info("Starting %s pairing for %s", protocol_name, self.display_name) + + from .pairing import AirPlayPairing # noqa: PLC0415 + + # Determine port based on protocol + # Note: For Apple devices, pairing always happens on the AirPlay port (7000) + # even when streaming will use RAOP. The RAOP port (5000) is only for streaming. + port: int | None = None + if self.airplay_discovery_info: + port = self.airplay_discovery_info.port or 7000 + elif self.raop_discovery_info: + # Fallback for devices without AirPlay service + port = self.raop_discovery_info.port or 5000 + # Get the DACP ID from the provider - must match what cliap2 uses + provider = cast("AirPlayProvider", self.provider) + device_id = provider.dacp_id + + self._active_pairing = AirPlayPairing( + address=self.address, + name=self.display_name, + protocol=protocol, + logger=self.logger, + port=port, + device_id=device_id, + ) + await self._active_pairing.start_pairing() elif action == CONF_ACTION_FINISH_PAIRING: if not values: - # guard return pin = values.get(CONF_PAIRING_PIN) @@ -390,17 +439,24 @@ class AirPlayPlayer(Player): self.logger.warning("No PIN provided for pairing") return - if self.stream: - credentials = await self.stream.finish_pairing(pin=str(pin)) - else: + if not self._active_pairing: + self.logger.warning("No active pairing session for %s", self.display_name) return - values[CONF_AP_CREDENTIALS] = credentials + credentials = await self._active_pairing.finish_pairing(pin=str(pin)) + self._active_pairing = None - self.logger.info( - "Finished AirPlay pairing for %s", - self.display_name, - ) + # Store credentials with the protocol-specific key + cred_key = self._get_credentials_key(protocol) + values[cred_key] = credentials + + self.logger.info("Finished %s pairing for %s", protocol_name, self.display_name) + + elif action == CONF_ACTION_RESET_PAIRING: + cred_key = self._get_credentials_key(protocol) + self.logger.info("Resetting %s pairing for %s", protocol_name, self.display_name) + if values is not None: + values[cred_key] = None async def stop(self) -> None: """Send STOP command to player.""" @@ -414,7 +470,7 @@ class AirPlayPlayer(Player): """Send PLAY (unpause) command to player.""" async with self._lock: if self.stream and self.stream.running: - await self.stream.send_cli_command("ACTION=PLAY\n") + await self.stream.send_cli_command("ACTION=PLAY") async def pause(self) -> None: """Send PAUSE command to player.""" @@ -427,7 +483,7 @@ class AirPlayPlayer(Player): async with self._lock: if not self.stream or not self.stream.running: return - await self.stream.send_cli_command("ACTION=PAUSE\n") + await self.stream.send_cli_command("ACTION=PAUSE") async def play_media(self, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player.""" @@ -436,29 +492,28 @@ class AirPlayPlayer(Player): raise RuntimeError("Player is synced") self._attr_current_media = media + # Always stop any existing stream + if self.stream and self.stream.running and self.stream.session: + # Set transitioning flag to ignore stale DACP messages (like prevent-playback) + self._transitioning = True + # Force stop the session (to speed up stopping) + await self.stream.session.stop(force=True) + self.stream = None + # select audio source audio_source = self.mass.streams.get_stream(media, AIRPLAY_FLOW_PCM_FORMAT) - # if an existing stream session is running, we could replace it with the new stream - if self.stream and self.stream.running: - # check if we need to replace the stream - if self.stream.prevent_playback: - # player is in prevent playback mode, we need to stop the stream - await self.stop() - elif self.stream.session: - await self.stream.session.replace_stream(audio_source) - return - # setup StreamSession for player (and its sync childs if any) sync_clients = self._get_sync_clients() provider = cast("AirPlayProvider", self.provider) stream_session = AirPlayStreamSession(provider, sync_clients, AIRPLAY_FLOW_PCM_FORMAT) await stream_session.start(audio_source) + self._transitioning = False async def volume_set(self, volume_level: int) -> None: """Send VOLUME_SET command to given player.""" if self.stream and self.stream.running: - await self.stream.send_cli_command(f"VOLUME={volume_level}\n") + await self.stream.send_cli_command(f"VOLUME={volume_level}") self._attr_volume_level = volume_level self.update_state() # store last state in cache @@ -593,9 +648,21 @@ class AirPlayPlayer(Player): self.update_state() def set_state_from_stream( - self, state: PlaybackState | None = None, elapsed_time: float | None = None + self, + state: PlaybackState | None = None, + elapsed_time: float | None = None, + stream: AirPlayProtocol | None = None, ) -> None: - """Set the playback state from stream (RAOP or AirPlay2).""" + """Set the playback state from stream (RAOP or AirPlay2). + + :param state: New playback state (or None to keep current). + :param elapsed_time: New elapsed time (or None to keep current). + :param stream: The stream instance sending this update (for validation). + """ + # Ignore state updates from old/stale streams + if stream is not None and stream != self.stream: + return + if state is not None: prev_state = self._attr_playback_state self._attr_playback_state = state @@ -620,6 +687,9 @@ class AirPlayPlayer(Player): if self.stream.running and self.stream.session: self.mass.create_task(self.stream.session.stop()) self.stream = None + if self._active_pairing: + await self._active_pairing.close() + self._active_pairing = None def _get_sync_clients(self) -> list[AirPlayPlayer]: """Get all sync clients for a player.""" diff --git a/music_assistant/providers/airplay/protocols/_protocol.py b/music_assistant/providers/airplay/protocols/_protocol.py index 5f3afefd..fe7dced6 100644 --- a/music_assistant/providers/airplay/protocols/_protocol.py +++ b/music_assistant/providers/airplay/protocols/_protocol.py @@ -2,16 +2,16 @@ from __future__ import annotations +import asyncio import time from abc import ABC, abstractmethod -from random import randint from typing import TYPE_CHECKING from music_assistant_models.enums import PlaybackState -from music_assistant.constants import VERBOSE_LOG_LEVEL from music_assistant.helpers.named_pipe import AsyncNamedPipeWriter from music_assistant.providers.airplay.constants import AIRPLAY_PCM_FORMAT +from music_assistant.providers.airplay.helpers import generate_active_remote_id if TYPE_CHECKING: from music_assistant_models.player import PlayerMedia @@ -33,8 +33,6 @@ class AirPlayProtocol(ABC): # the pcm audio format used for streaming to this protocol pcm_format = AIRPLAY_PCM_FORMAT - supports_pairing = False # whether this protocol supports pairing - is_pairing: bool = False # whether this protocol instance is in pairing mode def __init__( self, @@ -49,20 +47,17 @@ class AirPlayProtocol(ABC): self.mass = player.provider.mass self.player = player self.logger = player.provider.logger.getChild(f"protocol.{self.__class__.__name__}") - # Generate unique ID to prevent race conditions with named pipes - self.active_remote_id: str = str(randint(1000, 8000)) + mac_address = self.player.device_info.mac_address or self.player.player_id + self.active_remote_id: str = generate_active_remote_id(mac_address) self.prevent_playback: bool = False self._cli_proc: AsyncProcess | None = None - self.audio_pipe = AsyncNamedPipeWriter( - f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio", # noqa: S108 - ) self.commands_pipe = AsyncNamedPipeWriter( f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd", # noqa: S108 ) - # State tracking self._stopped = False self._total_bytes_sent = 0 self._stream_bytes_sent = 0 + self._connected = asyncio.Event() @property def running(self) -> bool: @@ -71,35 +66,40 @@ class AirPlayProtocol(ABC): @abstractmethod async def start(self, start_ntp: int) -> None: - """Initialize streaming process for the player. + """Start the CLI process. - Args: - start_ntp: NTP timestamp to start streaming + :param start_ntp: NTP timestamp to start streaming. """ - async def stop(self) -> None: - """Stop playback and cleanup.""" - # Send stop command before setting _stopped flag - await self.send_cli_command("ACTION=STOP") - self._stopped = True - - # Close the CLI process (wait for it to terminate) - if self._cli_proc and not self._cli_proc.closed: - await self._cli_proc.close() + async def wait_for_connection(self) -> None: + """Wait for device connection to be established.""" + if not self._cli_proc: + return + await asyncio.wait_for(self._connected.wait(), timeout=10) + # repeat sending the volume level to the player because some players seem + # to ignore it the first time + # https://github.com/music-assistant/support/issues/3330 + self.mass.call_later(2, self.send_cli_command(f"VOLUME={self.player.volume_level}")) - self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0) + async def stop(self, force: bool = False) -> None: + """ + Stop playback and cleanup. - # Cleanup named pipes - await self.audio_pipe.remove() + :param force: If True, immediately kill the process without graceful shutdown. + """ + # always send stop command first + await self.send_cli_command("ACTION=STOP") + if self._cli_proc: + await self._cli_proc.write_eof() + self._stopped = True await self.commands_pipe.remove() - - async def start_pairing(self) -> None: - """Start pairing process for this protocol (if supported).""" - raise NotImplementedError("Pairing not implemented for this protocol") - - async def finish_pairing(self, pin: str) -> str: - """Finish pairing process with given PIN (if supported).""" - raise NotImplementedError("Pairing not implemented for this protocol") + if force: + if self._cli_proc and not self._cli_proc.closed: + await self._cli_proc.kill() + elif self._cli_proc and not self._cli_proc.closed: + await self._cli_proc.close() + if not force: + self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0) async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLI binary.""" @@ -107,9 +107,9 @@ class AirPlayProtocol(ABC): return if not self.commands_pipe: return - - self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) self.player.last_command_sent = time.time() + if not command.endswith("\n"): + command += "\n" await self.commands_pipe.write(command.encode("utf-8")) async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None: @@ -126,6 +126,6 @@ class AirPlayProtocol(ABC): await self.send_cli_command(cmd) # get image if metadata.image_url: - await self.send_cli_command(f"ARTWORK={metadata.image_url}\n") + await self.send_cli_command(f"ARTWORK={metadata.image_url}") if progress is not None: - await self.send_cli_command(f"PROGRESS={progress}\n") + await self.send_cli_command(f"PROGRESS={progress}") diff --git a/music_assistant/providers/airplay/protocols/airplay2.py b/music_assistant/providers/airplay/protocols/airplay2.py index 19dd6395..ab6896e3 100644 --- a/music_assistant/providers/airplay/protocols/airplay2.py +++ b/music_assistant/providers/airplay/protocols/airplay2.py @@ -4,28 +4,30 @@ from __future__ import annotations import asyncio import logging +from typing import TYPE_CHECKING, cast from music_assistant_models.enums import PlaybackState -from music_assistant_models.errors import PlayerCommandFailed from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL from music_assistant.helpers.process import AsyncProcess from music_assistant.providers.airplay.constants import ( AIRPLAY2_MIN_LOG_LEVEL, + CONF_AIRPLAY_CREDENTIALS, ) from music_assistant.providers.airplay.helpers import get_cli_binary from ._protocol import AirPlayProtocol +if TYPE_CHECKING: + from music_assistant.providers.airplay.provider import AirPlayProvider + class AirPlay2Stream(AirPlayProtocol): """ AirPlay 2 Audio Streamer. - Python is not suitable for realtime audio streaming so we do the actual streaming - of audio using a small executable written in C based on owntones to do - the actual timestamped playback. It reads pcm audio from a named pipe - and we can send some interactive commands using another named pipe. + Uses cliap2 (C executable based on owntone) for timestamped playback. + Audio is fed via stdin, commands via a named pipe. """ @property @@ -51,8 +53,8 @@ class AirPlay2Stream(AirPlayProtocol): mass_level = 5 return max(mass_level, AIRPLAY2_MIN_LOG_LEVEL) - async def start(self, start_ntp: int, skip: int = 0) -> None: - """Initialize CLI process for a player.""" + async def start(self, start_ntp: int) -> None: + """Start cliap2 process.""" cli_binary = await get_cli_binary(self.player.protocol) assert self.player.airplay_discovery_info is not None @@ -64,13 +66,18 @@ class AirPlay2Stream(AirPlayProtocol): for key, value in self.player.airplay_discovery_info.decoded_properties.items(): txt_kv += f'"{key}={value}" ' - # Note: skip parameter is accepted for API compatibility with base class - # but is not currently used by the cliap2 binary (AirPlay2 handles late joiners differently) - # cliap2 is the binary that handles the actual streaming to the player # this binary leverages from the AirPlay2 support in owntones # https://github.com/music-assistant/cliairplay + # Get AirPlay credentials if available (for Apple devices that require pairing) + airplay_credentials: str | None = None + if creds := self.player.config.get_value(CONF_AIRPLAY_CREDENTIALS): + airplay_credentials = str(creds) + + # Get the provider's DACP ID for remote control callbacks + prov = cast("AirPlayProvider", self.prov) + cli_args = [ cli_binary, "--name", @@ -89,31 +96,34 @@ class AirPlay2Stream(AirPlayProtocol): str(self.player.volume_level), "--loglevel", str(self._cli_loglevel), + "--dacp_id", + prov.dacp_id, + "--active_remote", + self.active_remote_id, "--pipe", - self.audio_pipe.path, + "-", # Use stdin for audio input "--command_pipe", self.commands_pipe.path, ] + # Add credentials for authenticated AirPlay devices (Apple TV, HomePod, etc.) + # Native HAP pairing format: 192 hex chars = client_private_key(128) + server_public_key(64) + if airplay_credentials: + if len(airplay_credentials) == 192: + cli_args += ["--auth", airplay_credentials] + else: + self.player.logger.warning( + "Invalid credentials length: %d (expected 192)", + len(airplay_credentials), + ) + self.player.logger.debug( "Starting cliap2 process for player %s with args: %s", player_id, cli_args, ) - self._cli_proc = AsyncProcess(cli_args, stdin=False, stderr=True, name="cliap2") + self._cli_proc = AsyncProcess(cli_args, stdin=True, stderr=True, name="cliap2") await self._cli_proc.start() - # read up to first num_lines lines of stderr to get the initial status - num_lines: int = 50 - if self.prov.logger.level > logging.INFO: - num_lines *= 10 - for _ in range(num_lines): - line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore").strip() - self.player.logger.debug(line) - if f"airplay: Adding AirPlay device '{self.player.display_name}'" in line: - self.player.logger.info("AirPlay device connected. Starting playback.") - break - if f"The AirPlay 2 device '{self.player.display_name}' failed" in line: - raise PlayerCommandFailed("Cannot connect to AirPlay device") # start reading the stderr of the cliap2 process from another task self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader())) @@ -124,13 +134,20 @@ class AirPlay2Stream(AirPlayProtocol): if not self._cli_proc: return async for line in self._cli_proc.iter_stderr(): + if self._stopped: + break + if "player: event_play_start()" in line: + # successfully connected + self._connected.set() if "Pause at" in line: - player.set_state_from_stream(state=PlaybackState.PAUSED) - if "Restarted at" in line: - player.set_state_from_stream(state=PlaybackState.PLAYING) - if "Starting at" in line: + player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self) + elif "Restarted at" in line: + player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self) + elif "Starting at" in line: # streaming has started - player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0) + player.set_state_from_stream( + state=PlaybackState.PLAYING, elapsed_time=0, stream=self + ) if "put delay detected" in line: if "resetting all outputs" in line: logger.error("High packet loss detected, restarting playback...") @@ -150,6 +167,9 @@ class AirPlay2Stream(AirPlayProtocol): logger.info(line) elif "[ WARN]" in line: logger.warning(line) + elif "[DEBUG]" in line and "mass_timer_cb" in line: + # mass_timer_cb is very spammy, reduce it to verbose + logger.log(VERBOSE_LOG_LEVEL, line) elif "[DEBUG]" in line: logger.debug(line) elif "[ SPAM]" in line: @@ -161,4 +181,4 @@ class AirPlay2Stream(AirPlayProtocol): # ensure we're cleaned up afterwards (this also logs the returncode) if not self._stopped: self._stopped = True - self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0) + self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self) diff --git a/music_assistant/providers/airplay/protocols/raop.py b/music_assistant/providers/airplay/protocols/raop.py index f6413fb1..ca95fddb 100644 --- a/music_assistant/providers/airplay/protocols/raop.py +++ b/music_assistant/providers/airplay/protocols/raop.py @@ -7,16 +7,15 @@ import logging from typing import TYPE_CHECKING, cast from music_assistant_models.enums import PlaybackState -from music_assistant_models.errors import PlayerCommandFailed from music_assistant.constants import VERBOSE_LOG_LEVEL from music_assistant.helpers.process import AsyncProcess from music_assistant.providers.airplay.constants import ( AIRPLAY_OUTPUT_BUFFER_DURATION_MS, CONF_ALAC_ENCODE, - CONF_AP_CREDENTIALS, CONF_ENCRYPTION, CONF_PASSWORD, + CONF_RAOP_CREDENTIALS, ) from music_assistant.providers.airplay.helpers import get_cli_binary @@ -36,10 +35,8 @@ class RaopStream(AirPlayProtocol): and we can send some interactive commands using a named pipe. """ - supports_pairing = True - async def start(self, start_ntp: int) -> None: - """Initialize CLIRaop process for a player.""" + """Start CLIRaop process.""" assert self.player.raop_discovery_info is not None # for type checker cli_binary = await get_cli_binary(self.player.protocol) extra_args: list[str] = [] @@ -56,20 +53,17 @@ class RaopStream(AirPlayProtocol): player_id, CONF_PASSWORD ): extra_args += ["-password", str(device_password)] - # Add AirPlay credentials from pairing if available (for Apple devices) - if ap_credentials := self.player.config.get_value(CONF_AP_CREDENTIALS): - extra_args += ["-secret", str(ap_credentials)] + # Add RAOP credentials from pairing if available (for Apple devices) + if raop_credentials := self.player.config.get_value(CONF_RAOP_CREDENTIALS): + # Credentials format is "client_id:auth_secret", cliraop expects just auth_secret + creds_str = str(raop_credentials) + auth_secret = creds_str.split(":", 1)[1] if ":" in creds_str else creds_str + extra_args += ["-secret", auth_secret] if self.prov.logger.isEnabledFor(logging.DEBUG): extra_args += ["-debug", "5"] elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): extra_args += ["-debug", "10"] - # cliraop is the binary that handles the actual raop streaming to the player - # this is a slightly modified version of philippe44's libraop - # https://github.com/music-assistant/libraop - # we use this intermediate binary to do the actual streaming because attempts to do - # so using pure python (e.g. pyatv) were not successful due to the realtime nature - cliraop_args = [ cli_binary, "-ntpstart", @@ -90,7 +84,7 @@ class RaopStream(AirPlayProtocol): "-udn", self.player.raop_discovery_info.name, self.player.address, - self.audio_pipe.path, + "-", # Use stdin for audio input ] self.player.logger.debug( "Starting cliraop process for player %s with args: %s", @@ -99,74 +93,8 @@ class RaopStream(AirPlayProtocol): ) self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop") await self._cli_proc.start() - - # read up to first 50 lines of stderr to get the initial status - for _ in range(50): - line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore") - self.player.logger.debug(line) - if "connected to " in line: - self.player.logger.info("AirPlay device connected. Starting playback.") - break - if "Cannot connect to AirPlay device" in line: - raise PlayerCommandFailed("Cannot connect to AirPlay device") - - # start reading the stderr of the cliraop process from another task + # start reading the stderr of the cliap2 process from another task self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader())) - # repeat sending the volume level to the player because some players seem - # to ignore it the first time - # https://github.com/music-assistant/support/issues/3330 - self.mass.call_later(1, self.send_cli_command(f"VOLUME={self.player.volume_level}\n")) - - async def start_pairing(self) -> None: - """Start pairing process for this protocol (if supported).""" - assert self.player.raop_discovery_info is not None # for type checker - cli_binary = await get_cli_binary(self.player.protocol) - - cliraop_args = [ - cli_binary, - "-pair", - "-if", - self.mass.streams.bind_ip, - "-port", - str(self.player.raop_discovery_info.port), - "-udn", - self.player.raop_discovery_info.name, - self.player.address, - ] - self.player.logger.debug( - "Starting PAIRING with cliraop process for player %s with args: %s", - self.player.player_id, - cliraop_args, - ) - self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop") - await self._cli_proc.start() - # read up to first 10 lines of stderr to get the initial status - for _ in range(10): - line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore") - self.player.logger.debug(line) - if "enter PIN code displayed on " in line: - self.is_pairing = True - return - await self._cli_proc.close() - raise PlayerCommandFailed("Pairing failed") - - async def finish_pairing(self, pin: str) -> str: - """Finish pairing process with given PIN (if supported).""" - if not self.is_pairing: - await self.start_pairing() - if not self._cli_proc or self._cli_proc.closed: - raise PlayerCommandFailed("Pairing process not started") - - self.is_pairing = False - _, _stderr = await self._cli_proc.communicate(input=f"{pin}\n".encode(), timeout=10) - for line in _stderr.decode().splitlines(): - self.player.logger.debug(line) - for error in ("device did not respond", "can't authentify", "pin failed"): - if error in line.lower(): - raise PlayerCommandFailed(f"Pairing failed: {error}") - if "secret is " in line: - return line.split("secret is ")[1].strip() - raise PlayerCommandFailed(f"Pairing failed: {_stderr.decode().strip()}") async def _stderr_reader(self) -> None: """Monitor stderr for the running CLIRaop process.""" @@ -176,13 +104,20 @@ class RaopStream(AirPlayProtocol): if not self._cli_proc: return async for line in self._cli_proc.iter_stderr(): + if self._stopped: + break + if "connected to " in line: + self._connected.set() + # successfully connected - playback will/can start if "set pause" in line or "Pause at" in line: - player.set_state_from_stream(state=PlaybackState.PAUSED) - if "Restarted at" in line or "restarting w/ pause" in line: - player.set_state_from_stream(state=PlaybackState.PLAYING) - if "restarting w/o pause" in line: + player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self) + elif "Restarted at" in line or "restarting w/ pause" in line: + player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self) + elif "restarting w/o pause" in line: # streaming has started - player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0) + player.set_state_from_stream( + state=PlaybackState.PLAYING, elapsed_time=0, stream=self + ) if "lost packet out of backlog" in line: lost_packets += 1 if lost_packets == 100: @@ -199,4 +134,4 @@ class RaopStream(AirPlayProtocol): logger.debug("CLIRaop stderr reader ended") if not self._stopped: self._stopped = True - self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0) + self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self) diff --git a/music_assistant/providers/airplay/provider.py b/music_assistant/providers/airplay/provider.py index 375dffc7..fd9f7de8 100644 --- a/music_assistant/providers/airplay/provider.py +++ b/music_assistant/providers/airplay/provider.py @@ -4,7 +4,6 @@ from __future__ import annotations import asyncio import socket -from random import randrange from typing import cast from music_assistant_models.enums import PlaybackState @@ -49,7 +48,10 @@ class AirPlayProvider(PlayerProvider): """Handle async initialization of the provider.""" # register DACP zeroconf service dacp_port = await select_free_port(39831, 49831) - self.dacp_id = dacp_id = f"{randrange(2**64):X}" + # Use first 16 hex chars of server_id as a persistent DACP ID + # This ensures the DACP ID remains the same across restarts, which is required + # for AirPlay 2 (HAP) pair-verify to work with previously paired devices + self.dacp_id = dacp_id = self.mass.server_id[:16].upper() self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port) self._dacp_server = await asyncio.start_server( self._handle_dacp_request, "0.0.0.0", dacp_port @@ -316,7 +318,10 @@ class AirPlayProvider(PlayerProvider): player.update_volume_from_device(volume) elif "device-prevent-playback=1" in path: # device switched to another source (or is powered off) - if stream := player.stream: + # Ignore during stream transition (stale message from old CLI process) + if player._transitioning: + self.logger.debug("Ignoring prevent-playback during stream transition") + elif stream := player.stream: stream.prevent_playback = True if stream.session: self.mass.create_task(stream.session.remove_client(player)) diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py index 775bda66..a6fc10a3 100644 --- a/music_assistant/providers/airplay/stream_session.py +++ b/music_assistant/providers/airplay/stream_session.py @@ -8,19 +8,18 @@ from collections.abc import AsyncGenerator from contextlib import suppress from typing import TYPE_CHECKING +from music_assistant_models.errors import PlayerCommandFailed + from music_assistant.constants import CONF_SYNC_ADJUST from music_assistant.helpers.audio import get_player_filter_params from music_assistant.helpers.ffmpeg import FFMpeg -from music_assistant.helpers.util import TaskManager from music_assistant.providers.airplay.helpers import ntp_to_unix_time, unix_time_to_ntp from .constants import ( AIRPLAY2_CONNECT_TIME_MS, - AIRPLAY_OUTPUT_BUFFER_DURATION_MS, - AIRPLAY_PRELOAD_SECONDS, - AIRPLAY_PROCESS_SPAWN_TIME_MS, CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT, + RAOP_CONNECT_TIME_MS, StreamingProtocol, ) from .protocols.airplay2 import AirPlay2Stream @@ -44,11 +43,9 @@ class AirPlayStreamSession: ) -> None: """Initialize AirPlayStreamSession. - Args: - airplay_provider: The AirPlay provider instance - sync_clients: List of AirPlay players to stream to - pcm_format: PCM format of the input stream - audio_source: Async generator yielding audio chunks + :param airplay_provider: The AirPlay provider instance. + :param sync_clients: List of AirPlay players to stream to. + :param pcm_format: PCM format of the input stream. """ assert sync_clients self.prov = airplay_provider @@ -60,76 +57,49 @@ class AirPlayStreamSession: self._lock = asyncio.Lock() self.start_ntp: int = 0 self.start_time: float = 0.0 - self.wait_start: float = 0.0 # in seconds - self.seconds_streamed: float = 0 # Total seconds sent to session - # because we reuse an existing stream session for new play_media requests, - # we need to track when the last stream was started - self.last_stream_started: float = 0.0 + self.wait_start: float = 0.0 + self.seconds_streamed: float = 0 self.total_pause_time: float = 0.0 self.last_paused: float | None = None - self._clients_ready = asyncio.Event() self._first_chunk_received = asyncio.Event() async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None: """Initialize stream session for all players.""" - self.prov.logger.debug( - "Starting stream session with %d clients", - len(self.sync_clients), - ) - # Prepare all clients - # this will create the stream objects, named pipes and start ffmpeg - async with TaskManager(self.mass) as tm: - for _airplay_player in self.sync_clients: - tm.create_task(self._prepare_client(_airplay_player)) - - # Start audio source streamer task - # this will read from the audio source and distribute - # to all player-specific ffmpeg processes - # we start this task early because some streams (especially radio) - # may need more time to buffer - this way we ensure we have audio ready - # when the players should start playing - self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source)) - # wait until the first chunk is received before starting clients - await self._first_chunk_received.wait() - - # Start all clients - # Get current NTP timestamp and calculate wait time cur_time = time.time() - # AirPlay2 clients need around 2500ms to establish connection and start playback - # The also have a fixed 2000ms output buffer. We will not be able to respect the - # ntpstart time unless we cater for all these time delays. - # RAOP clients need less due to less RTSP exchanges and different packet buffer - # handling - # Plus we need to cater for process spawn and initialisation time - wait_start = ( - AIRPLAY2_CONNECT_TIME_MS - + AIRPLAY_OUTPUT_BUFFER_DURATION_MS - + AIRPLAY_PROCESS_SPAWN_TIME_MS - + (250 * len(self.sync_clients)) - ) # in milliseconds + has_airplay2_client = any( + p.protocol == StreamingProtocol.AIRPLAY2 for p in self.sync_clients + ) + wait_start = AIRPLAY2_CONNECT_TIME_MS if has_airplay2_client else RAOP_CONNECT_TIME_MS wait_start_seconds = wait_start / 1000 - self.wait_start = wait_start_seconds # in seconds + self.wait_start = wait_start_seconds self.start_time = cur_time + wait_start_seconds - self.last_stream_started = self.start_time self.start_ntp = unix_time_to_ntp(self.start_time) + await asyncio.gather(*[self._start_client(p, self.start_ntp) for p in self.sync_clients]) + self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source)) + try: + await asyncio.gather( + *[p.stream.wait_for_connection() for p in self.sync_clients if p.stream] + ) + except Exception: + # playback failed to start, cleanup + await self.stop() + raise PlayerCommandFailed("Playback failed to start") - async with TaskManager(self.mass) as tm: - for _airplay_player in self.sync_clients: - tm.create_task(self._start_client(_airplay_player, self.start_ntp)) - - # All clients started - self._clients_ready.set() - - async def stop(self) -> None: + async def stop(self, force: bool = False) -> None: """Stop playback and cleanup.""" - await asyncio.gather( - *[self.remove_client(x) for x in self.sync_clients], - return_exceptions=True, - ) if self._audio_source_task and not self._audio_source_task.done(): self._audio_source_task.cancel() with suppress(asyncio.CancelledError): await self._audio_source_task + if force: + await asyncio.gather( + *[self.stop_client(x, force=True) for x in self.sync_clients], + ) + self.sync_clients = [] + else: + await asyncio.gather( + *[self.remove_client(x) for x in self.sync_clients], + ) async def remove_client(self, airplay_player: AirPlayPlayer) -> None: """Remove a sync client from the session.""" @@ -137,15 +107,31 @@ class AirPlayStreamSession: if airplay_player not in self.sync_clients: return self.sync_clients.remove(airplay_player) - if airplay_player.stream and airplay_player.stream.session == self: - await airplay_player.stream.stop() - if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): - await ffmpeg.close() + await self.stop_client(airplay_player) # If this was the last client, stop the session if not self.sync_clients: await self.stop() return + async def stop_client(self, airplay_player: AirPlayPlayer, force: bool = False) -> None: + """ + Stop a client's stream and ffmpeg. + + :param airplay_player: The player to stop. + :param force: If True, kill CLI process immediately. + """ + ffmpeg = self._player_ffmpeg.pop(airplay_player.player_id, None) + if force: + if ffmpeg and not ffmpeg.closed: + await ffmpeg.kill() + if airplay_player.stream and airplay_player.stream.session == self: + await airplay_player.stream.stop(force=True) + else: + if ffmpeg and not ffmpeg.closed: + await ffmpeg.close() + if airplay_player.stream and airplay_player.stream.session == self: + await airplay_player.stream.stop() + async def add_client(self, airplay_player: AirPlayPlayer) -> None: """Add a sync client to the session as a late joiner. @@ -162,10 +148,7 @@ class AirPlayStreamSession: CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT ) if not allow_late_join: - # Late joining is not allowed - restart the session for all players - await self.stop() # we need to stop the current session to add a new client - # this could potentially be called by multiple players at the exact same time - # so we debounce the resync a bit here with a timer + await self.stop() if sync_leader.current_media: self.mass.call_later( 0.5, @@ -174,110 +157,39 @@ class AirPlayStreamSession: ) return - # Prepare the new client for streaming - await self._prepare_client(airplay_player) - - # Snapshot seconds_streamed inside lock to prevent race conditions - # Keep lock held during stream.start() to ensure player doesn't miss any chunks async with self._lock: - # Calculate skip_seconds based on how many chunks have been sent skip_seconds = self.seconds_streamed - # Start the stream at compensated NTP timestamp start_at = self.start_time + skip_seconds start_ntp = unix_time_to_ntp(start_at) - self.prov.logger.debug( - "Adding late joiner %s to session, playback starts %.3fs from now", - airplay_player.player_id, - start_at - time.time(), - ) - # Add player to sync clients list if airplay_player not in self.sync_clients: self.sync_clients.append(airplay_player) await self._start_client(airplay_player, start_ntp) - - async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None: - """Replace the audio source of the stream.""" - self._first_chunk_received.clear() - new_audio_source_task = asyncio.create_task(self._audio_streamer(audio_source)) - await self._first_chunk_received.wait() - async with self._lock: - # Cancel the current audio source task - assert self._audio_source_task # for type checker - old_audio_source_task = self._audio_source_task - old_audio_source_task.cancel() - self._audio_source_task = new_audio_source_task - self.last_stream_started = time.time() + self.wait_start - self.total_pause_time = 0.0 - self.last_paused = None - for sync_client in self.sync_clients: - sync_client.set_state_from_stream(state=None, elapsed_time=0) - # ensure we cleanly wait for the old audio source task to finish - with suppress(asyncio.CancelledError): - await old_audio_source_task + if airplay_player.stream: + await airplay_player.stream.wait_for_connection() async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None: """Stream audio to all players.""" pcm_sample_size = self.pcm_format.pcm_sample_size - stream_start_time = time.time() - first_chunk_received = False - # each chunk is exactly one second of audio data based on the pcm format. - async for chunk in audio_source: - if first_chunk_received is False: - first_chunk_received = True - self.prov.logger.debug( - "First audio chunk received after %.3fs", - time.time() - stream_start_time, - ) - self._first_chunk_received.set() - # Wait until all clients are ready - await self._clients_ready.wait() - # Send chunk to all players - async with self._lock: - sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running] - if not sync_clients: - self.prov.logger.debug( - "Audio streamer exiting: No running clients left in session" - ) - return - - # Write chunk to all players - write_tasks = [ - self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream - ] - results = await asyncio.gather(*write_tasks, return_exceptions=True) - - # Check for write errors or timeouts - players_to_remove: list[AirPlayPlayer] = [] - for i, result in enumerate(results): - if i >= len(sync_clients): - continue - player = sync_clients[i] - - if isinstance(result, asyncio.TimeoutError): - self.prov.logger.warning( - "Removing player %s from session: stopped reading data (write timeout)", - player.player_id, - ) - players_to_remove.append(player) - elif isinstance(result, Exception): - self.prov.logger.warning( - "Removing player %s from session due to write error: %s", - player.player_id, - result, - ) - players_to_remove.append(player) - - # Remove failed/timed-out players from sync group - for player in players_to_remove: - self.mass.create_task(self.remove_client(player)) - - # Update chunk counter (each chunk is exactly one second of audio) - chunk_seconds = len(chunk) / pcm_sample_size - self.seconds_streamed += chunk_seconds - - # Entire stream consumed: send EOF - self.prov.logger.debug("Audio source stream exhausted") + watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size)) + try: + async for chunk in audio_source: + if not self._first_chunk_received.is_set(): + watchdog_task.cancel() + with suppress(asyncio.CancelledError): + await watchdog_task + self._first_chunk_received.set() + + if not self.sync_clients: + break + + await self._write_chunk_to_all_players(chunk) + self.seconds_streamed += len(chunk) / pcm_sample_size + finally: + if not watchdog_task.done(): + watchdog_task.cancel() + with suppress(asyncio.CancelledError): + await watchdog_task async with self._lock: await asyncio.gather( *[ @@ -288,97 +200,114 @@ class AirPlayStreamSession: return_exceptions=True, ) - async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None: - """ - Write audio chunk to a specific player. + async def _silence_watchdog(self, pcm_sample_size: int) -> None: + """Insert silence if audio source is slow to deliver first chunk.""" + grace_period = 0.2 + max_silence_padding = 5.0 + silence_inserted = 0.0 + + await asyncio.sleep(grace_period) + while not self._first_chunk_received.is_set() and silence_inserted < max_silence_padding: + silence_duration = 0.1 + silence_bytes = int(pcm_sample_size * silence_duration) + silence_chunk = bytes(silence_bytes) + await self._write_chunk_to_all_players(silence_chunk) + self.seconds_streamed += silence_duration + silence_inserted += silence_duration + await asyncio.sleep(0.05) + + if silence_inserted > 0: + self.prov.logger.warning( + "Inserted %.1fs silence padding while waiting for audio source", + silence_inserted, + ) - each chunk is (in general) one second of audio data based on the pcm format. - For late joiners, compensates for chunks sent between join time and actual chunk delivery. - Blocks (async) until the data has been written. - """ + async def _write_chunk_to_all_players(self, chunk: bytes) -> None: + """Write a chunk to all connected players.""" + async with self._lock: + sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running] + if not sync_clients: + return + + # Write chunk to all players + write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream] + results = await asyncio.gather(*write_tasks, return_exceptions=True) + + # Check for write errors or timeouts + players_to_remove: list[AirPlayPlayer] = [] + for i, result in enumerate(results): + if i >= len(sync_clients): + continue + player = sync_clients[i] + + if isinstance(result, TimeoutError): + self.prov.logger.warning( + "Removing player %s from session: stopped reading data (write timeout)", + player.player_id, + ) + players_to_remove.append(player) + elif isinstance(result, Exception): + self.prov.logger.warning( + "Removing player %s from session due to write error: %s", + player.player_id, + result, + ) + players_to_remove.append(player) + + for player in players_to_remove: + self.mass.create_task(self.remove_client(player)) + + async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None: + """Write audio chunk to a player's ffmpeg process.""" player_id = airplay_player.player_id - # we write the chunk to the player's ffmpeg process which - # applies any player-specific filters (e.g. volume, dsp, etc) - # and outputs in the correct format for the player stream - # to the named pipe associated with the player's stream if ffmpeg := self._player_ffmpeg.get(player_id): if ffmpeg.closed: return - # Use a 35 second timeout - if the write takes longer, the player - # has stopped reading data and we're in a deadlock situation - # 35 seconds is a little bit above out pause timeout (30s) to allow for some margin await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0) async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None: """Write EOF to a specific player.""" - # cleanup any associated FFMpeg instance first if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): await ffmpeg.write_eof() await ffmpeg.wait_with_timeout(30) - del ffmpeg + if airplay_player.stream and airplay_player.stream._cli_proc: + await airplay_player.stream._cli_proc.write_eof() - async def _prepare_client(self, airplay_player: AirPlayPlayer) -> None: - """Prepare stream for a single client.""" - # Stop existing stream if running + async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None: + """Start CLI process and ffmpeg for a single client.""" if airplay_player.stream and airplay_player.stream.running: await airplay_player.stream.stop() - # Create appropriate stream type based on protocol if airplay_player.protocol == StreamingProtocol.AIRPLAY2: airplay_player.stream = AirPlay2Stream(airplay_player) else: airplay_player.stream = RaopStream(airplay_player) - # Link stream session to player stream airplay_player.stream.session = self - # create the named pipes - await airplay_player.stream.audio_pipe.create() - await airplay_player.stream.commands_pipe.create() - # start the (player-specific) ffmpeg process - # note that ffmpeg will open the named pipe for writing - await self._start_client_ffmpeg(airplay_player) - await asyncio.sleep(0.05) # allow ffmpeg to open the pipe properly - - async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None: - """Start stream for a single client.""" sync_adjust = airplay_player.config.get_value(CONF_SYNC_ADJUST, 0) assert isinstance(sync_adjust, int) if sync_adjust != 0: - # apply sync adjustment - start_ntp += sync_adjust * 1000 # sync_adjust is in seconds, NTP in milliseconds start_ntp = unix_time_to_ntp(ntp_to_unix_time(start_ntp) + (sync_adjust / 1000)) - # start the stream - assert airplay_player.stream # for type checker await airplay_player.stream.start(start_ntp) - - async def _start_client_ffmpeg(self, airplay_player: AirPlayPlayer) -> None: - """Start or restart the player's ffmpeg stream.""" - # Clean up any existing FFmpeg instance for this player + # Start ffmpeg to feed audio to CLI stdin if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): await ffmpeg.close() - del ffmpeg - assert airplay_player.stream # for type checker - # Create the FFMpeg instance per player which accepts our PCM audio - # applies any player-specific filters (e.g. volume, dsp, etc) - # and outputs in the correct format for the player stream - # to the named pipe associated with the player's stream filter_params = get_player_filter_params( self.mass, airplay_player.player_id, self.pcm_format, airplay_player.stream.pcm_format, ) + cli_proc = airplay_player.stream._cli_proc + assert cli_proc + assert cli_proc.proc + assert cli_proc.proc.stdin + stdin_transport = cli_proc.proc.stdin.transport + audio_output: str | int = stdin_transport.get_extra_info("pipe").fileno() ffmpeg = FFMpeg( audio_input="-", input_format=self.pcm_format, output_format=airplay_player.stream.pcm_format, filter_params=filter_params, - audio_output=airplay_player.stream.audio_pipe.path, - extra_input_args=[ - "-y", - "-readrate", - "1", - "-readrate_initial_burst", - f"{AIRPLAY_PRELOAD_SECONDS}", - ], + audio_output=audio_output, ) await ffmpeg.start() self._player_ffmpeg[airplay_player.player_id] = ffmpeg diff --git a/requirements_all.txt b/requirements_all.txt index 8c8b769c..987e51ce 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -67,6 +67,7 @@ shortuuid==1.0.13 snapcast==2.3.7 soco==0.30.12 soundcloudpy==0.1.4 +srptools>=1.0.0 sxm==0.2.8 unidecode==1.4.0 uv>=0.8.0