Fixes for the AirPlay provider (#3014)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sat, 24 Jan 2026 00:26:40 +0000 (01:26 +0100)
committerGitHub <noreply@github.com>
Sat, 24 Jan 2026 00:26:40 +0000 (01:26 +0100)
21 files changed:
music_assistant/controllers/config.py
music_assistant/helpers/named_pipe.py
music_assistant/helpers/process.py
music_assistant/models/player.py
music_assistant/providers/airplay/bin/cliap2-linux-aarch64
music_assistant/providers/airplay/bin/cliap2-linux-x86_64
music_assistant/providers/airplay/bin/cliap2-macos-arm64
music_assistant/providers/airplay/bin/cliraop-linux-aarch64
music_assistant/providers/airplay/bin/cliraop-linux-x86_64
music_assistant/providers/airplay/bin/cliraop-macos-arm64
music_assistant/providers/airplay/constants.py
music_assistant/providers/airplay/helpers.py
music_assistant/providers/airplay/manifest.json
music_assistant/providers/airplay/pairing.py [new file with mode: 0644]
music_assistant/providers/airplay/player.py
music_assistant/providers/airplay/protocols/_protocol.py
music_assistant/providers/airplay/protocols/airplay2.py
music_assistant/providers/airplay/protocols/raop.py
music_assistant/providers/airplay/provider.py
music_assistant/providers/airplay/stream_session.py
requirements_all.txt

index 7a06fe7afeb952ced9ba68f9478cd953c8611d3a..9a07583c56e4e9235fdbd3030f14bb67f9446a1d 100644 (file)
@@ -1384,6 +1384,23 @@ class ConfigController:
             player_config["provider"] = prov.instance_id
             changed = True
 
+        # Migrate AirPlay legacy credentials (ap_credentials) to protocol-specific keys
+        # The old key was used for both RAOP and AirPlay, now we have separate keys
+        for player_id, player_config in self._data.get(CONF_PLAYERS, {}).items():
+            if player_config.get("provider") != "airplay":
+                continue
+            if not (values := player_config.get("values")):
+                continue
+            if "ap_credentials" not in values:
+                continue
+            # Migrate to raop_credentials (RAOP is the default/fallback protocol)
+            # The new code will use the correct key based on the protocol
+            old_creds = values.pop("ap_credentials")
+            if old_creds and "raop_credentials" not in values:
+                values["raop_credentials"] = old_creds
+                LOGGER.info("Migrated AirPlay credentials for player %s", player_id)
+            changed = True
+
         if changed:
             await self._async_save()
 
index ac26ddb57d01f416ac59620f4bbeaca07e7333c5..380660f55f9263859de361145e40b2c4f45047f9 100644 (file)
@@ -3,22 +3,23 @@
 from __future__ import annotations
 
 import asyncio
+import errno as errno_module
+import logging
 import os
-import stat
+import time
 from contextlib import suppress
 from pathlib import Path
 
+_LOGGER = logging.getLogger("named_pipe")
+
 
 class AsyncNamedPipeWriter:
-    """Simple async writer for named pipes using thread pool for blocking I/O."""
+    """Async writer for named pipes."""
 
     def __init__(self, pipe_path: str) -> None:
-        """Initialize named pipe writer.
-
-        Args:
-            pipe_path: Path to the named pipe
-        """
+        """Initialize named pipe writer."""
         self._pipe_path = pipe_path
+        self._write_fd: int | None = None
 
     @property
     def path(self) -> str:
@@ -26,39 +27,66 @@ class AsyncNamedPipeWriter:
         return self._pipe_path
 
     async def create(self) -> None:
-        """Create the named pipe (if it does not exist)."""
+        """Create the named pipe."""
 
         def _create() -> None:
-            try:
-                os.mkfifo(self._pipe_path)
-            except FileExistsError:
-                # Check if existing file is actually a named pipe
-                file_stat = os.stat(self._pipe_path)
-                if not stat.S_ISFIFO(file_stat.st_mode):
-                    # Not a FIFO - remove and recreate
-                    Path(self._pipe_path).unlink()
-                    os.mkfifo(self._pipe_path)
+            pipe_path = Path(self._pipe_path)
+            if pipe_path.exists():
+                pipe_path.unlink()
+            os.mkfifo(self._pipe_path)
 
         await asyncio.to_thread(_create)
 
+    def _ensure_write_fd(self) -> bool:
+        """Ensure we have a write fd open. Returns True if successful."""
+        if self._write_fd is not None:
+            return True
+        if not Path(self._pipe_path).exists():
+            return False
+        # Retry opening until reader is available (up to 1s)
+        for _ in range(20):
+            try:
+                self._write_fd = os.open(self._pipe_path, os.O_WRONLY | os.O_NONBLOCK)
+                return True
+            except OSError as e:
+                if e.errno in (errno_module.ENXIO, errno_module.ENOENT):
+                    time.sleep(0.05)
+                    continue
+                raise
+        _LOGGER.warning("Could not open pipe %s: no reader after retries", self._pipe_path)
+        return False
+
     async def write(self, data: bytes) -> None:
-        """Write data to the named pipe (blocking operation runs in thread)."""
+        """Write data to the named pipe."""
 
         def _write() -> None:
-            with open(self._pipe_path, "wb") as pipe_file:
-                pipe_file.write(data)
+            if not self._ensure_write_fd():
+                return
+            try:
+                assert self._write_fd is not None
+                os.write(self._write_fd, data)
+            except OSError as e:
+                if e.errno == errno_module.EPIPE:
+                    # Reader closed, reset fd for next attempt
+                    if self._write_fd is not None:
+                        with suppress(Exception):
+                            os.close(self._write_fd)
+                        self._write_fd = None
+                else:
+                    raise
 
-        # Run blocking write in thread pool
         await asyncio.to_thread(_write)
 
     async def remove(self) -> None:
-        """Remove the named pipe."""
-
-        def _remove() -> None:
+        """Close write fd and remove the pipe."""
+        if self._write_fd is not None:
             with suppress(Exception):
-                Path(self._pipe_path).unlink()
-
-        await asyncio.to_thread(_remove)
+                os.close(self._write_fd)
+            self._write_fd = None
+        pipe_path = Path(self._pipe_path)
+        if pipe_path.exists():
+            with suppress(Exception):
+                pipe_path.unlink()
 
     def __str__(self) -> str:
         """Return string representation."""
index 0fa6183b7ebe10543a050f0d688e0e6b1f1dd66f..1da3541f2baf954935cc16ff570c97eb54d318a9 100644 (file)
@@ -266,22 +266,26 @@ class AsyncProcess:
                 await self._stdin_feeder_task
 
         # close stdin to signal we're done sending data
-        await asyncio.wait_for(self._stdin_lock.acquire(), 10)
+        with suppress(TimeoutError, asyncio.CancelledError):
+            await asyncio.wait_for(self._stdin_lock.acquire(), 5)
         if self.proc.stdin and not self.proc.stdin.is_closing():
             self.proc.stdin.close()
         elif not self.proc.stdin and self.proc.returncode is None:
             self.proc.send_signal(SIGINT)
 
         # ensure we have no more readers active and stdout is drained
-        await asyncio.wait_for(self._stdout_lock.acquire(), 10)
+        with suppress(TimeoutError, asyncio.CancelledError):
+            await asyncio.wait_for(self._stdout_lock.acquire(), 5)
         if self.proc.stdout and not self.proc.stdout.at_eof():
             with suppress(Exception):
                 await self.proc.stdout.read(-1)
         # if we have a stderr task active, allow it to finish
         if self._stderr_reader_task:
-            await asyncio.wait_for(self._stderr_reader_task, 10)
+            with suppress(TimeoutError, asyncio.CancelledError):
+                await asyncio.wait_for(self._stderr_reader_task, 5)
         elif self.proc.stderr and not self.proc.stderr.at_eof():
-            await asyncio.wait_for(self._stderr_lock.acquire(), 10)
+            with suppress(TimeoutError, asyncio.CancelledError):
+                await asyncio.wait_for(self._stderr_lock.acquire(), 5)
             # drain stderr
             with suppress(Exception):
                 await self.proc.stderr.read(-1)
@@ -289,18 +293,32 @@ class AsyncProcess:
         # make sure the process is really cleaned up.
         # especially with pipes this can cause deadlocks if not properly guarded
         # we need to ensure stdout and stderr are flushed and stdin closed
+        pid = self.proc.pid
+        terminate_attempts = 0
         while self.returncode is None:
             try:
                 # use communicate to flush all pipe buffers
-                await asyncio.wait_for(self.proc.communicate(), 5)
+                await asyncio.wait_for(self.proc.communicate(), 2)
             except TimeoutError:
+                terminate_attempts += 1
                 self.logger.debug(
-                    "Process %s with PID %s did not stop in time. Sending terminate...",
+                    "Process %s with PID %s did not stop in time (attempt %d). Sending SIGKILL...",
                     self.name,
-                    self.proc.pid,
+                    pid,
+                    terminate_attempts,
                 )
-                with suppress(ProcessLookupError):
-                    self.proc.terminate()
+                # Use os.kill for more direct signal delivery
+                with suppress(ProcessLookupError, OSError):
+                    os.kill(pid, 9)  # SIGKILL = 9
+                # Give up after 5 attempts - process may be zombie
+                if terminate_attempts >= 5:
+                    self.logger.warning(
+                        "Process %s (PID %s) did not terminate after %d SIGKILL attempts",
+                        self.name,
+                        pid,
+                        terminate_attempts,
+                    )
+                    break
         self.logger.log(
             VERBOSE_LOG_LEVEL,
             "Process %s with PID %s stopped with returncode %s",
@@ -309,6 +327,70 @@ class AsyncProcess:
             self.returncode,
         )
 
+    async def kill(self) -> None:
+        """
+        Immediately kill the process with SIGKILL.
+
+        Use this for forceful termination when the process doesn't respond to
+        normal termination signals. Unlike close(), this doesn't attempt graceful
+        shutdown - it immediately sends SIGKILL.
+        """
+        self._close_called = True
+        if not self.proc or self.returncode is not None:
+            return
+
+        pid = self.proc.pid
+
+        # Cancel stdin feeder task if any
+        if self._stdin_feeder_task and not self._stdin_feeder_task.done():
+            self._stdin_feeder_task.cancel()
+            with suppress(asyncio.CancelledError, Exception):
+                await self._stdin_feeder_task
+
+        # Cancel stderr reader task if any
+        if self._stderr_reader_task and not self._stderr_reader_task.done():
+            self._stderr_reader_task.cancel()
+            with suppress(asyncio.CancelledError, Exception):
+                await self._stderr_reader_task
+
+        # Close all pipes first to prevent any I/O blocking
+        # This helps processes stuck on blocked I/O to receive signals
+        if self.proc.stdin and not self.proc.stdin.is_closing():
+            self.proc.stdin.close()
+        if self.proc.stdout:
+            self.proc.stdout.feed_eof()
+        if self.proc.stderr:
+            self.proc.stderr.feed_eof()
+
+        # Send SIGKILL immediately using os.kill for more direct signal delivery
+        self.logger.debug("Killing process %s with PID %s", self.name, pid)
+        with suppress(ProcessLookupError, OSError):
+            os.kill(pid, 9)  # SIGKILL = 9
+
+        # Wait for process to actually terminate
+        try:
+            await asyncio.wait_for(self.proc.wait(), 2)
+        except TimeoutError:
+            # Try one more time with os.kill
+            with suppress(ProcessLookupError, OSError):
+                os.kill(pid, 9)
+            try:
+                await asyncio.wait_for(self.proc.wait(), 2)
+            except TimeoutError:
+                self.logger.warning(
+                    "Process %s with PID %s did not terminate after SIGKILL - may be zombie",
+                    self.name,
+                    pid,
+                )
+
+        self.logger.log(
+            VERBOSE_LOG_LEVEL,
+            "Process %s with PID %s killed with returncode %s",
+            self.name,
+            pid,
+            self.returncode,
+        )
+
     async def wait(self) -> int:
         """Wait for the process and return the returncode."""
         if self._returncode is None:
index f9b334acdf8c9f5a28fd24a6f6e86907cda4c84f..8eefaccfb50bb7df9856b90753fddfc06e4d5f5b 100644 (file)
@@ -846,7 +846,9 @@ class Player(ABC):
         """
         # if the player is grouped/synced, use the active source of the group/parent player
         if parent_player_id := (self.active_group or self.synced_to):
-            if parent_player := self.mass.players.get(parent_player_id):
+            if parent_player_id != self.player_id and (
+                parent_player := self.mass.players.get(parent_player_id)
+            ):
                 return parent_player.active_source
         for plugin_source in self.mass.players.get_plugin_sources():
             if plugin_source.in_use_by == self.player_id:
@@ -1331,7 +1333,9 @@ class Player(ABC):
             )
         # if the player is grouped/synced, use the current_media of the group/parent player
         if parent_player_id := (self.active_group or self.synced_to):
-            if parent_player := self.mass.players.get(parent_player_id):
+            if parent_player_id != self.player_id and (
+                parent_player := self.mass.players.get(parent_player_id)
+            ):
                 return parent_player.current_media
         # if a pluginsource is currently active, return those details
         if (
index 0e085c5436a68f1cbf72db56fdbb0a4553c60764..b5f487e3094d59ba494e6009f17ff88ea71c3329 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliap2-linux-aarch64 and b/music_assistant/providers/airplay/bin/cliap2-linux-aarch64 differ
index 7d95c1a6ef1a95ce6983254978960e348e7d8adf..0a6e222d23beb027001abf8b93bab1c9f51c793f 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliap2-linux-x86_64 and b/music_assistant/providers/airplay/bin/cliap2-linux-x86_64 differ
index 570b796ffbcda1a8cfd814a260f67b20be0bd126..e2a952dd9d4a20db3e47dd403f68fa5f598f3e94 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliap2-macos-arm64 and b/music_assistant/providers/airplay/bin/cliap2-macos-arm64 differ
index f8367fe3d0724f0611900283dbe5d6f705ea5064..2b7b9a4fdb2862e55da530e48c804d219d244d5f 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 differ
index 29c4c1b08d2578c5cc661c6f5f2539ed2088f547..af3ad678de3102bedcc6f7e1f2778d8d649ab09b 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 differ
index 4fb8f38731a89b0b9a539213aafe269e19e38922..0f74b1a1a2ec911118c61be6ef689a92a9c0d318 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 differ
index 08562028b50e351dfe9dfd6f3db423050594e4e7..a4d114d615ae87365390788edc9cbcc9b50d681d 100644 (file)
@@ -2,7 +2,7 @@
 
 from __future__ import annotations
 
-from enum import Enum
+from enum import IntEnum
 from typing import Final
 
 from music_assistant_models.enums import ContentType
@@ -13,7 +13,7 @@ from music_assistant.constants import INTERNAL_PCM_FORMAT
 DOMAIN = "airplay"
 
 
-class StreamingProtocol(Enum):
+class StreamingProtocol(IntEnum):
     """AirPlay streaming protocol versions."""
 
     RAOP = 1  # AirPlay 1 (RAOP)
@@ -34,25 +34,25 @@ AIRPLAY_DISCOVERY_TYPE: Final[str] = "_airplay._tcp.local."
 RAOP_DISCOVERY_TYPE: Final[str] = "_raop._tcp.local."
 DACP_DISCOVERY_TYPE: Final[str] = "_dacp._tcp.local."
 
-AIRPLAY_PRELOAD_SECONDS: Final[int] = (
-    5  # Number of seconds (in PCM) to preload before throttling back
-)
-AIRPLAY_PROCESS_SPAWN_TIME_MS: Final[int] = (
-    200  # Time in ms to allow AirPlay CLI processes to spawn and initialise
-)
 AIRPLAY_OUTPUT_BUFFER_DURATION_MS: Final[int] = (
     2000  # Read ahead buffer for cliraop. Output buffer duration for cliap2.
 )
 AIRPLAY2_MIN_LOG_LEVEL: Final[int] = 3  # Min loglevel to ensure stderr output contains what we need
 AIRPLAY2_CONNECT_TIME_MS: Final[int] = 2500  # Time in ms to allow AirPlay2 device to connect
+RAOP_CONNECT_TIME_MS: Final[int] = 1000  # Time in ms to allow RAOP device to connect
+
+# Per-protocol credential storage keys
+CONF_RAOP_CREDENTIALS: Final[str] = "raop_credentials"
+CONF_AIRPLAY_CREDENTIALS: Final[str] = "airplay_credentials"
+
+# Legacy credential key (for migration)
 CONF_AP_CREDENTIALS: Final[str] = "ap_credentials"
-CONF_MRP_CREDENTIALS: Final[str] = "mrp_credentials"
-CONF_ACTION_START_PAIRING: Final[str] = "start_ap_pairing"
-CONF_ACTION_FINISH_PAIRING: Final[str] = "finish_ap_pairing"
-CONF_ACTION_START_MRP_PAIRING: Final[str] = "start_mrp_pairing"
-CONF_ACTION_FINISH_MRP_PAIRING: Final[str] = "finish_mrp_pairing"
+
+# Pairing action keys
+CONF_ACTION_START_PAIRING: Final[str] = "start_pairing"
+CONF_ACTION_FINISH_PAIRING: Final[str] = "finish_pairing"
+CONF_ACTION_RESET_PAIRING: Final[str] = "reset_pairing"
 CONF_PAIRING_PIN: Final[str] = "pairing_pin"
-CONF_MRP_PAIRING_PIN: Final[str] = "mrp_pairing_pin"
 CONF_ENABLE_LATE_JOIN: Final[str] = "enable_late_join"
 
 BACKOFF_TIME_LOWER_LIMIT: Final[int] = 15  # seconds
@@ -80,12 +80,13 @@ BROKEN_AIRPLAY_MODELS = (
     ("Sonos", "Move 2"),
     ("Sonos", "Roam 2"),
     ("Sonos", "Arc Ultra"),
-    # Samsung has been repeatedly being reported as having issues with AirPlay 1/raop
+    # Samsung has been repeatedly being reported as having issues with AirPlay (raop and AP2)
     ("Samsung", "*"),
 )
 
 AIRPLAY_2_DEFAULT_MODELS = (
     # Models that are known to work better with AirPlay 2 protocol instead of RAOP
+    # These use the translated/friendly model names from get_model_info()
     ("Ubiquiti Inc.", "*"),
     ("Juke Audio", "*"),
 )
index 68ec1da2edcfbbdb3030f2711f123f9474697d79..31b372f3a66a0f0c567c2890b24dade1c5aed8e6 100644 (file)
@@ -115,7 +115,7 @@ def get_primary_ip_address_from_zeroconf(discovery_info: AsyncServiceInfo) -> st
 
 
 def is_broken_airplay_model(manufacturer: str, model: str) -> bool:
-    """Check if a model is known to have broken RAOP support."""
+    """Check if a model is known to have broken AirPlay support."""
     for broken_manufacturer, broken_model in BROKEN_AIRPLAY_MODELS:
         if broken_manufacturer in (manufacturer, "*") and broken_model in (model, "*"):
             return True
@@ -272,6 +272,32 @@ def unix_time_to_ntp(unix_timestamp: float) -> int:
     return (ntp_seconds << 32) | ntp_fraction
 
 
+def player_id_to_mac_address(player_id: str) -> str:
+    """Convert a player_id to a MAC address-like string."""
+    # the player_id is the mac address prefixed with "ap"
+    hex_str = player_id.replace("ap", "").upper()
+    return ":".join(hex_str[i : i + 2] for i in range(0, 12, 2))
+
+
+def generate_active_remote_id(mac_address: str) -> str:
+    """
+    Generate an Active-Remote ID for DACP communication.
+
+    The Active-Remote ID is used to match DACP callbacks from devices to the
+    correct stream. This function generates a consistent ID based on the
+    player_id (=macaddress, =device id), converted to uint32).
+
+    :return: Active-Remote ID as decimal string.
+    """
+    # Convert MAC address format to uint32
+    # Remove colons: "AA:BB:CC:DD:EE:FF" -> "AABBCCDDEEFF"
+    hex_str = mac_address.replace(":", "").upper()
+    # Parse as uint64 and truncate to uint32 (lower 32 bits)
+    device_id_u64 = int(hex_str, 16)
+    device_id_u32 = device_id_u64 & 0xFFFFFFFF
+    return str(device_id_u32)
+
+
 def add_seconds_to_ntp(ntp_timestamp: int, seconds: float) -> int:
     """
     Add seconds to an NTP timestamp.
index 5468699109570dc66cec61528deb0ee829ec97bb..4129e84b8b078a302cc2640d975672e1159e8426 100644 (file)
@@ -9,7 +9,7 @@
     "[libraop (RAOP)](https://github.com/music-assistant/libraop)",
     "[OwnTone (AirPlay2)](https://github.com/OwnTone)"
   ],
-  "requirements": [],
+  "requirements": ["srptools>=1.0.0"],
   "documentation": "https://music-assistant.io/player-support/airplay/",
   "multi_instance": false,
   "builtin": false,
diff --git a/music_assistant/providers/airplay/pairing.py b/music_assistant/providers/airplay/pairing.py
new file mode 100644 (file)
index 0000000..de6112c
--- /dev/null
@@ -0,0 +1,782 @@
+"""Native pairing implementations for AirPlay devices.
+
+This module provides pairing support for:
+- AirPlay 2 (HAP - HomeKit Accessory Protocol) - for Apple TV 4+, HomePod, Mac
+- RAOP (AirPlay 1 legacy pairing) - for older devices
+
+Both implementations produce credentials compatible with cliap2/cliraop.
+"""
+
+from __future__ import annotations
+
+import binascii
+import hashlib
+import logging
+import os
+import plistlib
+import uuid
+
+import aiohttp
+from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
+from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
+from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
+from cryptography.hazmat.primitives.kdf.hkdf import HKDF
+from music_assistant_models.errors import PlayerCommandFailed
+from srptools import SRPClientSession, SRPContext
+
+from .constants import StreamingProtocol
+
+# ============================================================================
+# Common utilities
+# ============================================================================
+
+
+def hkdf_derive(
+    input_key: bytes,
+    salt: bytes,
+    info: bytes,
+    length: int = 32,
+) -> bytes:
+    """Derive key using HKDF-SHA512.
+
+    :param input_key: Input keying material.
+    :param salt: Salt value.
+    :param info: Context info.
+    :param length: Output key length.
+    :return: Derived key bytes.
+    """
+    hkdf = HKDF(
+        algorithm=hashes.SHA512(),
+        length=length,
+        salt=salt,
+        info=info,
+    )
+    return hkdf.derive(input_key)
+
+
+# ============================================================================
+# TLV encoding/decoding for HAP
+# ============================================================================
+
+# TLV types for HAP pairing
+TLV_METHOD = 0x00
+TLV_IDENTIFIER = 0x01
+TLV_SALT = 0x02
+TLV_PUBLIC_KEY = 0x03
+TLV_PROOF = 0x04
+TLV_ENCRYPTED_DATA = 0x05
+TLV_STATE = 0x06
+TLV_ERROR = 0x07
+TLV_SIGNATURE = 0x0A
+
+
+def tlv_encode(items: list[tuple[int, bytes]]) -> bytes:
+    """Encode items into TLV format.
+
+    :param items: List of (type, value) tuples.
+    :return: TLV-encoded bytes.
+    """
+    result = bytearray()
+    for tlv_type, value in items:
+        offset = 0
+        while offset < len(value):
+            chunk = value[offset : offset + 255]
+            result.append(tlv_type)
+            result.append(len(chunk))
+            result.extend(chunk)
+            offset += 255
+        if len(value) == 0:
+            result.append(tlv_type)
+            result.append(0)
+    return bytes(result)
+
+
+def tlv_decode(data: bytes) -> dict[int, bytes]:
+    """Decode TLV format into dictionary.
+
+    :param data: TLV-encoded bytes.
+    :return: Dictionary mapping type to concatenated value.
+    """
+    result: dict[int, bytearray] = {}
+    offset = 0
+    while offset < len(data):
+        tlv_type = data[offset]
+        length = data[offset + 1]
+        value = data[offset + 2 : offset + 2 + length]
+        if tlv_type in result:
+            result[tlv_type].extend(value)
+        else:
+            result[tlv_type] = bytearray(value)
+        offset += 2 + length
+    return {k: bytes(v) for k, v in result.items()}
+
+
+# ============================================================================
+# HAP Pairing constants (for AirPlay 2)
+# ============================================================================
+
+# SRP 3072-bit prime for HAP (hex string format for srptools)
+HAP_SRP_PRIME_3072 = (
+    "FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74"
+    "020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F1437"
+    "4FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7ED"
+    "EE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF05"
+    "98DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB"
+    "9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3B"
+    "E39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF695581718"
+    "3995497CEA956AE515D2261898FA051015728E5A8AAAC42DAD33170D04507A33"
+    "A85521ABDF1CBA64ECFB850458DBEF0A8AEA71575D060C7DB3970F85A6E1E4C7"
+    "ABF5AE8CDB0933D71E8C94E04A25619DCEE3D2261AD2EE6BF12FFA06D98A0864"
+    "D87602733EC86A64521F2B18177B200CBBE117577A615D6C770988C0BAD946E2"
+    "08E24FA074E5AB3143DB5BFCE0FD108E4B82D120A93AD2CAFFFFFFFFFFFFFFFF"
+)
+HAP_SRP_GENERATOR = "5"
+
+
+# ============================================================================
+# RAOP Pairing constants (for AirPlay 1 legacy)
+# ============================================================================
+
+# SRP 2048-bit prime for RAOP (hex string format for srptools)
+RAOP_SRP_PRIME_2048 = (
+    "AC6BDB41324A9A9BF166DE5E1389582FAF72B6651987EE07FC319294"
+    "3DB56050A37329CBB4A099ED8193E0757767A13DD52312AB4B03310D"
+    "CD7F48A9DA04FD50E8083969EDB767B0CF6095179A163AB3661A05FB"
+    "D5FAAAE82918A9962F0B93B855F97993EC975EEAA80D740ADBF4FF74"
+    "7359D041D5C33EA71D281E446B14773BCA97B43A23FB801676BD207A"
+    "436C6481F1D2B9078717461A5B9D32E688F87748544523B524B0D57D"
+    "5EA77A2775D2ECFA032CFBDBF52FB3786160279004E57AE6AF874E73"
+    "03CE53299CCC041C7BC308D82A5698F3A8D0C38271AE35F8E9DBFBB6"
+    "94B5C803D89F7AE435DE236D525F54759B65E372FCD68EF20FA7111F"
+    "9E4AFF73"
+)
+RAOP_SRP_GENERATOR = "02"  # RFC5054-2048bit uses generator 2
+
+
+# ============================================================================
+# Base Pairing class
+# ============================================================================
+
+
+class AirPlayPairing:
+    """Base class for AirPlay pairing.
+
+    Handles both HAP (AirPlay 2) and RAOP (AirPlay 1) pairing protocols.
+    """
+
+    def __init__(
+        self,
+        address: str,
+        name: str,
+        protocol: StreamingProtocol,
+        logger: logging.Logger,
+        port: int | None = None,
+        device_id: str | None = None,
+    ) -> None:
+        """Initialize AirPlay pairing.
+
+        :param address: IP address of the device.
+        :param name: Display name of the device.
+        :param protocol: Streaming protocol (RAOP or AIRPLAY2).
+        :param logger: Logger instance.
+        :param port: Port number (default: 7000 for AirPlay 2, 5000 for RAOP).
+        :param device_id: Device identifier (DACP ID) - must match what cliap2 uses.
+        """
+        self.address = address
+        self.name = name
+        self.protocol = protocol
+        self.logger = logger
+        self.port = port or (7000 if protocol == StreamingProtocol.AIRPLAY2 else 5000)
+
+        # HTTP session
+        self._session: aiohttp.ClientSession | None = None
+        self._base_url: str = f"http://{address}:{self.port}"
+
+        # Common state
+        self._is_pairing: bool = False
+        self._srp_context: SRPContext | None = None
+        self._srp_session: SRPClientSession | None = None
+        self._session_key: bytes | None = None
+
+        # Client identifier (device_id) handling depends on protocol:
+        # - HAP (AirPlay 2): Uses DACP ID as string identifier (must match cliap2 pair-verify)
+        # - RAOP: Uses 8 random bytes (not the DACP ID) - credentials are self-contained
+        if protocol == StreamingProtocol.AIRPLAY2:
+            # For HAP, use DACP ID as the identifier (must match pair-verify)
+            if device_id:
+                self._client_id: bytes = device_id.encode()
+            else:
+                self._client_id = str(uuid.uuid4()).encode()
+        else:
+            # For RAOP, generate 8 random bytes for client_id
+            # The credentials format is client_id_hex:auth_secret_hex
+            self._client_id = os.urandom(8)
+
+        # Ed25519 keypair
+        self._client_private_key: Ed25519PrivateKey | None = None
+        self._client_public_key: bytes | None = None
+
+        # Server's public key
+        self._server_public_key: bytes | None = None
+
+    @property
+    def is_pairing(self) -> bool:
+        """Return True if a pairing session is in progress."""
+        return self._is_pairing
+
+    @property
+    def device_provides_pin(self) -> bool:
+        """Return True if the device displays the PIN."""
+        return True  # Both HAP and RAOP display PIN on device
+
+    @property
+    def protocol_name(self) -> str:
+        """Return human-readable protocol name."""
+        if self.protocol == StreamingProtocol.RAOP:
+            return "RAOP (AirPlay 1)"
+        return "AirPlay"
+
+    async def start_pairing(self) -> bool:
+        """Start the pairing process.
+
+        :return: True if device provides PIN (always True for AirPlay).
+        :raises PlayerCommandFailed: If device connection fails.
+        """
+        self.logger.info(
+            "Starting %s pairing with %s at %s:%d",
+            self.protocol_name,
+            self.name,
+            self.address,
+            self.port,
+        )
+
+        # Generate Ed25519 keypair
+        self._client_private_key = Ed25519PrivateKey.generate()
+        self._client_public_key = self._client_private_key.public_key().public_bytes(
+            encoding=serialization.Encoding.Raw,
+            format=serialization.PublicFormat.Raw,
+        )
+
+        # Create HTTP session
+        self._session = aiohttp.ClientSession()
+
+        try:
+            # Request PIN to be shown on device
+            async with self._session.post(
+                f"{self._base_url}/pair-pin-start",
+                timeout=aiohttp.ClientTimeout(total=10),
+            ) as resp:
+                if resp.status != 200:
+                    raise PlayerCommandFailed(f"Failed to start pairing: HTTP {resp.status}")
+
+            self._is_pairing = True
+            self.logger.info("Device %s is displaying PIN", self.name)
+
+            # SRP context will be created in finish_pairing when we have the PIN
+            return True
+
+        except aiohttp.ClientError as err:
+            await self.close()
+            raise PlayerCommandFailed(f"Connection failed: {err}") from err
+
+    async def finish_pairing(self, pin: str) -> str:
+        """Complete pairing with the provided PIN.
+
+        :param pin: 4-digit PIN from device screen.
+        :return: Credentials string for cliap2/cliraop.
+        :raises PlayerCommandFailed: If pairing fails.
+        """
+        if not self._session:
+            raise PlayerCommandFailed("Pairing not started")
+
+        try:
+            if self.protocol == StreamingProtocol.AIRPLAY2:
+                return await self._finish_hap_pairing(pin)
+            return await self._finish_raop_pairing(pin)
+        except PlayerCommandFailed:
+            raise
+        except Exception as err:
+            self.logger.exception("Pairing failed")
+            raise PlayerCommandFailed(f"Pairing failed: {err}") from err
+        finally:
+            await self.close()
+
+    # ========================================================================
+    # HAP (AirPlay 2) pairing implementation
+    # ========================================================================
+
+    async def _finish_hap_pairing(self, pin: str) -> str:
+        """Complete HAP pairing for AirPlay 2.
+
+        :param pin: 4-digit PIN.
+        :return: Credentials (192 hex chars).
+        """
+        if not self._session:
+            raise PlayerCommandFailed("Pairing not started")
+
+        self.logger.info("Completing HAP pairing with PIN")
+
+        # HAP headers required for pair-setup
+        hap_headers = {
+            "Content-Type": "application/octet-stream",
+            "X-Apple-HKP": "3",
+        }
+
+        # M1: Send method request (state=1, method=0 for pair-setup)
+        m1_data = tlv_encode(
+            [
+                (TLV_METHOD, bytes([0x00])),
+                (TLV_STATE, bytes([0x01])),
+            ]
+        )
+
+        async with self._session.post(
+            f"{self._base_url}/pair-setup",
+            data=m1_data,
+            headers=hap_headers,
+            timeout=aiohttp.ClientTimeout(total=30),
+        ) as resp:
+            if resp.status != 200:
+                raise PlayerCommandFailed(f"M1 failed: HTTP {resp.status}")
+            m2_data = await resp.read()
+
+        # Parse M2
+        m2 = tlv_decode(m2_data)
+        if TLV_ERROR in m2:
+            raise PlayerCommandFailed(f"Device error in M2: {m2[TLV_ERROR].hex()}")
+
+        salt = m2.get(TLV_SALT)
+        server_pk_srp = m2.get(TLV_PUBLIC_KEY)
+        if not salt or not server_pk_srp:
+            raise PlayerCommandFailed("Invalid M2: missing salt or public key")
+
+        # M3: SRP authentication - create context with password
+        # PIN is passed directly as string (not "Pair-Setup:PIN")
+        # Note: pyatv doesn't specify bits_random, uses default
+        self._srp_context = SRPContext(
+            username="Pair-Setup",
+            password=pin,
+            prime=HAP_SRP_PRIME_3072,
+            generator=HAP_SRP_GENERATOR,
+            hash_func=hashlib.sha512,
+        )
+        # Pass Ed25519 private key bytes as the SRP "a" value (random private exponent)
+        # This is what pyatv does - use the client's Ed25519 private key as the SRP private value
+        if not self._client_private_key:
+            raise PlayerCommandFailed("Client private key not initialized")
+        auth_private = self._client_private_key.private_bytes(
+            encoding=serialization.Encoding.Raw,
+            format=serialization.PrivateFormat.Raw,
+            encryption_algorithm=serialization.NoEncryption(),
+        )
+        self._srp_session = SRPClientSession(
+            self._srp_context, binascii.hexlify(auth_private).decode()
+        )
+
+        # Process with server's public key and salt (as hex strings)
+        self._srp_session.process(server_pk_srp.hex(), salt.hex())
+
+        # Get client's public key and proof
+        client_pk_srp = bytes.fromhex(self._srp_session.public)
+        client_proof = bytes.fromhex(self._srp_session.key_proof.decode("ascii"))
+
+        m3_data = tlv_encode(
+            [
+                (TLV_STATE, bytes([0x03])),
+                (TLV_PUBLIC_KEY, client_pk_srp),
+                (TLV_PROOF, client_proof),
+            ]
+        )
+
+        async with self._session.post(
+            f"{self._base_url}/pair-setup",
+            data=m3_data,
+            headers=hap_headers,
+            timeout=aiohttp.ClientTimeout(total=30),
+        ) as resp:
+            if resp.status != 200:
+                raise PlayerCommandFailed(f"M3 failed: HTTP {resp.status}")
+            m4_data = await resp.read()
+
+        # Parse M4
+        m4 = tlv_decode(m4_data)
+        if TLV_ERROR in m4:
+            raise PlayerCommandFailed(f"Device error in M4: {m4[TLV_ERROR].hex()}")
+
+        server_proof = m4.get(TLV_PROOF)
+        if not server_proof:
+            raise PlayerCommandFailed("Invalid M4: missing proof")
+
+        # Verify server proof
+        if not self._srp_session.verify_proof(server_proof.hex().encode("ascii")):
+            raise PlayerCommandFailed("Server proof verification failed")
+
+        # Get session key
+        self._session_key = bytes.fromhex(self._srp_session.key.decode("ascii"))
+
+        # M5: Send encrypted client info
+        await self._send_hap_m5()
+
+        # Generate credentials
+        return self._generate_hap_credentials()
+
+    async def _send_hap_m5(self) -> None:
+        """Send M5 with encrypted client info and receive M6."""
+        if (
+            not self._session_key
+            or not self._client_private_key
+            or not self._client_public_key
+            or not self._session
+        ):
+            raise PlayerCommandFailed("Invalid state for M5")
+
+        # HAP headers required for pair-setup
+        hap_headers = {
+            "Content-Type": "application/octet-stream",
+            "X-Apple-HKP": "3",
+        }
+
+        # Derive keys
+        enc_key = hkdf_derive(
+            self._session_key,
+            b"Pair-Setup-Encrypt-Salt",
+            b"Pair-Setup-Encrypt-Info",
+            32,
+        )
+        sign_key = hkdf_derive(
+            self._session_key,
+            b"Pair-Setup-Controller-Sign-Salt",
+            b"Pair-Setup-Controller-Sign-Info",
+            32,
+        )
+
+        # Sign device info
+        device_info = sign_key + self._client_id + self._client_public_key
+        signature = self._client_private_key.sign(device_info)
+
+        # Create and encrypt inner TLV
+        inner_tlv = tlv_encode(
+            [
+                (TLV_IDENTIFIER, self._client_id),
+                (TLV_PUBLIC_KEY, self._client_public_key),
+                (TLV_SIGNATURE, signature),
+            ]
+        )
+
+        cipher = ChaCha20Poly1305(enc_key)
+        # Nonce format: 4 zero bytes + 8-byte message identifier = 12 bytes
+        nonce = b"\x00\x00\x00\x00PS-Msg05"
+        encrypted = cipher.encrypt(nonce, inner_tlv, None)
+
+        # Send M5
+        m5_data = tlv_encode(
+            [
+                (TLV_STATE, bytes([0x05])),
+                (TLV_ENCRYPTED_DATA, encrypted),
+            ]
+        )
+
+        async with self._session.post(
+            f"{self._base_url}/pair-setup",
+            data=m5_data,
+            headers=hap_headers,
+            timeout=aiohttp.ClientTimeout(total=30),
+        ) as resp:
+            if resp.status != 200:
+                raise PlayerCommandFailed(f"M5 failed: HTTP {resp.status}")
+            m6_data = await resp.read()
+
+        # Parse M6
+        m6 = tlv_decode(m6_data)
+        if TLV_ERROR in m6:
+            raise PlayerCommandFailed(f"Device error in M6: {m6[TLV_ERROR].hex()}")
+
+        encrypted_data = m6.get(TLV_ENCRYPTED_DATA)
+        if not encrypted_data:
+            raise PlayerCommandFailed("Invalid M6: missing encrypted data")
+
+        # Decrypt M6
+        # Nonce format: 4 zero bytes + 8-byte message identifier = 12 bytes
+        nonce = b"\x00\x00\x00\x00PS-Msg06"
+        decrypted = cipher.decrypt(nonce, encrypted_data, None)
+
+        # Extract server's public key
+        inner = tlv_decode(decrypted)
+        self._server_public_key = inner.get(TLV_PUBLIC_KEY)
+        if not self._server_public_key:
+            raise PlayerCommandFailed("Invalid M6: missing server public key")
+
+    def _generate_hap_credentials(self) -> str:
+        """Generate HAP credentials for cliap2.
+
+        Format: client_private_key(128 hex) + server_public_key(64 hex) = 192 hex chars
+
+        :return: Credentials string.
+        """
+        if (
+            not self._client_private_key
+            or not self._server_public_key
+            or not self._client_public_key
+        ):
+            raise PlayerCommandFailed("Missing keys for credential generation")
+
+        # Get raw private key (32 bytes seed)
+        private_key_bytes = self._client_private_key.private_bytes(
+            encoding=serialization.Encoding.Raw,
+            format=serialization.PrivateFormat.Raw,
+            encryption_algorithm=serialization.NoEncryption(),
+        )
+
+        # Expand to 64-byte Ed25519 secret key format (seed + public_key)
+        if len(private_key_bytes) == 32:
+            private_key_bytes = private_key_bytes + self._client_public_key
+
+        if len(private_key_bytes) != 64 or len(self._server_public_key) != 32:
+            raise PlayerCommandFailed("Invalid key lengths")
+
+        return binascii.hexlify(private_key_bytes).decode("ascii") + binascii.hexlify(
+            self._server_public_key
+        ).decode("ascii")
+
+    # ========================================================================
+    # RAOP (AirPlay 1 legacy) pairing implementation
+    # ========================================================================
+
+    def _compute_raop_premaster_secret(
+        self,
+        user_id: str,
+        password: str,
+        salt: bytes,
+        client_private: bytes,
+        client_public: bytes,
+        server_public: bytes,
+    ) -> bytes:
+        """Compute RAOP SRP premaster secret S.
+
+        S = (B - k*v)^(a + u*x) mod N
+
+        :param user_id: Username (hex-encoded client_id).
+        :param password: PIN code.
+        :param salt: Salt from server.
+        :param client_private: Client private key (a) as bytes.
+        :param client_public: Client public key (A) as bytes.
+        :param server_public: Server public key (B) as bytes.
+        :return: Premaster secret S as bytes (padded to N length).
+        """
+        # Convert values to integers
+        n_bytes = bytes.fromhex(RAOP_SRP_PRIME_2048)
+        n_len = len(n_bytes)
+        n = int.from_bytes(n_bytes, "big")
+        g = int.from_bytes(bytes.fromhex(RAOP_SRP_GENERATOR), "big")
+
+        a = int.from_bytes(client_private, "big")
+        b_pub = int.from_bytes(server_public, "big")
+
+        # x = H(s | H(I : P))
+        inner_hash = hashlib.sha1(f"{user_id}:{password}".encode()).digest()
+        x = int.from_bytes(hashlib.sha1(salt + inner_hash).digest(), "big")
+
+        # k = H(N | PAD(g))
+        g_padded = bytes.fromhex(RAOP_SRP_GENERATOR).rjust(n_len, b"\x00")
+        k = int.from_bytes(hashlib.sha1(n_bytes + g_padded).digest(), "big")
+
+        # u = H(PAD(A) | PAD(B))
+        a_padded = client_public.rjust(n_len, b"\x00")
+        b_padded = server_public.rjust(n_len, b"\x00")
+        u = int.from_bytes(hashlib.sha1(a_padded + b_padded).digest(), "big")
+
+        # v = g^x mod N
+        v = pow(g, x, n)
+
+        # S = (B - k*v)^(a + u*x) mod N
+        s_int = pow(b_pub - k * v, a + u * x, n)
+
+        # Convert to bytes and pad to N length
+        s_bytes = s_int.to_bytes((s_int.bit_length() + 7) // 8, "big")
+        return s_bytes.rjust(n_len, b"\x00")
+
+    def _compute_raop_session_key(self, premaster_secret: bytes) -> bytes:
+        r"""Compute RAOP session key K from premaster secret S.
+
+        K = SHA1(S | \x00\x00\x00\x00) | SHA1(S | \x00\x00\x00\x01)
+
+        This produces a 40-byte key (two SHA1 hashes concatenated).
+
+        :param premaster_secret: The SRP premaster secret S.
+        :return: 40-byte session key K.
+        """
+        k1 = hashlib.sha1(premaster_secret + b"\x00\x00\x00\x00").digest()
+        k2 = hashlib.sha1(premaster_secret + b"\x00\x00\x00\x01").digest()
+        return k1 + k2
+
+    def _compute_raop_m1(
+        self, user_id: str, salt: bytes, client_pk: bytes, server_pk: bytes, session_key: bytes
+    ) -> bytes:
+        """Compute RAOP SRP M1 proof with padding for A and B (but not g).
+
+        M1 = H(H(N) XOR H(g) | H(I) | s | PAD(A) | PAD(B) | K)
+
+        Note: g is NOT padded, but A and B ARE padded to N length.
+        K is 40 bytes (from _compute_raop_session_key).
+
+        :param user_id: Username (hex-encoded client_id).
+        :param salt: Salt bytes from server.
+        :param client_pk: Client public key (A).
+        :param server_pk: Server public key (B).
+        :param session_key: Session key (K) - 40 bytes.
+        :return: M1 proof bytes (20 bytes for SHA-1).
+        """
+        n_bytes = bytes.fromhex(RAOP_SRP_PRIME_2048)
+        n_len = len(n_bytes)
+        g_bytes = bytes.fromhex(RAOP_SRP_GENERATOR)
+
+        # H(N) XOR H(g) - g is NOT padded
+        h_n = hashlib.sha1(n_bytes).digest()
+        h_g = hashlib.sha1(g_bytes).digest()
+        h_n_xor_h_g = bytes(a ^ b for a, b in zip(h_n, h_g, strict=True))
+
+        # H(I) - hash of username
+        h_i = hashlib.sha1(user_id.encode("ascii")).digest()
+
+        # PAD A and B to N length
+        a_padded = client_pk.rjust(n_len, b"\x00")
+        b_padded = server_pk.rjust(n_len, b"\x00")
+
+        # M1 = H(H(N) XOR H(g) | H(I) | s | PAD(A) | PAD(B) | K)
+        m1_data = h_n_xor_h_g + h_i + salt + a_padded + b_padded + session_key
+        return hashlib.sha1(m1_data).digest()
+
+    def _compute_raop_client_public(self, auth_secret: bytes) -> bytes:
+        """Compute RAOP SRP client public key A = g^a mod N.
+
+        :param auth_secret: 32-byte random secret (used as SRP private key a).
+        :return: Client public key A as bytes.
+        """
+        n_bytes = bytes.fromhex(RAOP_SRP_PRIME_2048)
+        n = int.from_bytes(n_bytes, "big")
+        g = int.from_bytes(bytes.fromhex(RAOP_SRP_GENERATOR), "big")
+        a = int.from_bytes(auth_secret, "big")
+        a_pub = pow(g, a, n)
+        return a_pub.to_bytes((a_pub.bit_length() + 7) // 8, "big")
+
+    async def _finish_raop_pairing(self, pin: str) -> str:
+        """Complete RAOP pairing for AirPlay 1.
+
+        :param pin: 4-digit PIN.
+        :return: Credentials (client_id:auth_secret format).
+        """
+        if not self._session:
+            raise PlayerCommandFailed("Pairing not started")
+
+        self.logger.info("Completing RAOP pairing with PIN")
+
+        # Generate 32-byte auth secret
+        auth_secret = os.urandom(32)
+
+        # Derive Ed25519 public key from auth secret
+        # For RAOP, we use the auth_secret as the Ed25519 seed
+        from cryptography.hazmat.primitives.asymmetric.ed25519 import (  # noqa: PLC0415
+            Ed25519PrivateKey as Ed25519Key,
+        )
+
+        auth_private_key = Ed25519Key.from_private_bytes(auth_secret)
+        auth_public_key = auth_private_key.public_key().public_bytes(
+            encoding=serialization.Encoding.Raw,
+            format=serialization.PublicFormat.Raw,
+        )
+
+        # Step 1: Send device ID and method
+        user_id = self._client_id.hex().upper()
+        step1_plist = {
+            "method": "pin",
+            "user": user_id,
+        }
+
+        async with self._session.post(
+            f"{self._base_url}/pair-setup-pin",
+            data=plistlib.dumps(step1_plist, fmt=plistlib.FMT_BINARY),
+            headers={"Content-Type": "application/x-apple-binary-plist"},
+            timeout=aiohttp.ClientTimeout(total=30),
+        ) as resp:
+            if resp.status != 200:
+                raise PlayerCommandFailed(f"RAOP step 1 failed: HTTP {resp.status}")
+            step1_response = plistlib.loads(await resp.read())
+
+        # Get salt and server public key
+        salt, server_pk = step1_response.get("salt"), step1_response.get("pk")
+        if not salt or not server_pk:
+            raise PlayerCommandFailed("Invalid RAOP step 1 response")
+
+        # Step 2: SRP authentication
+        # Apple uses a custom K formula: K = SHA1(S|0000) | SHA1(S|0001) (40 bytes)
+        client_pk = self._compute_raop_client_public(auth_secret)
+        premaster_secret = self._compute_raop_premaster_secret(
+            user_id, pin, salt, auth_secret, client_pk, server_pk
+        )
+        session_key = self._compute_raop_session_key(premaster_secret)
+        client_proof = self._compute_raop_m1(user_id, salt, client_pk, server_pk, session_key)
+
+        step2_plist = {
+            "pk": client_pk,
+            "proof": client_proof,
+        }
+
+        async with self._session.post(
+            f"{self._base_url}/pair-setup-pin",
+            data=plistlib.dumps(step2_plist, fmt=plistlib.FMT_BINARY),
+            headers={"Content-Type": "application/x-apple-binary-plist"},
+            timeout=aiohttp.ClientTimeout(total=30),
+        ) as resp:
+            if resp.status != 200:
+                raise PlayerCommandFailed(f"RAOP step 2 failed: HTTP {resp.status}")
+            step2_response = plistlib.loads(await resp.read())
+
+        # Verify server proof M2 exists (verification optional)
+        server_proof = step2_response.get("proof")
+        if not server_proof:
+            raise PlayerCommandFailed("RAOP server did not return proof")
+        self._session_key = session_key
+
+        # Step 3: Encrypt and send auth public key using AES-GCM
+        # Derive AES key and IV from session key K (40 bytes)
+        aes_key = hashlib.sha512(b"Pair-Setup-AES-Key" + session_key).digest()[:16]
+        aes_iv = bytearray(hashlib.sha512(b"Pair-Setup-AES-IV" + session_key).digest()[:16])
+        aes_iv[-1] = (aes_iv[-1] + 1) % 256  # Increment last byte
+
+        # Encrypt auth public key with AES-GCM
+        cipher = Cipher(algorithms.AES(aes_key), modes.GCM(bytes(aes_iv)))
+        encryptor = cipher.encryptor()
+        encrypted_pk = encryptor.update(auth_public_key) + encryptor.finalize()
+        tag = encryptor.tag
+
+        step3_plist = {
+            "epk": encrypted_pk,
+            "authTag": tag,
+        }
+
+        async with self._session.post(
+            f"{self._base_url}/pair-setup-pin",
+            data=plistlib.dumps(step3_plist, fmt=plistlib.FMT_BINARY),
+            headers={"Content-Type": "application/x-apple-binary-plist"},
+            timeout=aiohttp.ClientTimeout(total=30),
+        ) as resp:
+            if resp.status != 200:
+                raise PlayerCommandFailed(f"RAOP step 3 failed: HTTP {resp.status}")
+
+        # Return credentials in cliraop format: client_id:auth_secret
+        return f"{self._client_id.hex()}:{auth_secret.hex()}"
+
+    # ========================================================================
+    # Cleanup
+    # ========================================================================
+
+    async def close(self) -> None:
+        """Clean up resources."""
+        self._is_pairing = False
+        if self._session:
+            await self._session.close()
+            self._session = None
+        self._srp_context = None
+        self._srp_session = None
+        self._session_key = None
index 28ee5a2f1d8ebef8de96b5ca5d8b5f16d7e56406..a2beb401f21f7120ce9c9b4493c53fe7853cb696 100644 (file)
@@ -8,7 +8,6 @@ from typing import TYPE_CHECKING, cast
 
 from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
 from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType
-from propcache import under_cached_property as cached_property
 
 from music_assistant.constants import (
     CONF_ENTRY_DEPRECATED_EQ_BASS,
@@ -26,14 +25,16 @@ from .constants import (
     AIRPLAY_FLOW_PCM_FORMAT,
     CACHE_CATEGORY_PREV_VOLUME,
     CONF_ACTION_FINISH_PAIRING,
+    CONF_ACTION_RESET_PAIRING,
     CONF_ACTION_START_PAIRING,
+    CONF_AIRPLAY_CREDENTIALS,
     CONF_AIRPLAY_PROTOCOL,
     CONF_ALAC_ENCODE,
-    CONF_AP_CREDENTIALS,
     CONF_ENCRYPTION,
     CONF_IGNORE_VOLUME,
     CONF_PAIRING_PIN,
     CONF_PASSWORD,
+    CONF_RAOP_CREDENTIALS,
     FALLBACK_VOLUME,
     RAOP_DISCOVERY_TYPE,
     StreamingProtocol,
@@ -42,12 +43,15 @@ from .helpers import (
     get_primary_ip_address_from_zeroconf,
     is_airplay2_preferred_model,
     is_broken_airplay_model,
+    player_id_to_mac_address,
 )
 from .stream_session import AirPlayStreamSession
 
 if TYPE_CHECKING:
     from zeroconf.asyncio import AsyncServiceInfo
 
+    from .pairing import AirPlayPairing
+    from .protocols._protocol import AirPlayProtocol
     from .protocols.airplay2 import AirPlay2Stream
     from .protocols.raop import RaopStream
     from .provider import AirPlayProvider
@@ -89,6 +93,8 @@ class AirPlayPlayer(Player):
         self.stream: RaopStream | AirPlay2Stream | None = None
         self.last_command_sent = 0.0
         self._lock = asyncio.Lock()
+        self._active_pairing: AirPlayPairing | None = None
+        self._transitioning = False  # Set during stream replacement to ignore stale DACP messages
         # Set (static) player attributes
         self._attr_type = PlayerType.PLAYER
         self._attr_name = display_name
@@ -97,6 +103,7 @@ class AirPlayPlayer(Player):
             model=model,
             manufacturer=manufacturer,
             ip_address=address,
+            mac_address=player_id_to_mac_address(player_id),
         )
         self._attr_supported_features = {
             PlayerFeature.PAUSE,
@@ -108,22 +115,19 @@ class AirPlayPlayer(Player):
         self._attr_can_group_with = {provider.instance_id}
         self._attr_enabled_by_default = not is_broken_airplay_model(manufacturer, model)
 
-    @cached_property
+    @property
     def protocol(self) -> StreamingProtocol:
-        """Get the streaming protocol to use for this player."""
-        protocol_value = self.config.get_value(CONF_AIRPLAY_PROTOCOL)
-        # Convert integer value to StreamingProtocol enum
-        if protocol_value == 2:
-            return StreamingProtocol.AIRPLAY2
-        return StreamingProtocol.RAOP
+        """Get the streaming protocol to use/prefer for this player."""
+        preferred_option = cast("int", self.config.get_value(CONF_AIRPLAY_PROTOCOL))
+        return self._get_protocol_for_config_value(preferred_option)
 
     @property
     def available(self) -> bool:
         """Return if the player is currently available."""
         if self._requires_pairing():
-            # check if we have credentials stored
-            credentials = self.config.get_value(CONF_AP_CREDENTIALS)
-            if not credentials:
+            # check if we have credentials stored for the current protocol
+            creds_key = self._get_credentials_key(self.protocol)
+            if not self.config.get_value(creds_key):
                 return False
         return super().available
 
@@ -133,7 +137,7 @@ class AirPlayPlayer(Player):
         if not self.stream or not self.stream.session:
             return super().corrected_elapsed_time or 0.0
         session = self.stream.session
-        elapsed = time.time() - session.last_stream_started - session.total_pause_time
+        elapsed = time.time() - session.start_time - session.total_pause_time
         if session.last_paused is not None:
             current_pause = time.time() - session.last_paused
             elapsed -= current_pause
@@ -168,21 +172,20 @@ class AirPlayPlayer(Player):
                 key=CONF_AIRPLAY_PROTOCOL,
                 type=ConfigEntryType.INTEGER,
                 required=False,
-                label="AirPlay version to use for streaming",
+                label="AirPlay protocol version to use for streaming",
                 description="AirPlay version 1 protocol uses RAOP.\n"
                 "AirPlay version 2 is an extension of RAOP.\n"
                 "Some newer devices do not fully support RAOP and "
-                "will only work with AirPlay version 2.",
+                "will only work with AirPlay version 2, "
+                "while older devices may only support RAOP.\n\n"
+                "In most cases the default automatic selection will work fine.",
                 category="airplay",
                 options=[
-                    ConfigValueOption("AirPlay 1 (RAOP)", StreamingProtocol.RAOP.value),
-                    ConfigValueOption("AirPlay 2", StreamingProtocol.AIRPLAY2.value),
+                    ConfigValueOption("Automatically select", 0),
+                    ConfigValueOption("Prefer AirPlay 1 (RAOP)", StreamingProtocol.RAOP.value),
+                    ConfigValueOption("Prefer AirPlay 2", StreamingProtocol.AIRPLAY2.value),
                 ],
-                default_value=StreamingProtocol.AIRPLAY2.value
-                if is_airplay2_preferred_model(
-                    self.device_info.manufacturer, self.device_info.model
-                )
-                else StreamingProtocol.RAOP.value,
+                default_value=0,
             ),
             ConfigEntry(
                 key=CONF_ENCRYPTION,
@@ -194,6 +197,7 @@ class AirPlayPlayer(Player):
                 category="airplay",
                 depends_on=CONF_AIRPLAY_PROTOCOL,
                 depends_on_value=StreamingProtocol.RAOP.value,
+                hidden=self.protocol != StreamingProtocol.RAOP,
             ),
             ConfigEntry(
                 key=CONF_ALAC_ENCODE,
@@ -205,6 +209,7 @@ class AirPlayPlayer(Player):
                 category="airplay",
                 depends_on=CONF_AIRPLAY_PROTOCOL,
                 depends_on_value=StreamingProtocol.RAOP.value,
+                hidden=self.protocol != StreamingProtocol.RAOP,
             ),
             CONF_ENTRY_SYNC_ADJUST,
             ConfigEntry(
@@ -233,8 +238,10 @@ class AirPlayPlayer(Player):
                     "Enable this option to ignore these reports."
                 ),
                 category="airplay",
+                # TODO: remove depends_on when DACP support is added for AirPlay2
                 depends_on=CONF_AIRPLAY_PROTOCOL,
                 depends_on_value=StreamingProtocol.RAOP.value,
+                hidden=self.protocol != StreamingProtocol.RAOP,
             ),
         ]
 
@@ -255,49 +262,54 @@ class AirPlayPlayer(Player):
         # Mac devices (including iMac, MacBook, Mac mini, Mac Pro, Mac Studio)
         return model.startswith(("Mac", "iMac"))
 
+    def _get_credentials_key(self, protocol: StreamingProtocol) -> str:
+        """Get the config key for credentials for given protocol."""
+        if protocol == StreamingProtocol.RAOP:
+            return CONF_RAOP_CREDENTIALS
+        return CONF_AIRPLAY_CREDENTIALS
+
+    def _get_protocol_for_config_value(self, config_option: int) -> StreamingProtocol:
+        if config_option == StreamingProtocol.AIRPLAY2 and self.airplay_discovery_info:
+            return StreamingProtocol.AIRPLAY2
+        if config_option == StreamingProtocol.RAOP and self.raop_discovery_info:
+            return StreamingProtocol.RAOP
+        # automatic selection
+        if self.airplay_discovery_info and is_airplay2_preferred_model(
+            self.device_info.manufacturer, self.device_info.model
+        ):
+            return StreamingProtocol.AIRPLAY2
+        return StreamingProtocol.RAOP
+
     def _get_pairing_config_entries(
         self, values: dict[str, ConfigValueType] | None
     ) -> list[ConfigEntry]:
-        """Return pairing config entries for Apple TV and macOS devices.
+        """
+        Return pairing config entries for Apple TV and macOS devices.
 
-        Uses cliraop for AirPlay/RAOP pairing.
+        Uses native pairing for both AirPlay 2 (HAP) and RAOP protocols.
         """
         entries: list[ConfigEntry] = []
 
-        # Check if we have credentials stored
-        if values and (creds := values.get(CONF_AP_CREDENTIALS)):
-            credentials = str(creds)
+        # Determine protocol name for UI
+        conf_protocol: int = 0
+        if values and (val := values.get(CONF_AIRPLAY_PROTOCOL)):
+            conf_protocol = cast("int", val)
         else:
-            credentials = str(self.config.get_value(CONF_AP_CREDENTIALS) or "")
-        has_credentials = bool(credentials)
-
-        if not has_credentials:
-            # Show pairing instructions and start button
-            if not self.stream and self.protocol == StreamingProtocol.RAOP:
-                # ensure we have a stream instance to track pairing state
-                from .protocols.raop import RaopStream  # noqa: PLC0415
-
-                self.stream = RaopStream(self)
-            elif not self.stream and self.protocol == StreamingProtocol.AIRPLAY2:
-                # ensure we have a stream instance to track pairing state
-                from .protocols.airplay2 import AirPlay2Stream  # noqa: PLC0415
-
-                self.stream = AirPlay2Stream(self)
-            if self.stream and not self.stream.supports_pairing:
-                # TEMP until ap2 pairing is implemented
-                return [
-                    ConfigEntry(
-                        key="pairing_unsupported",
-                        type=ConfigEntryType.ALERT,
-                        label=(
-                            "This device requires pairing but it is not supported "
-                            "by the current Music Assistant AirPlay implementation."
-                        ),
-                    )
-                ]
+            conf_protocol = cast("int", self.config.get_value(CONF_AIRPLAY_PROTOCOL, 0) or 0)
+        protocol = self._get_protocol_for_config_value(conf_protocol)
+        protocol_name = "RAOP" if protocol == StreamingProtocol.RAOP else "AirPlay"
+        protocol_key = (
+            CONF_RAOP_CREDENTIALS
+            if protocol == StreamingProtocol.RAOP
+            else CONF_AIRPLAY_CREDENTIALS
+        )
+        has_creds_for_current_protocol = (
+            values.get(protocol_key) if values else self.config.get_value(protocol_key)
+        )
 
+        if not has_creds_for_current_protocol:
             # If pairing was started, show PIN entry
-            if self.stream and self.stream.is_pairing:
+            if self._active_pairing and self._active_pairing.is_pairing:
                 entries.append(
                     ConfigEntry(
                         key=CONF_PAIRING_PIN,
@@ -310,17 +322,18 @@ class AirPlayPlayer(Player):
                     ConfigEntry(
                         key=CONF_ACTION_FINISH_PAIRING,
                         type=ConfigEntryType.ACTION,
-                        label="Complete the pairing process with the PIN",
+                        label=f"Complete {protocol_name} pairing with the PIN",
                         action=CONF_ACTION_FINISH_PAIRING,
                     )
                 )
             else:
+                # Show pairing instructions and start button
                 entries.append(
                     ConfigEntry(
                         key="pairing_instructions",
                         type=ConfigEntryType.LABEL,
                         label=(
-                            "This device requires pairing before it can be used. "
+                            f"This device requires {protocol_name} pairing before it can be used. "
                             "Click the button below to start the pairing process."
                         ),
                     )
@@ -329,7 +342,7 @@ class AirPlayPlayer(Player):
                     ConfigEntry(
                         key=CONF_ACTION_START_PAIRING,
                         type=ConfigEntryType.ACTION,
-                        label="Start the AirPlay pairing process",
+                        label=f"Start {protocol_name} pairing",
                         action=CONF_ACTION_START_PAIRING,
                     )
                 )
@@ -339,50 +352,86 @@ class AirPlayPlayer(Player):
                 ConfigEntry(
                     key="pairing_status",
                     type=ConfigEntryType.LABEL,
-                    label="Device is paired and ready to use.",
+                    label=f"Device is paired ({protocol_name}) and ready to use.",
+                )
+            )
+            # Add reset pairing button
+            entries.append(
+                ConfigEntry(
+                    key=CONF_ACTION_RESET_PAIRING,
+                    type=ConfigEntryType.ACTION,
+                    label=f"Reset {protocol_name} pairing",
+                    action=CONF_ACTION_RESET_PAIRING,
                 )
             )
 
         # Store credentials (hidden from UI)
-        entries.append(
-            ConfigEntry(
-                key=CONF_AP_CREDENTIALS,
-                type=ConfigEntryType.SECURE_STRING,
-                label="AirPlay Credentials",
-                default_value=credentials,
-                value=credentials,
-                required=False,
-                hidden=True,
+        for protocol in (StreamingProtocol.RAOP, StreamingProtocol.AIRPLAY2):
+            conf_key = self._get_credentials_key(protocol)
+            entries.append(
+                ConfigEntry(
+                    key=conf_key,
+                    type=ConfigEntryType.SECURE_STRING,
+                    label=conf_key,
+                    default_value=None,
+                    value=values.get(conf_key) if values else None,
+                    required=False,
+                    hidden=True,
+                )
             )
-        )
-
         return entries
 
     async def _handle_pairing_action(
         self, action: str, values: dict[str, ConfigValueType] | None
     ) -> None:
-        """Handle pairing actions using the configured protocol."""
-        if not self.stream and self.protocol == StreamingProtocol.RAOP:
-            # ensure we have a stream instance to track pairing state
-            from .protocols.raop import RaopStream  # noqa: PLC0415
+        """
+        Handle pairing actions.
 
-            self.stream = RaopStream(self)
-        elif not self.stream and self.protocol == StreamingProtocol.AIRPLAY2:
-            # ensure we have a stream instance to track pairing state
-            from .protocols.airplay2 import AirPlay2Stream  # noqa: PLC0415
+        Uses native pairing for both AirPlay 2 (HAP) and RAOP protocols.
+        Both produce credentials compatible with cliap2/cliraop respectively.
+        """
+        conf_protocol: int = 0
+        if values and (val := values.get(CONF_AIRPLAY_PROTOCOL)):
+            conf_protocol = cast("int", val)
+        else:
+            conf_protocol = cast("int", self.config.get_value(CONF_AIRPLAY_PROTOCOL, 0) or 0)
+        protocol = self._get_protocol_for_config_value(conf_protocol)
+        protocol_name = "RAOP" if protocol == StreamingProtocol.RAOP else "AirPlay"
 
-            self.stream = AirPlay2Stream(self)
         if action == CONF_ACTION_START_PAIRING:
-            if self.stream and self.stream.is_pairing:
+            if self._active_pairing and self._active_pairing.is_pairing:
                 self.logger.warning("Pairing process already in progress for %s", self.display_name)
                 return
-            self.logger.info("Started AirPlay pairing for %s", self.display_name)
-            if self.stream:
-                await self.stream.start_pairing()
+
+            self.logger.info("Starting %s pairing for %s", protocol_name, self.display_name)
+
+            from .pairing import AirPlayPairing  # noqa: PLC0415
+
+            # Determine port based on protocol
+            # Note: For Apple devices, pairing always happens on the AirPlay port (7000)
+            # even when streaming will use RAOP. The RAOP port (5000) is only for streaming.
+            port: int | None = None
+            if self.airplay_discovery_info:
+                port = self.airplay_discovery_info.port or 7000
+            elif self.raop_discovery_info:
+                # Fallback for devices without AirPlay service
+                port = self.raop_discovery_info.port or 5000
+            # Get the DACP ID from the provider - must match what cliap2 uses
+            provider = cast("AirPlayProvider", self.provider)
+            device_id = provider.dacp_id
+
+            self._active_pairing = AirPlayPairing(
+                address=self.address,
+                name=self.display_name,
+                protocol=protocol,
+                logger=self.logger,
+                port=port,
+                device_id=device_id,
+            )
+            await self._active_pairing.start_pairing()
 
         elif action == CONF_ACTION_FINISH_PAIRING:
             if not values:
-                # guard
                 return
 
             pin = values.get(CONF_PAIRING_PIN)
@@ -390,17 +439,24 @@ class AirPlayPlayer(Player):
                 self.logger.warning("No PIN provided for pairing")
                 return
 
-            if self.stream:
-                credentials = await self.stream.finish_pairing(pin=str(pin))
-            else:
+            if not self._active_pairing:
+                self.logger.warning("No active pairing session for %s", self.display_name)
                 return
 
-            values[CONF_AP_CREDENTIALS] = credentials
+            credentials = await self._active_pairing.finish_pairing(pin=str(pin))
+            self._active_pairing = None
 
-            self.logger.info(
-                "Finished AirPlay pairing for %s",
-                self.display_name,
-            )
+            # Store credentials with the protocol-specific key
+            cred_key = self._get_credentials_key(protocol)
+            values[cred_key] = credentials
+
+            self.logger.info("Finished %s pairing for %s", protocol_name, self.display_name)
+
+        elif action == CONF_ACTION_RESET_PAIRING:
+            cred_key = self._get_credentials_key(protocol)
+            self.logger.info("Resetting %s pairing for %s", protocol_name, self.display_name)
+            if values is not None:
+                values[cred_key] = None
 
     async def stop(self) -> None:
         """Send STOP command to player."""
@@ -414,7 +470,7 @@ class AirPlayPlayer(Player):
         """Send PLAY (unpause) command to player."""
         async with self._lock:
             if self.stream and self.stream.running:
-                await self.stream.send_cli_command("ACTION=PLAY\n")
+                await self.stream.send_cli_command("ACTION=PLAY")
 
     async def pause(self) -> None:
         """Send PAUSE command to player."""
@@ -427,7 +483,7 @@ class AirPlayPlayer(Player):
         async with self._lock:
             if not self.stream or not self.stream.running:
                 return
-            await self.stream.send_cli_command("ACTION=PAUSE\n")
+            await self.stream.send_cli_command("ACTION=PAUSE")
 
     async def play_media(self, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA on given player."""
@@ -436,29 +492,28 @@ class AirPlayPlayer(Player):
             raise RuntimeError("Player is synced")
         self._attr_current_media = media
 
+        # Always stop any existing stream
+        if self.stream and self.stream.running and self.stream.session:
+            # Set transitioning flag to ignore stale DACP messages (like prevent-playback)
+            self._transitioning = True
+            # Force stop the session (to speed up stopping)
+            await self.stream.session.stop(force=True)
+            self.stream = None
+
         # select audio source
         audio_source = self.mass.streams.get_stream(media, AIRPLAY_FLOW_PCM_FORMAT)
 
-        # if an existing stream session is running, we could replace it with the new stream
-        if self.stream and self.stream.running:
-            # check if we need to replace the stream
-            if self.stream.prevent_playback:
-                # player is in prevent playback mode, we need to stop the stream
-                await self.stop()
-            elif self.stream.session:
-                await self.stream.session.replace_stream(audio_source)
-                return
-
         # setup StreamSession for player (and its sync childs if any)
         sync_clients = self._get_sync_clients()
         provider = cast("AirPlayProvider", self.provider)
         stream_session = AirPlayStreamSession(provider, sync_clients, AIRPLAY_FLOW_PCM_FORMAT)
         await stream_session.start(audio_source)
+        self._transitioning = False
 
     async def volume_set(self, volume_level: int) -> None:
         """Send VOLUME_SET command to given player."""
         if self.stream and self.stream.running:
-            await self.stream.send_cli_command(f"VOLUME={volume_level}\n")
+            await self.stream.send_cli_command(f"VOLUME={volume_level}")
         self._attr_volume_level = volume_level
         self.update_state()
         # store last state in cache
@@ -593,9 +648,21 @@ class AirPlayPlayer(Player):
         self.update_state()
 
     def set_state_from_stream(
-        self, state: PlaybackState | None = None, elapsed_time: float | None = None
+        self,
+        state: PlaybackState | None = None,
+        elapsed_time: float | None = None,
+        stream: AirPlayProtocol | None = None,
     ) -> None:
-        """Set the playback state from stream (RAOP or AirPlay2)."""
+        """Set the playback state from stream (RAOP or AirPlay2).
+
+        :param state: New playback state (or None to keep current).
+        :param elapsed_time: New elapsed time (or None to keep current).
+        :param stream: The stream instance sending this update (for validation).
+        """
+        # Ignore state updates from old/stale streams
+        if stream is not None and stream != self.stream:
+            return
+
         if state is not None:
             prev_state = self._attr_playback_state
             self._attr_playback_state = state
@@ -620,6 +687,9 @@ class AirPlayPlayer(Player):
             if self.stream.running and self.stream.session:
                 self.mass.create_task(self.stream.session.stop())
             self.stream = None
+        if self._active_pairing:
+            await self._active_pairing.close()
+            self._active_pairing = None
 
     def _get_sync_clients(self) -> list[AirPlayPlayer]:
         """Get all sync clients for a player."""
index 5f3afefddfcb6a9510afa9792684ddeb1ae96ff6..fe7dced64bedffa61af5aad38e964565f536bb43 100644 (file)
@@ -2,16 +2,16 @@
 
 from __future__ import annotations
 
+import asyncio
 import time
 from abc import ABC, abstractmethod
-from random import randint
 from typing import TYPE_CHECKING
 
 from music_assistant_models.enums import PlaybackState
 
-from music_assistant.constants import VERBOSE_LOG_LEVEL
 from music_assistant.helpers.named_pipe import AsyncNamedPipeWriter
 from music_assistant.providers.airplay.constants import AIRPLAY_PCM_FORMAT
+from music_assistant.providers.airplay.helpers import generate_active_remote_id
 
 if TYPE_CHECKING:
     from music_assistant_models.player import PlayerMedia
@@ -33,8 +33,6 @@ class AirPlayProtocol(ABC):
 
     # the pcm audio format used for streaming to this protocol
     pcm_format = AIRPLAY_PCM_FORMAT
-    supports_pairing = False  # whether this protocol supports pairing
-    is_pairing: bool = False  # whether this protocol instance is in pairing mode
 
     def __init__(
         self,
@@ -49,20 +47,17 @@ class AirPlayProtocol(ABC):
         self.mass = player.provider.mass
         self.player = player
         self.logger = player.provider.logger.getChild(f"protocol.{self.__class__.__name__}")
-        # Generate unique ID to prevent race conditions with named pipes
-        self.active_remote_id: str = str(randint(1000, 8000))
+        mac_address = self.player.device_info.mac_address or self.player.player_id
+        self.active_remote_id: str = generate_active_remote_id(mac_address)
         self.prevent_playback: bool = False
         self._cli_proc: AsyncProcess | None = None
-        self.audio_pipe = AsyncNamedPipeWriter(
-            f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio",  # noqa: S108
-        )
         self.commands_pipe = AsyncNamedPipeWriter(
             f"/tmp/{self.player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd",  # noqa: S108
         )
-        # State tracking
         self._stopped = False
         self._total_bytes_sent = 0
         self._stream_bytes_sent = 0
+        self._connected = asyncio.Event()
 
     @property
     def running(self) -> bool:
@@ -71,35 +66,40 @@ class AirPlayProtocol(ABC):
 
     @abstractmethod
     async def start(self, start_ntp: int) -> None:
-        """Initialize streaming process for the player.
+        """Start the CLI process.
 
-        Args:
-            start_ntp: NTP timestamp to start streaming
+        :param start_ntp: NTP timestamp to start streaming.
         """
 
-    async def stop(self) -> None:
-        """Stop playback and cleanup."""
-        # Send stop command before setting _stopped flag
-        await self.send_cli_command("ACTION=STOP")
-        self._stopped = True
-
-        # Close the CLI process (wait for it to terminate)
-        if self._cli_proc and not self._cli_proc.closed:
-            await self._cli_proc.close()
+    async def wait_for_connection(self) -> None:
+        """Wait for device connection to be established."""
+        if not self._cli_proc:
+            return
+        await asyncio.wait_for(self._connected.wait(), timeout=10)
+        # repeat sending the volume level to the player because some players seem
+        # to ignore it the first time
+        # https://github.com/music-assistant/support/issues/3330
+        self.mass.call_later(2, self.send_cli_command(f"VOLUME={self.player.volume_level}"))
 
-        self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
+    async def stop(self, force: bool = False) -> None:
+        """
+        Stop playback and cleanup.
 
-        # Cleanup named pipes
-        await self.audio_pipe.remove()
+        :param force: If True, immediately kill the process without graceful shutdown.
+        """
+        # always send stop command first
+        await self.send_cli_command("ACTION=STOP")
+        if self._cli_proc:
+            await self._cli_proc.write_eof()
+        self._stopped = True
         await self.commands_pipe.remove()
-
-    async def start_pairing(self) -> None:
-        """Start pairing process for this protocol (if supported)."""
-        raise NotImplementedError("Pairing not implemented for this protocol")
-
-    async def finish_pairing(self, pin: str) -> str:
-        """Finish pairing process with given PIN (if supported)."""
-        raise NotImplementedError("Pairing not implemented for this protocol")
+        if force:
+            if self._cli_proc and not self._cli_proc.closed:
+                await self._cli_proc.kill()
+        elif self._cli_proc and not self._cli_proc.closed:
+            await self._cli_proc.close()
+        if not force:
+            self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
 
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLI binary."""
@@ -107,9 +107,9 @@ class AirPlayProtocol(ABC):
             return
         if not self.commands_pipe:
             return
-
-        self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
         self.player.last_command_sent = time.time()
+        if not command.endswith("\n"):
+            command += "\n"
         await self.commands_pipe.write(command.encode("utf-8"))
 
     async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
@@ -126,6 +126,6 @@ class AirPlayProtocol(ABC):
             await self.send_cli_command(cmd)
             # get image
             if metadata.image_url:
-                await self.send_cli_command(f"ARTWORK={metadata.image_url}\n")
+                await self.send_cli_command(f"ARTWORK={metadata.image_url}")
         if progress is not None:
-            await self.send_cli_command(f"PROGRESS={progress}\n")
+            await self.send_cli_command(f"PROGRESS={progress}")
index 19dd639519dc233d8592ef90ca59af6c0a7b088c..ab6896e32fe10a385af78aa00b3b97be568e8696 100644 (file)
@@ -4,28 +4,30 @@ from __future__ import annotations
 
 import asyncio
 import logging
+from typing import TYPE_CHECKING, cast
 
 from music_assistant_models.enums import PlaybackState
-from music_assistant_models.errors import PlayerCommandFailed
 
 from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
 from music_assistant.helpers.process import AsyncProcess
 from music_assistant.providers.airplay.constants import (
     AIRPLAY2_MIN_LOG_LEVEL,
+    CONF_AIRPLAY_CREDENTIALS,
 )
 from music_assistant.providers.airplay.helpers import get_cli_binary
 
 from ._protocol import AirPlayProtocol
 
+if TYPE_CHECKING:
+    from music_assistant.providers.airplay.provider import AirPlayProvider
+
 
 class AirPlay2Stream(AirPlayProtocol):
     """
     AirPlay 2 Audio Streamer.
 
-    Python is not suitable for realtime audio streaming so we do the actual streaming
-    of audio using a small executable written in C based on owntones to do
-    the actual timestamped playback. It reads pcm audio from a named pipe
-    and we can send some interactive commands using another named pipe.
+    Uses cliap2 (C executable based on owntone) for timestamped playback.
+    Audio is fed via stdin, commands via a named pipe.
     """
 
     @property
@@ -51,8 +53,8 @@ class AirPlay2Stream(AirPlayProtocol):
             mass_level = 5
         return max(mass_level, AIRPLAY2_MIN_LOG_LEVEL)
 
-    async def start(self, start_ntp: int, skip: int = 0) -> None:
-        """Initialize CLI process for a player."""
+    async def start(self, start_ntp: int) -> None:
+        """Start cliap2 process."""
         cli_binary = await get_cli_binary(self.player.protocol)
         assert self.player.airplay_discovery_info is not None
 
@@ -64,13 +66,18 @@ class AirPlay2Stream(AirPlayProtocol):
         for key, value in self.player.airplay_discovery_info.decoded_properties.items():
             txt_kv += f'"{key}={value}" '
 
-        # Note: skip parameter is accepted for API compatibility with base class
-        # but is not currently used by the cliap2 binary (AirPlay2 handles late joiners differently)
-
         # cliap2 is the binary that handles the actual streaming to the player
         # this binary leverages from the AirPlay2 support in owntones
         # https://github.com/music-assistant/cliairplay
 
+        # Get AirPlay credentials if available (for Apple devices that require pairing)
+        airplay_credentials: str | None = None
+        if creds := self.player.config.get_value(CONF_AIRPLAY_CREDENTIALS):
+            airplay_credentials = str(creds)
+
+        # Get the provider's DACP ID for remote control callbacks
+        prov = cast("AirPlayProvider", self.prov)
+
         cli_args = [
             cli_binary,
             "--name",
@@ -89,31 +96,34 @@ class AirPlay2Stream(AirPlayProtocol):
             str(self.player.volume_level),
             "--loglevel",
             str(self._cli_loglevel),
+            "--dacp_id",
+            prov.dacp_id,
+            "--active_remote",
+            self.active_remote_id,
             "--pipe",
-            self.audio_pipe.path,
+            "-",  # Use stdin for audio input
             "--command_pipe",
             self.commands_pipe.path,
         ]
 
+        # Add credentials for authenticated AirPlay devices (Apple TV, HomePod, etc.)
+        # Native HAP pairing format: 192 hex chars = client_private_key(128) + server_public_key(64)
+        if airplay_credentials:
+            if len(airplay_credentials) == 192:
+                cli_args += ["--auth", airplay_credentials]
+            else:
+                self.player.logger.warning(
+                    "Invalid credentials length: %d (expected 192)",
+                    len(airplay_credentials),
+                )
+
         self.player.logger.debug(
             "Starting cliap2 process for player %s with args: %s",
             player_id,
             cli_args,
         )
-        self._cli_proc = AsyncProcess(cli_args, stdin=False, stderr=True, name="cliap2")
+        self._cli_proc = AsyncProcess(cli_args, stdin=True, stderr=True, name="cliap2")
         await self._cli_proc.start()
-        # read up to first num_lines lines of stderr to get the initial status
-        num_lines: int = 50
-        if self.prov.logger.level > logging.INFO:
-            num_lines *= 10
-        for _ in range(num_lines):
-            line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore").strip()
-            self.player.logger.debug(line)
-            if f"airplay: Adding AirPlay device '{self.player.display_name}'" in line:
-                self.player.logger.info("AirPlay device connected. Starting playback.")
-                break
-            if f"The AirPlay 2 device '{self.player.display_name}' failed" in line:
-                raise PlayerCommandFailed("Cannot connect to AirPlay device")
         # start reading the stderr of the cliap2 process from another task
         self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))
 
@@ -124,13 +134,20 @@ class AirPlay2Stream(AirPlayProtocol):
         if not self._cli_proc:
             return
         async for line in self._cli_proc.iter_stderr():
+            if self._stopped:
+                break
+            if "player: event_play_start()" in line:
+                # successfully connected
+                self._connected.set()
             if "Pause at" in line:
-                player.set_state_from_stream(state=PlaybackState.PAUSED)
-            if "Restarted at" in line:
-                player.set_state_from_stream(state=PlaybackState.PLAYING)
-            if "Starting at" in line:
+                player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
+            elif "Restarted at" in line:
+                player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
+            elif "Starting at" in line:
                 # streaming has started
-                player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0)
+                player.set_state_from_stream(
+                    state=PlaybackState.PLAYING, elapsed_time=0, stream=self
+                )
             if "put delay detected" in line:
                 if "resetting all outputs" in line:
                     logger.error("High packet loss detected, restarting playback...")
@@ -150,6 +167,9 @@ class AirPlay2Stream(AirPlayProtocol):
                 logger.info(line)
             elif "[ WARN]" in line:
                 logger.warning(line)
+            elif "[DEBUG]" in line and "mass_timer_cb" in line:
+                # mass_timer_cb is very spammy, reduce it to verbose
+                logger.log(VERBOSE_LOG_LEVEL, line)
             elif "[DEBUG]" in line:
                 logger.debug(line)
             elif "[ SPAM]" in line:
@@ -161,4 +181,4 @@ class AirPlay2Stream(AirPlayProtocol):
         # ensure we're cleaned up afterwards (this also logs the returncode)
         if not self._stopped:
             self._stopped = True
-            self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
+            self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
index f6413fb11e7861780581ac578b840f0152a2c4d6..ca95fddb6c1d131fef545ae573a4a0dea47a3487 100644 (file)
@@ -7,16 +7,15 @@ import logging
 from typing import TYPE_CHECKING, cast
 
 from music_assistant_models.enums import PlaybackState
-from music_assistant_models.errors import PlayerCommandFailed
 
 from music_assistant.constants import VERBOSE_LOG_LEVEL
 from music_assistant.helpers.process import AsyncProcess
 from music_assistant.providers.airplay.constants import (
     AIRPLAY_OUTPUT_BUFFER_DURATION_MS,
     CONF_ALAC_ENCODE,
-    CONF_AP_CREDENTIALS,
     CONF_ENCRYPTION,
     CONF_PASSWORD,
+    CONF_RAOP_CREDENTIALS,
 )
 from music_assistant.providers.airplay.helpers import get_cli_binary
 
@@ -36,10 +35,8 @@ class RaopStream(AirPlayProtocol):
     and we can send some interactive commands using a named pipe.
     """
 
-    supports_pairing = True
-
     async def start(self, start_ntp: int) -> None:
-        """Initialize CLIRaop process for a player."""
+        """Start CLIRaop process."""
         assert self.player.raop_discovery_info is not None  # for type checker
         cli_binary = await get_cli_binary(self.player.protocol)
         extra_args: list[str] = []
@@ -56,20 +53,17 @@ class RaopStream(AirPlayProtocol):
             player_id, CONF_PASSWORD
         ):
             extra_args += ["-password", str(device_password)]
-        # Add AirPlay credentials from pairing if available (for Apple devices)
-        if ap_credentials := self.player.config.get_value(CONF_AP_CREDENTIALS):
-            extra_args += ["-secret", str(ap_credentials)]
+        # Add RAOP credentials from pairing if available (for Apple devices)
+        if raop_credentials := self.player.config.get_value(CONF_RAOP_CREDENTIALS):
+            # Credentials format is "client_id:auth_secret", cliraop expects just auth_secret
+            creds_str = str(raop_credentials)
+            auth_secret = creds_str.split(":", 1)[1] if ":" in creds_str else creds_str
+            extra_args += ["-secret", auth_secret]
         if self.prov.logger.isEnabledFor(logging.DEBUG):
             extra_args += ["-debug", "5"]
         elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
             extra_args += ["-debug", "10"]
 
-        # cliraop is the binary that handles the actual raop streaming to the player
-        # this is a slightly modified version of philippe44's libraop
-        # https://github.com/music-assistant/libraop
-        # we use this intermediate binary to do the actual streaming because attempts to do
-        # so using pure python (e.g. pyatv) were not successful due to the realtime nature
-
         cliraop_args = [
             cli_binary,
             "-ntpstart",
@@ -90,7 +84,7 @@ class RaopStream(AirPlayProtocol):
             "-udn",
             self.player.raop_discovery_info.name,
             self.player.address,
-            self.audio_pipe.path,
+            "-",  # Use stdin for audio input
         ]
         self.player.logger.debug(
             "Starting cliraop process for player %s with args: %s",
@@ -99,74 +93,8 @@ class RaopStream(AirPlayProtocol):
         )
         self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
         await self._cli_proc.start()
-
-        # read up to first 50 lines of stderr to get the initial status
-        for _ in range(50):
-            line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
-            self.player.logger.debug(line)
-            if "connected to " in line:
-                self.player.logger.info("AirPlay device connected. Starting playback.")
-                break
-            if "Cannot connect to AirPlay device" in line:
-                raise PlayerCommandFailed("Cannot connect to AirPlay device")
-
-        # start reading the stderr of the cliraop process from another task
+        # start reading the stderr of the cliap2 process from another task
         self._cli_proc.attach_stderr_reader(self.mass.create_task(self._stderr_reader()))
-        # repeat sending the volume level to the player because some players seem
-        # to ignore it the first time
-        # https://github.com/music-assistant/support/issues/3330
-        self.mass.call_later(1, self.send_cli_command(f"VOLUME={self.player.volume_level}\n"))
-
-    async def start_pairing(self) -> None:
-        """Start pairing process for this protocol (if supported)."""
-        assert self.player.raop_discovery_info is not None  # for type checker
-        cli_binary = await get_cli_binary(self.player.protocol)
-
-        cliraop_args = [
-            cli_binary,
-            "-pair",
-            "-if",
-            self.mass.streams.bind_ip,
-            "-port",
-            str(self.player.raop_discovery_info.port),
-            "-udn",
-            self.player.raop_discovery_info.name,
-            self.player.address,
-        ]
-        self.player.logger.debug(
-            "Starting PAIRING with cliraop process for player %s with args: %s",
-            self.player.player_id,
-            cliraop_args,
-        )
-        self._cli_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
-        await self._cli_proc.start()
-        # read up to first 10 lines of stderr to get the initial status
-        for _ in range(10):
-            line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
-            self.player.logger.debug(line)
-            if "enter PIN code displayed on " in line:
-                self.is_pairing = True
-                return
-        await self._cli_proc.close()
-        raise PlayerCommandFailed("Pairing failed")
-
-    async def finish_pairing(self, pin: str) -> str:
-        """Finish pairing process with given PIN (if supported)."""
-        if not self.is_pairing:
-            await self.start_pairing()
-        if not self._cli_proc or self._cli_proc.closed:
-            raise PlayerCommandFailed("Pairing process not started")
-
-        self.is_pairing = False
-        _, _stderr = await self._cli_proc.communicate(input=f"{pin}\n".encode(), timeout=10)
-        for line in _stderr.decode().splitlines():
-            self.player.logger.debug(line)
-            for error in ("device did not respond", "can't authentify", "pin failed"):
-                if error in line.lower():
-                    raise PlayerCommandFailed(f"Pairing failed: {error}")
-            if "secret is " in line:
-                return line.split("secret is ")[1].strip()
-        raise PlayerCommandFailed(f"Pairing failed: {_stderr.decode().strip()}")
 
     async def _stderr_reader(self) -> None:
         """Monitor stderr for the running CLIRaop process."""
@@ -176,13 +104,20 @@ class RaopStream(AirPlayProtocol):
         if not self._cli_proc:
             return
         async for line in self._cli_proc.iter_stderr():
+            if self._stopped:
+                break
+            if "connected to " in line:
+                self._connected.set()
+                # successfully connected - playback will/can start
             if "set pause" in line or "Pause at" in line:
-                player.set_state_from_stream(state=PlaybackState.PAUSED)
-            if "Restarted at" in line or "restarting w/ pause" in line:
-                player.set_state_from_stream(state=PlaybackState.PLAYING)
-            if "restarting w/o pause" in line:
+                player.set_state_from_stream(state=PlaybackState.PAUSED, stream=self)
+            elif "Restarted at" in line or "restarting w/ pause" in line:
+                player.set_state_from_stream(state=PlaybackState.PLAYING, stream=self)
+            elif "restarting w/o pause" in line:
                 # streaming has started
-                player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0)
+                player.set_state_from_stream(
+                    state=PlaybackState.PLAYING, elapsed_time=0, stream=self
+                )
             if "lost packet out of backlog" in line:
                 lost_packets += 1
                 if lost_packets == 100:
@@ -199,4 +134,4 @@ class RaopStream(AirPlayProtocol):
         logger.debug("CLIRaop stderr reader ended")
         if not self._stopped:
             self._stopped = True
-            self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
+            self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0, stream=self)
index 375dffc7cee9ded7de782881f94fd0aa12312a08..fd9f7de8d95fe0fbd9856744e8d2870ff22ffbcc 100644 (file)
@@ -4,7 +4,6 @@ from __future__ import annotations
 
 import asyncio
 import socket
-from random import randrange
 from typing import cast
 
 from music_assistant_models.enums import PlaybackState
@@ -49,7 +48,10 @@ class AirPlayProvider(PlayerProvider):
         """Handle async initialization of the provider."""
         # register DACP zeroconf service
         dacp_port = await select_free_port(39831, 49831)
-        self.dacp_id = dacp_id = f"{randrange(2**64):X}"
+        # Use first 16 hex chars of server_id as a persistent DACP ID
+        # This ensures the DACP ID remains the same across restarts, which is required
+        # for AirPlay 2 (HAP) pair-verify to work with previously paired devices
+        self.dacp_id = dacp_id = self.mass.server_id[:16].upper()
         self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
         self._dacp_server = await asyncio.start_server(
             self._handle_dacp_request, "0.0.0.0", dacp_port
@@ -316,7 +318,10 @@ class AirPlayProvider(PlayerProvider):
                 player.update_volume_from_device(volume)
             elif "device-prevent-playback=1" in path:
                 # device switched to another source (or is powered off)
-                if stream := player.stream:
+                # Ignore during stream transition (stale message from old CLI process)
+                if player._transitioning:
+                    self.logger.debug("Ignoring prevent-playback during stream transition")
+                elif stream := player.stream:
                     stream.prevent_playback = True
                     if stream.session:
                         self.mass.create_task(stream.session.remove_client(player))
index 775bda66c6ce538a0626fbc442c960b20bc4afda..a6fc10a32eba0ba3bf3126478c231c9974102cd7 100644 (file)
@@ -8,19 +8,18 @@ from collections.abc import AsyncGenerator
 from contextlib import suppress
 from typing import TYPE_CHECKING
 
+from music_assistant_models.errors import PlayerCommandFailed
+
 from music_assistant.constants import CONF_SYNC_ADJUST
 from music_assistant.helpers.audio import get_player_filter_params
 from music_assistant.helpers.ffmpeg import FFMpeg
-from music_assistant.helpers.util import TaskManager
 from music_assistant.providers.airplay.helpers import ntp_to_unix_time, unix_time_to_ntp
 
 from .constants import (
     AIRPLAY2_CONNECT_TIME_MS,
-    AIRPLAY_OUTPUT_BUFFER_DURATION_MS,
-    AIRPLAY_PRELOAD_SECONDS,
-    AIRPLAY_PROCESS_SPAWN_TIME_MS,
     CONF_ENABLE_LATE_JOIN,
     ENABLE_LATE_JOIN_DEFAULT,
+    RAOP_CONNECT_TIME_MS,
     StreamingProtocol,
 )
 from .protocols.airplay2 import AirPlay2Stream
@@ -44,11 +43,9 @@ class AirPlayStreamSession:
     ) -> None:
         """Initialize AirPlayStreamSession.
 
-        Args:
-            airplay_provider: The AirPlay provider instance
-            sync_clients: List of AirPlay players to stream to
-            pcm_format: PCM format of the input stream
-            audio_source: Async generator yielding audio chunks
+        :param airplay_provider: The AirPlay provider instance.
+        :param sync_clients: List of AirPlay players to stream to.
+        :param pcm_format: PCM format of the input stream.
         """
         assert sync_clients
         self.prov = airplay_provider
@@ -60,76 +57,49 @@ class AirPlayStreamSession:
         self._lock = asyncio.Lock()
         self.start_ntp: int = 0
         self.start_time: float = 0.0
-        self.wait_start: float = 0.0  # in seconds
-        self.seconds_streamed: float = 0  # Total seconds sent to session
-        # because we reuse an existing stream session for new play_media requests,
-        # we need to track when the last stream was started
-        self.last_stream_started: float = 0.0
+        self.wait_start: float = 0.0
+        self.seconds_streamed: float = 0
         self.total_pause_time: float = 0.0
         self.last_paused: float | None = None
-        self._clients_ready = asyncio.Event()
         self._first_chunk_received = asyncio.Event()
 
     async def start(self, audio_source: AsyncGenerator[bytes, None]) -> None:
         """Initialize stream session for all players."""
-        self.prov.logger.debug(
-            "Starting stream session with %d clients",
-            len(self.sync_clients),
-        )
-        # Prepare all clients
-        # this will create the stream objects, named pipes and start ffmpeg
-        async with TaskManager(self.mass) as tm:
-            for _airplay_player in self.sync_clients:
-                tm.create_task(self._prepare_client(_airplay_player))
-
-        # Start audio source streamer task
-        # this will read from the audio source and distribute
-        # to all player-specific ffmpeg processes
-        # we start this task early because some streams (especially radio)
-        # may need more time to buffer - this way we ensure we have audio ready
-        # when the players should start playing
-        self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
-        # wait until the first chunk is received before starting clients
-        await self._first_chunk_received.wait()
-
-        # Start all clients
-        # Get current NTP timestamp and calculate wait time
         cur_time = time.time()
-        # AirPlay2 clients need around 2500ms to establish connection and start playback
-        # The also have a fixed 2000ms output buffer. We will not be able to respect the
-        # ntpstart time unless we cater for all these time delays.
-        # RAOP clients need less due to less RTSP exchanges and different packet buffer
-        # handling
-        # Plus we need to cater for process spawn and initialisation time
-        wait_start = (
-            AIRPLAY2_CONNECT_TIME_MS
-            + AIRPLAY_OUTPUT_BUFFER_DURATION_MS
-            + AIRPLAY_PROCESS_SPAWN_TIME_MS
-            + (250 * len(self.sync_clients))
-        )  # in milliseconds
+        has_airplay2_client = any(
+            p.protocol == StreamingProtocol.AIRPLAY2 for p in self.sync_clients
+        )
+        wait_start = AIRPLAY2_CONNECT_TIME_MS if has_airplay2_client else RAOP_CONNECT_TIME_MS
         wait_start_seconds = wait_start / 1000
-        self.wait_start = wait_start_seconds  # in seconds
+        self.wait_start = wait_start_seconds
         self.start_time = cur_time + wait_start_seconds
-        self.last_stream_started = self.start_time
         self.start_ntp = unix_time_to_ntp(self.start_time)
+        await asyncio.gather(*[self._start_client(p, self.start_ntp) for p in self.sync_clients])
+        self._audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
+        try:
+            await asyncio.gather(
+                *[p.stream.wait_for_connection() for p in self.sync_clients if p.stream]
+            )
+        except Exception:
+            # playback failed to start, cleanup
+            await self.stop()
+            raise PlayerCommandFailed("Playback failed to start")
 
-        async with TaskManager(self.mass) as tm:
-            for _airplay_player in self.sync_clients:
-                tm.create_task(self._start_client(_airplay_player, self.start_ntp))
-
-        # All clients started
-        self._clients_ready.set()
-
-    async def stop(self) -> None:
+    async def stop(self, force: bool = False) -> None:
         """Stop playback and cleanup."""
-        await asyncio.gather(
-            *[self.remove_client(x) for x in self.sync_clients],
-            return_exceptions=True,
-        )
         if self._audio_source_task and not self._audio_source_task.done():
             self._audio_source_task.cancel()
             with suppress(asyncio.CancelledError):
                 await self._audio_source_task
+        if force:
+            await asyncio.gather(
+                *[self.stop_client(x, force=True) for x in self.sync_clients],
+            )
+            self.sync_clients = []
+        else:
+            await asyncio.gather(
+                *[self.remove_client(x) for x in self.sync_clients],
+            )
 
     async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
         """Remove a sync client from the session."""
@@ -137,15 +107,31 @@ class AirPlayStreamSession:
             if airplay_player not in self.sync_clients:
                 return
             self.sync_clients.remove(airplay_player)
-        if airplay_player.stream and airplay_player.stream.session == self:
-            await airplay_player.stream.stop()
-        if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
-            await ffmpeg.close()
+        await self.stop_client(airplay_player)
         # If this was the last client, stop the session
         if not self.sync_clients:
             await self.stop()
             return
 
+    async def stop_client(self, airplay_player: AirPlayPlayer, force: bool = False) -> None:
+        """
+        Stop a client's stream and ffmpeg.
+
+        :param airplay_player: The player to stop.
+        :param force: If True, kill CLI process immediately.
+        """
+        ffmpeg = self._player_ffmpeg.pop(airplay_player.player_id, None)
+        if force:
+            if ffmpeg and not ffmpeg.closed:
+                await ffmpeg.kill()
+            if airplay_player.stream and airplay_player.stream.session == self:
+                await airplay_player.stream.stop(force=True)
+        else:
+            if ffmpeg and not ffmpeg.closed:
+                await ffmpeg.close()
+            if airplay_player.stream and airplay_player.stream.session == self:
+                await airplay_player.stream.stop()
+
     async def add_client(self, airplay_player: AirPlayPlayer) -> None:
         """Add a sync client to the session as a late joiner.
 
@@ -162,10 +148,7 @@ class AirPlayStreamSession:
             CONF_ENABLE_LATE_JOIN, ENABLE_LATE_JOIN_DEFAULT
         )
         if not allow_late_join:
-            # Late joining is not allowed - restart the session for all players
-            await self.stop()  # we need to stop the current session to add a new client
-            # this could potentially be called by multiple players at the exact same time
-            # so we debounce the resync a bit here with a timer
+            await self.stop()
             if sync_leader.current_media:
                 self.mass.call_later(
                     0.5,
@@ -174,110 +157,39 @@ class AirPlayStreamSession:
                 )
             return
 
-        # Prepare the new client for streaming
-        await self._prepare_client(airplay_player)
-
-        # Snapshot seconds_streamed inside lock to prevent race conditions
-        # Keep lock held during stream.start() to ensure player doesn't miss any chunks
         async with self._lock:
-            # Calculate skip_seconds based on how many chunks have been sent
             skip_seconds = self.seconds_streamed
-            # Start the stream at compensated NTP timestamp
             start_at = self.start_time + skip_seconds
             start_ntp = unix_time_to_ntp(start_at)
-            self.prov.logger.debug(
-                "Adding late joiner %s to session, playback starts %.3fs from now",
-                airplay_player.player_id,
-                start_at - time.time(),
-            )
-            # Add player to sync clients list
             if airplay_player not in self.sync_clients:
                 self.sync_clients.append(airplay_player)
 
             await self._start_client(airplay_player, start_ntp)
-
-    async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
-        """Replace the audio source of the stream."""
-        self._first_chunk_received.clear()
-        new_audio_source_task = asyncio.create_task(self._audio_streamer(audio_source))
-        await self._first_chunk_received.wait()
-        async with self._lock:
-            # Cancel the current audio source task
-            assert self._audio_source_task  # for type checker
-            old_audio_source_task = self._audio_source_task
-            old_audio_source_task.cancel()
-            self._audio_source_task = new_audio_source_task
-        self.last_stream_started = time.time() + self.wait_start
-        self.total_pause_time = 0.0
-        self.last_paused = None
-        for sync_client in self.sync_clients:
-            sync_client.set_state_from_stream(state=None, elapsed_time=0)
-        # ensure we cleanly wait for the old audio source task to finish
-        with suppress(asyncio.CancelledError):
-            await old_audio_source_task
+            if airplay_player.stream:
+                await airplay_player.stream.wait_for_connection()
 
     async def _audio_streamer(self, audio_source: AsyncGenerator[bytes, None]) -> None:
         """Stream audio to all players."""
         pcm_sample_size = self.pcm_format.pcm_sample_size
-        stream_start_time = time.time()
-        first_chunk_received = False
-        # each chunk is exactly one second of audio data based on the pcm format.
-        async for chunk in audio_source:
-            if first_chunk_received is False:
-                first_chunk_received = True
-                self.prov.logger.debug(
-                    "First audio chunk received after %.3fs",
-                    time.time() - stream_start_time,
-                )
-                self._first_chunk_received.set()
-            # Wait until all clients are ready
-            await self._clients_ready.wait()
-            # Send chunk to all players
-            async with self._lock:
-                sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
-                if not sync_clients:
-                    self.prov.logger.debug(
-                        "Audio streamer exiting: No running clients left in session"
-                    )
-                    return
-
-                # Write chunk to all players
-                write_tasks = [
-                    self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream
-                ]
-                results = await asyncio.gather(*write_tasks, return_exceptions=True)
-
-                # Check for write errors or timeouts
-                players_to_remove: list[AirPlayPlayer] = []
-                for i, result in enumerate(results):
-                    if i >= len(sync_clients):
-                        continue
-                    player = sync_clients[i]
-
-                    if isinstance(result, asyncio.TimeoutError):
-                        self.prov.logger.warning(
-                            "Removing player %s from session: stopped reading data (write timeout)",
-                            player.player_id,
-                        )
-                        players_to_remove.append(player)
-                    elif isinstance(result, Exception):
-                        self.prov.logger.warning(
-                            "Removing player %s from session due to write error: %s",
-                            player.player_id,
-                            result,
-                        )
-                        players_to_remove.append(player)
-
-                # Remove failed/timed-out players from sync group
-                for player in players_to_remove:
-                    self.mass.create_task(self.remove_client(player))
-
-                # Update chunk counter (each chunk is exactly one second of audio)
-                chunk_seconds = len(chunk) / pcm_sample_size
-                self.seconds_streamed += chunk_seconds
-
-        # Entire stream consumed: send EOF
-        self.prov.logger.debug("Audio source stream exhausted")
+        watchdog_task = asyncio.create_task(self._silence_watchdog(pcm_sample_size))
+        try:
+            async for chunk in audio_source:
+                if not self._first_chunk_received.is_set():
+                    watchdog_task.cancel()
+                    with suppress(asyncio.CancelledError):
+                        await watchdog_task
+                    self._first_chunk_received.set()
+
+                if not self.sync_clients:
+                    break
+
+                await self._write_chunk_to_all_players(chunk)
+                self.seconds_streamed += len(chunk) / pcm_sample_size
+        finally:
+            if not watchdog_task.done():
+                watchdog_task.cancel()
+                with suppress(asyncio.CancelledError):
+                    await watchdog_task
         async with self._lock:
             await asyncio.gather(
                 *[
@@ -288,97 +200,114 @@ class AirPlayStreamSession:
                 return_exceptions=True,
             )
 
-    async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
-        """
-        Write audio chunk to a specific player.
+    async def _silence_watchdog(self, pcm_sample_size: int) -> None:
+        """Insert silence if audio source is slow to deliver first chunk."""
+        grace_period = 0.2
+        max_silence_padding = 5.0
+        silence_inserted = 0.0
+
+        await asyncio.sleep(grace_period)
+        while not self._first_chunk_received.is_set() and silence_inserted < max_silence_padding:
+            silence_duration = 0.1
+            silence_bytes = int(pcm_sample_size * silence_duration)
+            silence_chunk = bytes(silence_bytes)
+            await self._write_chunk_to_all_players(silence_chunk)
+            self.seconds_streamed += silence_duration
+            silence_inserted += silence_duration
+            await asyncio.sleep(0.05)
+
+        if silence_inserted > 0:
+            self.prov.logger.warning(
+                "Inserted %.1fs silence padding while waiting for audio source",
+                silence_inserted,
+            )
 
-        each chunk is (in general) one second of audio data based on the pcm format.
-        For late joiners, compensates for chunks sent between join time and actual chunk delivery.
-        Blocks (async) until the data has been written.
-        """
+    async def _write_chunk_to_all_players(self, chunk: bytes) -> None:
+        """Write a chunk to all connected players."""
+        async with self._lock:
+            sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
+            if not sync_clients:
+                return
+
+            # Write chunk to all players
+            write_tasks = [self._write_chunk_to_player(x, chunk) for x in sync_clients if x.stream]
+            results = await asyncio.gather(*write_tasks, return_exceptions=True)
+
+            # Check for write errors or timeouts
+            players_to_remove: list[AirPlayPlayer] = []
+            for i, result in enumerate(results):
+                if i >= len(sync_clients):
+                    continue
+                player = sync_clients[i]
+
+                if isinstance(result, TimeoutError):
+                    self.prov.logger.warning(
+                        "Removing player %s from session: stopped reading data (write timeout)",
+                        player.player_id,
+                    )
+                    players_to_remove.append(player)
+                elif isinstance(result, Exception):
+                    self.prov.logger.warning(
+                        "Removing player %s from session due to write error: %s",
+                        player.player_id,
+                        result,
+                    )
+                    players_to_remove.append(player)
+
+            for player in players_to_remove:
+                self.mass.create_task(self.remove_client(player))
+
+    async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
+        """Write audio chunk to a player's ffmpeg process."""
         player_id = airplay_player.player_id
-        # we write the chunk to the player's ffmpeg process which
-        # applies any player-specific filters (e.g. volume, dsp, etc)
-        # and outputs in the correct format for the player stream
-        # to the named pipe associated with the player's stream
         if ffmpeg := self._player_ffmpeg.get(player_id):
             if ffmpeg.closed:
                 return
-            # Use a 35 second timeout - if the write takes longer, the player
-            # has stopped reading data and we're in a deadlock situation
-            # 35 seconds is a little bit above out pause timeout (30s) to allow for some margin
             await asyncio.wait_for(ffmpeg.write(chunk), timeout=35.0)
 
     async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
         """Write EOF to a specific player."""
-        # cleanup any associated FFMpeg instance first
         if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
             await ffmpeg.write_eof()
             await ffmpeg.wait_with_timeout(30)
-            del ffmpeg
+            if airplay_player.stream and airplay_player.stream._cli_proc:
+                await airplay_player.stream._cli_proc.write_eof()
 
-    async def _prepare_client(self, airplay_player: AirPlayPlayer) -> None:
-        """Prepare stream for a single client."""
-        # Stop existing stream if running
+    async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
+        """Start CLI process and ffmpeg for a single client."""
         if airplay_player.stream and airplay_player.stream.running:
             await airplay_player.stream.stop()
-        # Create appropriate stream type based on protocol
         if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
             airplay_player.stream = AirPlay2Stream(airplay_player)
         else:
             airplay_player.stream = RaopStream(airplay_player)
-        # Link stream session to player stream
         airplay_player.stream.session = self
-        # create the named pipes
-        await airplay_player.stream.audio_pipe.create()
-        await airplay_player.stream.commands_pipe.create()
-        # start the (player-specific) ffmpeg process
-        # note that ffmpeg will open the named pipe for writing
-        await self._start_client_ffmpeg(airplay_player)
-        await asyncio.sleep(0.05)  # allow ffmpeg to open the pipe properly
-
-    async def _start_client(self, airplay_player: AirPlayPlayer, start_ntp: int) -> None:
-        """Start stream for a single client."""
         sync_adjust = airplay_player.config.get_value(CONF_SYNC_ADJUST, 0)
         assert isinstance(sync_adjust, int)
         if sync_adjust != 0:
-            # apply sync adjustment
-            start_ntp += sync_adjust * 1000  # sync_adjust is in seconds, NTP in milliseconds
             start_ntp = unix_time_to_ntp(ntp_to_unix_time(start_ntp) + (sync_adjust / 1000))
-        # start the stream
-        assert airplay_player.stream  # for type checker
         await airplay_player.stream.start(start_ntp)
-
-    async def _start_client_ffmpeg(self, airplay_player: AirPlayPlayer) -> None:
-        """Start or restart the player's ffmpeg stream."""
-        # Clean up any existing FFmpeg instance for this player
+        # Start ffmpeg to feed audio to CLI stdin
         if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
             await ffmpeg.close()
-            del ffmpeg
-        assert airplay_player.stream  # for type checker
-        # Create the FFMpeg instance per player which accepts our PCM audio
-        # applies any player-specific filters (e.g. volume, dsp, etc)
-        # and outputs in the correct format for the player stream
-        # to the named pipe associated with the player's stream
         filter_params = get_player_filter_params(
             self.mass,
             airplay_player.player_id,
             self.pcm_format,
             airplay_player.stream.pcm_format,
         )
+        cli_proc = airplay_player.stream._cli_proc
+        assert cli_proc
+        assert cli_proc.proc
+        assert cli_proc.proc.stdin
+        stdin_transport = cli_proc.proc.stdin.transport
+        audio_output: str | int = stdin_transport.get_extra_info("pipe").fileno()
         ffmpeg = FFMpeg(
             audio_input="-",
             input_format=self.pcm_format,
             output_format=airplay_player.stream.pcm_format,
             filter_params=filter_params,
-            audio_output=airplay_player.stream.audio_pipe.path,
-            extra_input_args=[
-                "-y",
-                "-readrate",
-                "1",
-                "-readrate_initial_burst",
-                f"{AIRPLAY_PRELOAD_SECONDS}",
-            ],
+            audio_output=audio_output,
         )
         await ffmpeg.start()
         self._player_ffmpeg[airplay_player.player_id] = ffmpeg
index 8c8b769c11616630658adcee08e9b5124b66890d..987e51ce343234faf136717a42a10b7a020103f4 100644 (file)
@@ -67,6 +67,7 @@ shortuuid==1.0.13
 snapcast==2.3.7
 soco==0.30.12
 soundcloudpy==0.1.4
+srptools>=1.0.0
 sxm==0.2.8
 unidecode==1.4.0
 uv>=0.8.0