Optimizations to the AirPlay provider
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 31 Oct 2025 01:16:46 +0000 (02:16 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Fri, 31 Oct 2025 01:48:54 +0000 (02:48 +0100)
- Some refactoring for code maintability
- Add late join support
- Prepare for AirPlay 2 (split up protocols)
- Various fixes for buffering

15 files changed:
music_assistant/providers/airplay/__init__.py
music_assistant/providers/airplay/bin/cliraop-linux-aarch64
music_assistant/providers/airplay/bin/cliraop-linux-x86_64
music_assistant/providers/airplay/bin/cliraop-macos-arm64
music_assistant/providers/airplay/constants.py
music_assistant/providers/airplay/helpers.py
music_assistant/providers/airplay/manifest.json
music_assistant/providers/airplay/player.py
music_assistant/providers/airplay/protocols/__init__.py [new file with mode: 0644]
music_assistant/providers/airplay/protocols/_protocol.py [new file with mode: 0644]
music_assistant/providers/airplay/protocols/airplay2.py [new file with mode: 0644]
music_assistant/providers/airplay/protocols/raop.py [new file with mode: 0644]
music_assistant/providers/airplay/provider.py
music_assistant/providers/airplay/raop.py [deleted file]
music_assistant/providers/airplay/stream_session.py [new file with mode: 0644]

index 2bbd02833f0bdf56a09594552569c9ff1c96a86e..b6121c97597b22983f65c83d6c3b4eb034a23b3f 100644 (file)
@@ -5,10 +5,11 @@ from __future__ import annotations
 from typing import TYPE_CHECKING
 
 from music_assistant_models.config_entries import ProviderConfig
-from music_assistant_models.enums import ProviderFeature
+from music_assistant_models.enums import ConfigEntryType, ProviderFeature
 from music_assistant_models.provider import ProviderManifest
 
 from music_assistant.mass import MusicAssistant
+from music_assistant.providers.airplay.constants import CONF_ENABLE_LATE_JOIN
 
 from .provider import AirPlayProvider
 
@@ -37,7 +38,20 @@ async def get_config_entries(
     values: the (intermediate) raw values for config entries sent with the action.
     """
     # ruff: noqa: ARG001
-    return ()
+    return (
+        ConfigEntry(
+            key=CONF_ENABLE_LATE_JOIN,
+            type=ConfigEntryType.BOOLEAN,
+            default_value=False,
+            label="Enable late joining",
+            description=(
+                "Allow the player to join an existing AirPlay stream instead of "
+                "starting a new one. \n NOTE: may not work in all conditions. "
+                "If you experience issues or players are not fully in sync, disable this option."
+            ),
+            category="airplay",
+        ),
+    )
 
 
 async def setup(
index 2266b5dab21dd872069929ed4120d154f9b3146b..4eae6a1346ac20a45800bd097caddca534966853 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 differ
index 0b6faf9ef2e44d7c586d8cf3f1a16cd1772e1833..dc6cedb9b54b222905b138258ad0c02c50a4e342 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 differ
index c62298b89e65493f20329470e4ed3e19b575a9a5..8de2c4cc12df0c48c769bf415064aa78ad2f4fb0 100755 (executable)
Binary files a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 differ
index e02a5982d33ffb3b566a261204ef67eefe387f45..473604b1440e2cbd3a574de53f5aea68449547ab 100644 (file)
@@ -2,6 +2,7 @@
 
 from __future__ import annotations
 
+from enum import Enum
 from typing import Final
 
 from music_assistant_models.enums import ContentType
@@ -11,6 +12,14 @@ from music_assistant.constants import INTERNAL_PCM_FORMAT
 
 DOMAIN = "airplay"
 
+
+class StreamingProtocol(Enum):
+    """AirPlay streaming protocol versions."""
+
+    RAOP = 1  # AirPlay 1 (RAOP)
+    AIRPLAY2 = 2  # AirPlay 2
+
+
 CACHE_CATEGORY_PREV_VOLUME: Final[int] = 1
 
 CONF_ENCRYPTION: Final[str] = "encryption"
@@ -20,6 +29,22 @@ CONF_PASSWORD: Final[str] = "password"
 CONF_READ_AHEAD_BUFFER: Final[str] = "read_ahead_buffer"
 CONF_IGNORE_VOLUME: Final[str] = "ignore_volume"
 CONF_CREDENTIALS: Final[str] = "credentials"
+CONF_AIRPLAY_PROTOCOL: Final[str] = "airplay_protocol"
+
+AIRPLAY_DISCOVERY_TYPE: Final[str] = "_airplay._tcp.local."
+RAOP_DISCOVERY_TYPE: Final[str] = "_raop._tcp.local."
+DACP_DISCOVERY_TYPE: Final[str] = "_dacp._tcp.local."
+
+AIRPLAY2_MIN_LOG_LEVEL: Final[int] = 3  # Min loglevel to ensure stderr output contains what we need
+CONF_AP_CREDENTIALS: Final[str] = "ap_credentials"
+CONF_MRP_CREDENTIALS: Final[str] = "mrp_credentials"
+CONF_ACTION_START_PAIRING: Final[str] = "start_ap_pairing"
+CONF_ACTION_FINISH_PAIRING: Final[str] = "finish_ap_pairing"
+CONF_ACTION_START_MRP_PAIRING: Final[str] = "start_mrp_pairing"
+CONF_ACTION_FINISH_MRP_PAIRING: Final[str] = "finish_mrp_pairing"
+CONF_PAIRING_PIN: Final[str] = "pairing_pin"
+CONF_MRP_PAIRING_PIN: Final[str] = "mrp_pairing_pin"
+CONF_ENABLE_LATE_JOIN: Final[str] = "enable_late_join"
 
 BACKOFF_TIME_LOWER_LIMIT: Final[int] = 15  # seconds
 BACKOFF_TIME_UPPER_LIMIT: Final[int] = 300  # Five minutes
@@ -47,4 +72,6 @@ BROKEN_RAOP_MODELS = (
     ("Sonos", "Arc Ultra"),
     # Samsung has been repeatedly being reported as having issues with AirPlay 1/raop
     ("Samsung", "*"),
+    ("Ubiquiti Inc.", "*"),
+    ("Juke Audio", "*"),
 )
index 3089990a1b851f9920faa67e2aa7f8199330c014..1b4f03330e5e5c30641745580de8a7b7218e2443 100644 (file)
@@ -4,16 +4,20 @@ from __future__ import annotations
 
 import os
 import platform
+import time
 from typing import TYPE_CHECKING
 
 from zeroconf import IPVersion
 
 from music_assistant.helpers.process import check_output
-from music_assistant.providers.airplay.constants import BROKEN_RAOP_MODELS
+from music_assistant.providers.airplay.constants import BROKEN_RAOP_MODELS, StreamingProtocol
 
 if TYPE_CHECKING:
     from zeroconf.asyncio import AsyncServiceInfo
 
+# NTP epoch delta: difference between Unix epoch (1970) and NTP epoch (1900)
+NTP_EPOCH_DELTA = 0x83AA7E80  # 2208988800 seconds
+
 
 def convert_airplay_volume(value: float) -> int:
     """Remap AirPlay Volume to 0..100 scale."""
@@ -25,7 +29,7 @@ def convert_airplay_volume(value: float) -> int:
     return int(portion + normal_min)
 
 
-def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]:
+def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]:  # noqa: PLR0911
     """Return Manufacturer and Model name from mdns info."""
     manufacturer = info.decoded_properties.get("manufacturer")
     model = info.decoded_properties.get("model")
@@ -57,6 +61,8 @@ def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]:
         return ("Apple", "Apple TV 4K Gen2")
     if model == "AppleTV14,1":
         return ("Apple", "Apple TV 4K Gen3")
+    if model == "UPL-AMP":
+        return ("Ubiquiti Inc.", "UPL-AMP")
     if "AirPort" in model:
         return ("Apple", "AirPort Express")
     if "AudioAccessory" in model:
@@ -64,6 +70,26 @@ def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]:
     if "AppleTV" in model:
         model = "Apple TV"
         manufacturer = "Apple"
+    # Detect Mac devices (Mac mini, MacBook, iMac, etc.)
+    # Model identifiers like: Mac16,11, MacBookPro18,3, iMac21,1
+    if model.startswith(("Mac", "iMac")):
+        # Parse Mac model to friendly name
+        if model.startswith("MacBookPro"):
+            return ("Apple", f"MacBook Pro ({model})")
+        if model.startswith("MacBookAir"):
+            return ("Apple", f"MacBook Air ({model})")
+        if model.startswith("MacBook"):
+            return ("Apple", f"MacBook ({model})")
+        if model.startswith("iMac"):
+            return ("Apple", f"iMac ({model})")
+        if model.startswith("Macmini"):
+            return ("Apple", f"Mac mini ({model})")
+        if model.startswith("MacPro"):
+            return ("Apple", f"Mac Pro ({model})")
+        if model.startswith("MacStudio"):
+            return ("Apple", f"Mac Studio ({model})")
+        # Generic Mac device (e.g. Mac16,11 for Mac mini M4)
+        return ("Apple", f"Mac ({model})")
 
     return (manufacturer or "AirPlay", model)
 
@@ -89,17 +115,43 @@ def is_broken_raop_model(manufacturer: str, model: str) -> bool:
     return False
 
 
-async def get_cliraop_binary() -> str:
-    """Find the correct raop/airplay binary belonging to the platform."""
+async def get_cli_binary(protocol: StreamingProtocol) -> str:
+    """Find the correct raop/airplay binary belonging to the platform.
+
+    Args:
+        protocol: The streaming protocol (RAOP or AIRPLAY2)
+
+    Returns:
+        Path to the CLI binary
 
-    async def check_binary(cliraop_path: str) -> str | None:
+    Raises:
+        RuntimeError: If the binary cannot be found
+    """
+
+    async def check_binary(cli_path: str) -> str | None:
         try:
-            returncode, output = await check_output(
-                cliraop_path,
-                "-check",
-            )
-            if returncode == 0 and output.strip().decode() == "cliraop check":
-                return cliraop_path
+            if protocol == StreamingProtocol.RAOP:
+                args = [
+                    cli_path,
+                    "-check",
+                ]
+                passing_output = "cliraop check"
+            else:
+                config_file = os.path.join(os.path.dirname(__file__), "bin", "cliap2.conf")
+                args = [
+                    cli_path,
+                    "--testrun",
+                    "--config",
+                    config_file,
+                ]
+
+            returncode, output = await check_output(*args)
+            if (
+                protocol == StreamingProtocol.RAOP
+                and returncode == 0
+                and output.strip().decode() == passing_output
+            ) or (protocol == StreamingProtocol.AIRPLAY2 and returncode == 0):
+                return cli_path
         except OSError:
             pass
         return None
@@ -108,10 +160,125 @@ async def get_cliraop_binary() -> str:
     system = platform.system().lower().replace("darwin", "macos")
     architecture = platform.machine().lower()
 
+    if protocol == StreamingProtocol.RAOP:
+        package = "cliraop"
+    elif protocol == StreamingProtocol.AIRPLAY2:
+        package = "cliap2"
+    else:
+        raise RuntimeError(f"Unsupported streaming protocol requested: {protocol}")
+
     if bridge_binary := await check_binary(
-        os.path.join(base_path, f"cliraop-{system}-{architecture}")
+        os.path.join(base_path, f"{package}-{system}-{architecture}")
     ):
         return bridge_binary
 
-    msg = f"Unable to locate RAOP Play binary for {system}/{architecture}"
+    msg = (
+        f"Unable to locate {protocol.name} CLI stream binary {package} for {system}/{architecture}"
+    )
     raise RuntimeError(msg)
+
+
+def get_ntp_timestamp() -> int:
+    """
+    Get current NTP timestamp (64-bit).
+
+    Returns:
+        int: 64-bit NTP timestamp (upper 32 bits = seconds, lower 32 bits = fraction)
+    """
+    # Get current Unix timestamp with microsecond precision
+    current_time = time.time()
+
+    # Split into seconds and microseconds
+    seconds = int(current_time)
+    microseconds = int((current_time - seconds) * 1_000_000)
+
+    # Convert to NTP epoch (add offset from 1970 to 1900)
+    ntp_seconds = seconds + NTP_EPOCH_DELTA
+
+    # Convert microseconds to NTP fraction (2^32 parts per second)
+    # fraction = (microseconds * 2^32) / 1_000_000
+    ntp_fraction = int((microseconds << 32) / 1_000_000)
+
+    # Combine into 64-bit value
+    return (ntp_seconds << 32) | ntp_fraction
+
+
+def ntp_to_seconds_fraction(ntp_timestamp: int) -> tuple[int, int]:
+    """
+    Split NTP timestamp into seconds and fraction components.
+
+    Args:
+        ntp_timestamp: 64-bit NTP timestamp
+
+    Returns:
+        tuple: (seconds, fraction)
+    """
+    seconds = ntp_timestamp >> 32
+    fraction = ntp_timestamp & 0xFFFFFFFF
+    return seconds, fraction
+
+
+def ntp_to_unix_time(ntp_timestamp: int) -> float:
+    """
+    Convert NTP timestamp to Unix timestamp (float).
+
+    Args:
+        ntp_timestamp: 64-bit NTP timestamp
+
+    Returns:
+        float: Unix timestamp (seconds since 1970-01-01)
+    """
+    seconds = ntp_timestamp >> 32
+    fraction = ntp_timestamp & 0xFFFFFFFF
+
+    # Convert back to Unix epoch
+    unix_seconds = seconds - NTP_EPOCH_DELTA
+
+    # Convert fraction to microseconds
+    microseconds = (fraction * 1_000_000) >> 32
+
+    return unix_seconds + (microseconds / 1_000_000)
+
+
+def unix_time_to_ntp(unix_timestamp: float) -> int:
+    """
+    Convert Unix timestamp (float) to NTP timestamp.
+
+    Args:
+        unix_timestamp: Unix timestamp (seconds since 1970-01-01)
+
+    Returns:
+        int: 64-bit NTP timestamp
+    """
+    seconds = int(unix_timestamp)
+    microseconds = int((unix_timestamp - seconds) * 1_000_000)
+
+    # Convert to NTP epoch
+    ntp_seconds = seconds + NTP_EPOCH_DELTA
+
+    # Convert microseconds to NTP fraction
+    ntp_fraction = int((microseconds << 32) / 1_000_000)
+
+    return (ntp_seconds << 32) | ntp_fraction
+
+
+def add_seconds_to_ntp(ntp_timestamp: int, seconds: float) -> int:
+    """
+    Add seconds to an NTP timestamp.
+
+    Args:
+        ntp_timestamp: 64-bit NTP timestamp
+        seconds: Number of seconds to add (can be fractional)
+
+    Returns:
+        int: New NTP timestamp with seconds added
+    """
+    # Extract whole seconds and fraction
+    whole_seconds = int(seconds)
+    fraction = seconds - whole_seconds
+
+    # Convert to NTP format (upper 32 bits = seconds, lower 32 bits = fraction)
+    ntp_seconds = whole_seconds << 32
+    ntp_fraction = int(fraction * (1 << 32))
+
+    return ntp_timestamp + ntp_seconds + ntp_fraction
index 9f5724c26de32015caca6a725337edf7ab2eaf8b..278a70f86aa85b3b3d60461d073a6842cb390306 100644 (file)
@@ -10,5 +10,5 @@
   "multi_instance": false,
   "builtin": false,
   "icon": "cast-variant",
-  "mdns_discovery": ["_raop._tcp.local."]
+  "mdns_discovery": ["_airplay._tcp.local.", "_raop._tcp.local."]
 }
index c11b7a0764e431130df4a7f9be863110a09d9479..40aa721fbd475e3f1b1a14b7fdd5ac3f960060ea 100644 (file)
@@ -1,4 +1,4 @@
-"""AirPlay Player implementation."""
+"""AirPlay Player implementations."""
 
 from __future__ import annotations
 
@@ -6,7 +6,7 @@ import asyncio
 import time
 from typing import TYPE_CHECKING, cast
 
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
 from music_assistant_models.enums import (
     ConfigEntryType,
     ContentType,
@@ -16,6 +16,7 @@ from music_assistant_models.enums import (
     PlayerType,
 )
 from music_assistant_models.media_items import AudioFormat
+from propcache import under_cached_property as cached_property
 
 from music_assistant.constants import (
     CONF_ENTRY_DEPRECATED_EQ_BASS,
@@ -31,26 +32,35 @@ from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
 from music_assistant.providers.universal_group.constants import UGP_PREFIX
 
 from .constants import (
+    AIRPLAY_DISCOVERY_TYPE,
     AIRPLAY_FLOW_PCM_FORMAT,
     AIRPLAY_PCM_FORMAT,
     CACHE_CATEGORY_PREV_VOLUME,
+    CONF_ACTION_FINISH_PAIRING,
+    CONF_ACTION_START_PAIRING,
+    CONF_AIRPLAY_PROTOCOL,
     CONF_ALAC_ENCODE,
+    CONF_AP_CREDENTIALS,
     CONF_ENCRYPTION,
     CONF_IGNORE_VOLUME,
+    CONF_PAIRING_PIN,
     CONF_PASSWORD,
     CONF_READ_AHEAD_BUFFER,
     FALLBACK_VOLUME,
+    RAOP_DISCOVERY_TYPE,
+    StreamingProtocol,
 )
 from .helpers import get_primary_ip_address_from_zeroconf, is_broken_raop_model
-from .raop import RaopStreamSession
+from .stream_session import AirPlayStreamSession
 
 if TYPE_CHECKING:
     from zeroconf.asyncio import AsyncServiceInfo
 
     from music_assistant.providers.universal_group import UniversalGroupPlayer
 
+    from .protocols.airplay2 import AirPlay2Stream
+    from .protocols.raop import RaopStream
     from .provider import AirPlayProvider
-    from .raop import RaopStream
 
 
 BROKEN_RAOP_WARN = ConfigEntry(
@@ -70,7 +80,7 @@ class AirPlayPlayer(Player):
         self,
         provider: AirPlayProvider,
         player_id: str,
-        discovery_info: AsyncServiceInfo,
+        discovery_info: AsyncServiceInfo | None,
         address: str,
         display_name: str,
         manufacturer: str,
@@ -81,10 +91,9 @@ class AirPlayPlayer(Player):
         super().__init__(provider, player_id)
         self.discovery_info = discovery_info
         self.address = address
-        self.raop_stream: RaopStream | None = None
+        self.stream: RaopStream | AirPlay2Stream | None = None
         self.last_command_sent = 0.0
         self._lock = asyncio.Lock()
-
         # Set (static) player attributes
         self._attr_type = PlayerType.PLAYER
         self._attr_name = display_name
@@ -104,12 +113,31 @@ class AirPlayPlayer(Player):
         self._attr_can_group_with = {provider.lookup_key}
         self._attr_enabled_by_default = not is_broken_raop_model(manufacturer, model)
 
+    @cached_property
+    def protocol(self) -> StreamingProtocol:
+        """Get the streaming protocol to use for this player."""
+        protocol_value = self.config.get_value(CONF_AIRPLAY_PROTOCOL)
+        # Convert integer value to StreamingProtocol enum
+        if protocol_value == 2:
+            return StreamingProtocol.AIRPLAY2
+        return StreamingProtocol.RAOP
+
     async def get_config_entries(
         self,
         action: str | None = None,
         values: dict[str, ConfigValueType] | None = None,
     ) -> list[ConfigEntry]:
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
+        base_entries = await super().get_config_entries()
+
+        # Handle pairing actions
+        if action and self._requires_pairing():
+            await self._handle_pairing_action(action=action, values=values)
+
+        # Add pairing config entries for Apple TV and macOS devices
+        if self._requires_pairing():
+            base_entries = [*self._get_pairing_config_entries(), *base_entries]
+
         base_entries = await super().get_config_entries(action=action, values=values)
         base_entries += [
             CONF_ENTRY_FLOW_MODE_ENFORCED,
@@ -117,6 +145,23 @@ class AirPlayPlayer(Player):
             CONF_ENTRY_DEPRECATED_EQ_MID,
             CONF_ENTRY_DEPRECATED_EQ_TREBLE,
             CONF_ENTRY_OUTPUT_CODEC_HIDDEN,
+            ConfigEntry(
+                key=CONF_AIRPLAY_PROTOCOL,
+                type=ConfigEntryType.INTEGER,
+                default_value=StreamingProtocol.RAOP.value,
+                required=False,
+                label="AirPlay version to use for streaming",
+                description="AirPlay version 1 protocol uses RAOP.\n"
+                "AirPlay version 2 is an extension of RAOP.\n"
+                "Some newer devices do not fully support RAOP and "
+                "will only work with AirPlay version 2.",
+                category="airplay",
+                options=[
+                    ConfigValueOption("AirPlay 1 (RAOP)", StreamingProtocol.RAOP.value),
+                    ConfigValueOption("AirPlay 2", StreamingProtocol.AIRPLAY2.value),
+                ],
+                hidden=True,
+            ),
             ConfigEntry(
                 key=CONF_ENCRYPTION,
                 type=ConfigEntryType.BOOLEAN,
@@ -153,9 +198,10 @@ class AirPlayPlayer(Player):
                 label="Audio buffer (ms)",
                 description="Amount of buffer (in milliseconds), "
                 "the player should keep to absorb network throughput jitter. "
-                "If you experience audio dropouts, try increasing this value.",
+                "Lower values reduce latency but may cause dropouts. "
+                "Recommended: 1000ms for stable playback.",
                 category="airplay",
-                range=(500, 3000),
+                range=(500, 2000),
             ),
             # airplay has fixed sample rate/bit depth so make this config entry static and hidden
             create_sample_rates_config_entry(
@@ -177,16 +223,212 @@ class AirPlayPlayer(Player):
             ),
         ]
 
+        # Regular AirPlay config entries
+        base_entries.extend(
+            [
+                ConfigEntry(
+                    key=CONF_ENCRYPTION,
+                    type=ConfigEntryType.BOOLEAN,
+                    default_value=True,
+                    label="Enable encryption",
+                    description="Enable encrypted communication with the player, "
+                    "should by default be enabled for most devices.",
+                    category="airplay",
+                ),
+                ConfigEntry(
+                    key=CONF_ALAC_ENCODE,
+                    type=ConfigEntryType.BOOLEAN,
+                    default_value=True,
+                    label="Enable compression",
+                    description="Save some network bandwidth by sending the audio as "
+                    "(lossless) ALAC at the cost of a bit CPU.",
+                    category="airplay",
+                ),
+                CONF_ENTRY_SYNC_ADJUST,
+                ConfigEntry(
+                    key=CONF_PASSWORD,
+                    type=ConfigEntryType.SECURE_STRING,
+                    default_value=None,
+                    required=False,
+                    label="Device password",
+                    description="Some devices require a password to connect/play.",
+                    category="airplay",
+                ),
+                ConfigEntry(
+                    key=CONF_READ_AHEAD_BUFFER,
+                    type=ConfigEntryType.INTEGER,
+                    default_value=1000,
+                    required=False,
+                    label="Audio buffer (ms)",
+                    description="Amount of buffer (in milliseconds), "
+                    "the player should keep to absorb network throughput jitter. "
+                    "If you experience audio dropouts, try increasing this value.",
+                    category="airplay",
+                    range=(500, 3000),
+                ),
+                # airplay has fixed sample rate/bit depth so make this config entry
+                # static and hidden
+                create_sample_rates_config_entry(
+                    supported_sample_rates=[44100], supported_bit_depths=[16], hidden=True
+                ),
+                ConfigEntry(
+                    key=CONF_IGNORE_VOLUME,
+                    type=ConfigEntryType.BOOLEAN,
+                    default_value=False,
+                    label="Ignore volume reports sent by the device itself",
+                    description=(
+                        "The AirPlay protocol allows devices to report their own volume "
+                        "level. \n"
+                        "For some devices this is not reliable and can cause unexpected "
+                        "volume changes. \n"
+                        "Enable this option to ignore these reports."
+                    ),
+                    category="airplay",
+                ),
+            ]
+        )
+
         if is_broken_raop_model(self.device_info.manufacturer, self.device_info.model):
             base_entries.insert(-1, BROKEN_RAOP_WARN)
 
         return base_entries
 
+    def _requires_pairing(self) -> bool:
+        """Check if this device requires pairing (Apple TV or macOS)."""
+        if self.device_info.manufacturer.lower() != "apple":
+            return False
+
+        model = self.device_info.model
+        # Apple TV devices
+        if "appletv" in model.lower() or "apple tv" in model.lower():
+            return True
+        # Mac devices (including iMac, MacBook, Mac mini, Mac Pro, Mac Studio)
+        return model.startswith(("Mac", "iMac"))
+
+    def _get_pairing_config_entries(self) -> list[ConfigEntry]:
+        """Return pairing config entries for Apple TV and macOS devices.
+
+        Uses cliraop for AirPlay/RAOP pairing.
+        """
+        entries: list[ConfigEntry] = []
+
+        # Check if we have credentials stored
+        has_credentials = bool(self.config.get_value(CONF_AP_CREDENTIALS))
+
+        if not has_credentials:
+            # Show pairing instructions and start button
+            entries.append(
+                ConfigEntry(
+                    key="pairing_instructions",
+                    type=ConfigEntryType.LABEL,
+                    label="AirPlay Pairing Required",
+                    description=(
+                        "This device requires pairing before it can be used. "
+                        "Click the button below to start the pairing process."
+                    ),
+                )
+            )
+            entries.append(
+                ConfigEntry(
+                    key=CONF_ACTION_START_PAIRING,
+                    type=ConfigEntryType.ACTION,
+                    label="Start Pairing",
+                    description="Start the AirPlay pairing process",
+                    action=CONF_ACTION_START_PAIRING,
+                )
+            )
+        else:
+            # Show paired status
+            entries.append(
+                ConfigEntry(
+                    key="pairing_status",
+                    type=ConfigEntryType.LABEL,
+                    label="AirPlay Pairing Status",
+                    description="Device is paired and ready to use.",
+                )
+            )
+
+        # If pairing was started, show PIN entry
+        if self.config.get_value("_pairing_in_progress"):
+            entries.append(
+                ConfigEntry(
+                    key=CONF_PAIRING_PIN,
+                    type=ConfigEntryType.STRING,
+                    label="Enter PIN",
+                    description="Enter the 4-digit PIN shown on the device",
+                    required=True,
+                )
+            )
+            entries.append(
+                ConfigEntry(
+                    key=CONF_ACTION_FINISH_PAIRING,
+                    type=ConfigEntryType.ACTION,
+                    label="Finish Pairing",
+                    description="Complete the pairing process with the PIN",
+                    action=CONF_ACTION_FINISH_PAIRING,
+                )
+            )
+
+        # Store credentials (hidden from UI)
+        entries.append(
+            ConfigEntry(
+                key=CONF_AP_CREDENTIALS,
+                type=ConfigEntryType.SECURE_STRING,
+                label="AirPlay Credentials",
+                default_value=None,
+                required=False,
+                hidden=True,
+            )
+        )
+
+        return entries
+
+    async def _handle_pairing_action(
+        self, action: str, values: dict[str, ConfigValueType] | None
+    ) -> None:
+        """Handle pairing actions using cliraop.
+
+        TODO: Implement actual cliraop-based pairing.
+        """
+        if action == CONF_ACTION_START_PAIRING:
+            # TODO: Start pairing using cliraop
+            # For now, just set a flag to show the PIN entry
+            self.mass.config.set_raw_player_config_value(
+                self.player_id, "_pairing_in_progress", True
+            )
+            self.logger.info("Started AirPlay pairing for %s", self.display_name)
+
+        elif action == CONF_ACTION_FINISH_PAIRING:
+            # TODO: Finish pairing using cliraop with the provided PIN
+            if not values:
+                return
+
+            pin = values.get(CONF_PAIRING_PIN)
+            if not pin:
+                self.logger.warning("No PIN provided for pairing")
+                return
+
+            # TODO: Use cliraop to complete pairing with the PIN
+            # For now, just clear the pairing in progress flag
+            self.mass.config.set_raw_player_config_value(
+                self.player_id, "_pairing_in_progress", False
+            )
+
+            # TODO: Store the actual credentials obtained from cliraop
+            # self.mass.config.set_raw_player_config_value(
+            #     self.player_id, CONF_AP_CREDENTIALS, credentials_from_cliraop
+            # )
+
+            self.logger.info(
+                "Finished AirPlay pairing for %s (TODO: implement actual pairing)",
+                self.display_name,
+            )
+
     async def stop(self) -> None:
         """Send STOP command to player."""
-        if self.raop_stream and self.raop_stream.session:
+        if self.stream and self.stream.session:
             # forward stop to the entire stream session
-            await self.raop_stream.session.stop()
+            await self.stream.session.stop()
         self._attr_active_source = None
         self._attr_current_media = None
         self.update_state()
@@ -194,8 +436,8 @@ class AirPlayPlayer(Player):
     async def play(self) -> None:
         """Send PLAY (unpause) command to player."""
         async with self._lock:
-            if self.raop_stream and self.raop_stream.running:
-                await self.raop_stream.send_cli_command("ACTION=PLAY")
+            if self.stream and self.stream.running:
+                await self.stream.send_cli_command("ACTION=PLAY")
 
     async def pause(self) -> None:
         """Send PAUSE command to player."""
@@ -206,9 +448,9 @@ class AirPlayPlayer(Player):
             return
 
         async with self._lock:
-            if not self.raop_stream or not self.raop_stream.running:
+            if not self.stream or not self.stream.running:
                 return
-            await self.raop_stream.send_cli_command("ACTION=PAUSE")
+            await self.stream.send_cli_command("ACTION=PAUSE")
 
     async def play_media(self, media: PlayerMedia) -> None:
         """Handle PLAY MEDIA on given player."""
@@ -225,7 +467,7 @@ class AirPlayPlayer(Player):
         if media.media_type == MediaType.ANNOUNCEMENT:
             # special case: stream announcement
             assert media.custom_data
-            input_format = AIRPLAY_PCM_FORMAT
+            pcm_format = AIRPLAY_PCM_FORMAT
             audio_source = self.mass.streams.get_announcement_stream(
                 media.custom_data["announcement_url"],
                 output_format=AIRPLAY_PCM_FORMAT,
@@ -234,7 +476,7 @@ class AirPlayPlayer(Player):
             )
         elif media.media_type == MediaType.PLUGIN_SOURCE:
             # special case: plugin source stream
-            input_format = AIRPLAY_PCM_FORMAT
+            pcm_format = AIRPLAY_PCM_FORMAT
             assert media.custom_data
             audio_source = self.mass.streams.get_plugin_source_stream(
                 plugin_source_id=media.custom_data["source_id"],
@@ -248,11 +490,11 @@ class AirPlayPlayer(Player):
             ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id))
             ugp_stream = ugp_player.stream
             assert ugp_stream is not None  # for type checker
-            input_format = ugp_stream.base_pcm_format
+            pcm_format = ugp_stream.base_pcm_format
             audio_source = ugp_stream.subscribe_raw()
         elif media.source_id and media.queue_item_id:
             # regular queue (flow) stream request
-            input_format = AIRPLAY_FLOW_PCM_FORMAT
+            pcm_format = AIRPLAY_FLOW_PCM_FORMAT
             queue = self.mass.player_queues.get(media.source_id)
             assert queue
             start_queue_item = self.mass.player_queues.get_item(
@@ -262,12 +504,12 @@ class AirPlayPlayer(Player):
             audio_source = self.mass.streams.get_queue_flow_stream(
                 queue=queue,
                 start_queue_item=start_queue_item,
-                pcm_format=input_format,
+                pcm_format=pcm_format,
             )
         else:
             # assume url or some other direct path
             # NOTE: this will fail if its an uri not playable by ffmpeg
-            input_format = AIRPLAY_PCM_FORMAT
+            pcm_format = AIRPLAY_PCM_FORMAT
             audio_source = get_ffmpeg_stream(
                 audio_input=media.uri,
                 input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
@@ -275,25 +517,25 @@ class AirPlayPlayer(Player):
             )
 
         # if an existing stream session is running, we could replace it with the new stream
-        if self.raop_stream and self.raop_stream.running:
+        if self.stream and self.stream.running:
             # check if we need to replace the stream
-            if self.raop_stream.prevent_playback:
+            if self.stream.prevent_playback:
                 # player is in prevent playback mode, we need to stop the stream
                 await self.stop()
             else:
-                await self.raop_stream.session.replace_stream(audio_source)
+                await self.stream.session.replace_stream(audio_source)
                 return
 
-        # setup RaopStreamSession for player (and its sync childs if any)
+        # setup StreamSession for player (and its sync childs if any)
         sync_clients = self._get_sync_clients()
         provider = cast("AirPlayProvider", self.provider)
-        raop_stream_session = RaopStreamSession(provider, sync_clients, input_format, audio_source)
-        await raop_stream_session.start()
+        stream_session = AirPlayStreamSession(provider, sync_clients, pcm_format, audio_source)
+        await stream_session.start()
 
     async def volume_set(self, volume_level: int) -> None:
         """Send VOLUME_SET command to given player."""
-        if self.raop_stream and self.raop_stream.running:
-            await self.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
+        if self.stream and self.stream.running:
+            await self.stream.send_cli_command(f"VOLUME={volume_level}\n")
         self._attr_volume_level = volume_level
         self.update_state()
         # store last state in cache
@@ -317,22 +559,22 @@ class AirPlayPlayer(Player):
             # nothing to do
             return
 
-        raop_session = self.raop_stream.session if self.raop_stream else None
+        stream_session = self.stream.session if self.stream else None
         # handle removals first
         if player_ids_to_remove:
             if self.player_id in player_ids_to_remove:
                 # dissolve the entire sync group
-                if self.raop_stream and self.raop_stream.running:
+                if self.stream and self.stream.running:
                     # stop the stream session if it is running
-                    await self.raop_stream.session.stop()
+                    await self.stream.session.stop()
                 self._attr_group_members = []
                 self.update_state()
                 return
 
             for child_player in self._get_sync_clients():
                 if child_player.player_id in player_ids_to_remove:
-                    if raop_session:
-                        await raop_session.remove_client(child_player)
+                    if stream_session:
+                        await stream_session.remove_client(child_player)
                     self._attr_group_members.remove(child_player.player_id)
 
         # handle additions
@@ -354,16 +596,16 @@ class AirPlayPlayer(Player):
                 "AirPlayPlayer | None", self.mass.players.get(player_id)
             ):
                 if (
-                    child_player_to_add.raop_stream
-                    and child_player_to_add.raop_stream.running
-                    and child_player_to_add.raop_stream.session != raop_session
+                    child_player_to_add.stream
+                    and child_player_to_add.stream.running
+                    and child_player_to_add.stream.session != stream_session
                 ):
-                    await child_player_to_add.raop_stream.session.remove_client(child_player_to_add)
+                    await child_player_to_add.stream.session.remove_client(child_player_to_add)
 
-            # add new child to the existing raop session (if any)
+            # add new child to the existing stream (RAOP or AirPlay2) session (if any)
             self._attr_group_members.append(player_id)
-            if raop_session:
-                await raop_session.add_client(child_player_to_add)
+            if stream_session:
+                await stream_session.add_client(child_player_to_add)
 
         # always update the state after modifying group members
         self.update_state()
@@ -371,7 +613,7 @@ class AirPlayPlayer(Player):
     def update_volume_from_device(self, volume: int) -> None:
         """Update volume from device feedback."""
         ignore_volume_report = (
-            self.mass.config.get_raw_player_config_value(self.player_id, CONF_IGNORE_VOLUME, False)
+            self.config.get_value(CONF_IGNORE_VOLUME)
             or self.device_info.manufacturer.lower() == "apple"
         )
 
@@ -388,7 +630,12 @@ class AirPlayPlayer(Player):
     def set_discovery_info(self, discovery_info: AsyncServiceInfo, display_name: str) -> None:
         """Set/update the discovery info for the player."""
         self._attr_name = display_name
-        self.discovery_info = discovery_info
+        if discovery_info.type == AIRPLAY_DISCOVERY_TYPE:
+            self.airplay_discovery_info = discovery_info
+        elif discovery_info.type == RAOP_DISCOVERY_TYPE:
+            self.raop_discovery_info = discovery_info
+        else:  # guard
+            return
         cur_address = self.address
         new_address = get_primary_ip_address_from_zeroconf(discovery_info)
         if new_address is None:
@@ -400,10 +647,10 @@ class AirPlayPlayer(Player):
             self._attr_device_info.ip_address = new_address
         self.update_state()
 
-    def set_state_from_raop(
+    def set_state_from_stream(
         self, state: PlaybackState | None = None, elapsed_time: float | None = None
     ) -> None:
-        """Set the playback state from RAOP."""
+        """Set the playback state from stream (RAOP or AirPlay2)."""
         if state is not None:
             self._attr_playback_state = state
         if elapsed_time is not None:
@@ -414,11 +661,11 @@ class AirPlayPlayer(Player):
     async def on_unload(self) -> None:
         """Handle logic when the player is unloaded from the Player controller."""
         await super().on_unload()
-        if self.raop_stream:
+        if self.stream:
             # stop the stream session if it is running
-            if self.raop_stream.running:
-                self.mass.create_task(self.raop_stream.session.stop())
-            self.raop_stream = None
+            if self.stream.running:
+                self.mass.create_task(self.stream.session.stop())
+            self.stream = None
 
     def _get_sync_clients(self) -> list[AirPlayPlayer]:
         """Get all sync clients for a player."""
diff --git a/music_assistant/providers/airplay/protocols/__init__.py b/music_assistant/providers/airplay/protocols/__init__.py
new file mode 100644 (file)
index 0000000..459329e
--- /dev/null
@@ -0,0 +1 @@
+"""AirPlay (streaming) Protocols."""
diff --git a/music_assistant/providers/airplay/protocols/_protocol.py b/music_assistant/providers/airplay/protocols/_protocol.py
new file mode 100644 (file)
index 0000000..52a5f7d
--- /dev/null
@@ -0,0 +1,218 @@
+"""Base protocol class for AirPlay streaming implementations."""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import time
+from abc import ABC, abstractmethod
+from contextlib import suppress
+from random import randint
+from typing import TYPE_CHECKING
+
+from music_assistant_models.enums import ContentType, PlaybackState
+from music_assistant_models.media_items import AudioFormat
+
+from music_assistant.constants import VERBOSE_LOG_LEVEL
+from music_assistant.helpers.namedpipe import AsyncNamedPipeWriter
+
+if TYPE_CHECKING:
+    from music_assistant_models.player import PlayerMedia
+
+    from music_assistant.helpers.process import AsyncProcess
+    from music_assistant.providers.airplay.player import AirPlayPlayer
+    from music_assistant.providers.airplay.stream_session import AirPlayStreamSession
+
+
+class AirPlayProtocol(ABC):
+    """Base class for AirPlay streaming protocols (RAOP and AirPlay2).
+
+    This class contains common logic shared between protocol implementations,
+    with abstract methods for protocol-specific behavior.
+    """
+
+    # the pcm audio format used for streaming to this protocol
+    pcm_format = AudioFormat(
+        content_type=ContentType.PCM_S16LE, sample_rate=44100, bit_depth=16, channels=2
+    )
+
+    def __init__(
+        self,
+        session: AirPlayStreamSession,
+        player: AirPlayPlayer,
+    ) -> None:
+        """Initialize base AirPlay protocol.
+
+        Args:
+            session: The stream session managing this protocol instance
+            player: The player to stream to
+        """
+        self.session = session
+        self.prov = session.prov
+        self.mass = session.prov.mass
+        self.player = player
+        # Generate unique ID to prevent race conditions with named pipes
+        self.active_remote_id: str = str(randint(1000, 8000))
+        self.prevent_playback: bool = False
+        self._cli_proc: AsyncProcess | None = None
+        # State tracking
+        self._started = asyncio.Event()
+        self._stopped = False
+        self._total_bytes_sent = 0
+        self._stream_bytes_sent = 0
+        self.audio_named_pipe = (
+            f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio"  # noqa: S108
+        )
+        self.commands_named_pipe = (
+            f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd"  # noqa: S108
+        )
+        # Async named pipe writers (kept open for session duration)
+        self._audio_pipe: AsyncNamedPipeWriter | None = None
+        self._commands_pipe: AsyncNamedPipeWriter | None = None
+
+    @property
+    def running(self) -> bool:
+        """Return boolean if this stream is running."""
+        return (
+            not self._stopped
+            and self._started.is_set()
+            and self._cli_proc is not None
+            and not self._cli_proc.closed
+        )
+
+    @abstractmethod
+    async def get_ntp(self) -> int:
+        """Get current NTP timestamp from the CLI binary."""
+
+    @abstractmethod
+    async def start(self, start_ntp: int, skip: int = 0) -> None:
+        """Initialize streaming process for the player.
+
+        Args:
+            start_ntp: NTP timestamp to start streaming
+            skip: Number of seconds to skip (for late joiners)
+        """
+
+    async def _open_pipes(self) -> None:
+        """Open both named pipes in non-blocking mode for async I/O."""
+        # Open audio pipe with buffer size optimization
+        self._audio_pipe = AsyncNamedPipeWriter(self.audio_named_pipe, logger=self.player.logger)
+        await self._audio_pipe.open(increase_buffer=True)
+
+        # Open command pipe (no need to increase buffer for small commands)
+        self._commands_pipe = AsyncNamedPipeWriter(
+            self.commands_named_pipe, logger=self.player.logger
+        )
+        await self._commands_pipe.open(increase_buffer=False)
+
+        self.player.logger.debug("Named pipes opened in non-blocking mode for streaming session")
+
+    async def stop(self) -> None:
+        """Stop playback and cleanup."""
+        # Send stop command before setting _stopped flag
+        await self.send_cli_command("ACTION=STOP")
+
+        self._stopped = True
+
+        # Close named pipes (sends EOF to C side, triggering graceful shutdown)
+        if self._audio_pipe is not None:
+            await self._audio_pipe.close()
+            self._audio_pipe = None
+
+        if self._commands_pipe is not None:
+            await self._commands_pipe.close()
+            self._commands_pipe = None
+
+        # Close the CLI process (wait for it to terminate)
+        if self._cli_proc and not self._cli_proc.closed:
+            await self._cli_proc.close(True)
+
+        self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
+
+        # Remove named pipes from filesystem
+        with suppress(Exception):
+            await asyncio.to_thread(os.remove, self.audio_named_pipe)
+        with suppress(Exception):
+            await asyncio.to_thread(os.remove, self.commands_named_pipe)
+
+    async def write_chunk(self, chunk: bytes) -> None:
+        """
+        Write a (pcm) audio chunk to the stream.
+
+        Writes one second worth of audio data based on the pcm format.
+        Uses non-blocking I/O with asyncio event loop (no thread pool consumption).
+        """
+        if self._audio_pipe is None or not self._audio_pipe.is_open:
+            return
+
+        pipe_write_start = time.time()
+
+        try:
+            await self._audio_pipe.write(chunk)
+        except TimeoutError as e:
+            # Re-raise with player context
+            raise TimeoutError(f"Player {self.player.player_id}: {e}") from e
+
+        pipe_write_elapsed = time.time() - pipe_write_start
+
+        # Log only truly abnormal pipe writes (>5s indicates a real stall)
+        # Normal writes take ~1s due to pipe rate-limiting to playback speed
+        # Can take up to ~4s if player's latency buffer is full
+        if pipe_write_elapsed > 5.0:
+            self.player.logger.error(
+                "!!! STALLED PIPE WRITE: Player %s took %.3fs to write %d bytes to pipe",
+                self.player.player_id,
+                pipe_write_elapsed,
+                len(chunk),
+            )
+
+    async def write_eof(self) -> None:
+        """Write EOF to signal end of stream."""
+        # default implementation simply closes the named pipe
+        # can be overridden with protocol specific implementation if needed
+        if self._audio_pipe is not None:
+            await self._audio_pipe.close()
+            self._audio_pipe = None
+
+    async def send_cli_command(self, command: str) -> None:
+        """Send an interactive command to the running CLI binary using non-blocking I/O."""
+        if self._stopped or not self._cli_proc or self._cli_proc.closed:
+            return
+        if self._commands_pipe is None or not self._commands_pipe.is_open:
+            return
+
+        await self._started.wait()
+
+        if not command.endswith("\n"):
+            command += "\n"
+
+        self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
+        self.player.last_command_sent = time.time()
+
+        # Write command to pipe
+        data = command.encode("utf-8")
+
+        with suppress(BrokenPipeError):
+            try:
+                # Use shorter timeout for commands (1 second per wait iteration)
+                await self._commands_pipe.write(data, timeout_per_wait=1.0)
+            except TimeoutError:
+                self.player.logger.warning("Command pipe write timeout for %s", command.strip())
+
+    async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
+        """Send metadata to player."""
+        if self._stopped:
+            return
+        if metadata:
+            duration = min(metadata.duration or 0, 3600)
+            title = metadata.title or ""
+            artist = metadata.artist or ""
+            album = metadata.album or ""
+            cmd = f"TITLE={title}\nARTIST={artist}\nALBUM={album}\n"
+            cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
+            await self.send_cli_command(cmd)
+            # get image
+            if metadata.image_url:
+                await self.send_cli_command(f"ARTWORK={metadata.image_url}\n")
+        if progress is not None:
+            await self.send_cli_command(f"PROGRESS={progress}\n")
diff --git a/music_assistant/providers/airplay/protocols/airplay2.py b/music_assistant/providers/airplay/protocols/airplay2.py
new file mode 100644 (file)
index 0000000..1e67070
--- /dev/null
@@ -0,0 +1,193 @@
+"""Logic for AirPlay 2 audio streaming to AirPlay devices."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import os
+import platform
+
+from music_assistant_models.enums import PlaybackState
+from music_assistant_models.errors import PlayerCommandFailed
+
+from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.helpers.process import AsyncProcess
+from music_assistant.providers.airplay.constants import (
+    AIRPLAY2_MIN_LOG_LEVEL,
+    CONF_READ_AHEAD_BUFFER,
+)
+from music_assistant.providers.airplay.helpers import get_cli_binary, get_ntp_timestamp
+
+from ._protocol import AirPlayProtocol
+
+
+class AirPlay2Stream(AirPlayProtocol):
+    """
+    AirPlay 2 Audio Streamer.
+
+    Python is not suitable for realtime audio streaming so we do the actual streaming
+    of audio using a small executable written in C based on owntones to do
+    the actual timestamped playback. It reads pcm audio from a named pipe
+    and we can send some interactive commands using another named pipe.
+    """
+
+    _stderr_reader_task: asyncio.Task[None] | None = None
+
+    async def get_ntp(self) -> int:
+        """Get current NTP timestamp."""
+        # TODO!
+        return get_ntp_timestamp()
+
+    @property
+    def _cli_loglevel(self) -> int:
+        """
+        Return a cliap2 aligned loglevel.
+
+        Ensures that minimum level required for required cliap2 stderr output is respected.
+        """
+        force_verbose: bool = False  # just for now
+        mass_level: int = 0
+        match self.prov.logger.level:
+            case logging.CRITICAL:
+                mass_level = 0
+            case logging.ERROR:
+                mass_level = 1
+            case logging.WARNING:
+                mass_level = 2
+            case logging.INFO:
+                mass_level = 3
+            case logging.DEBUG:
+                mass_level = 4
+        if self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+            mass_level = 5
+        if force_verbose:
+            mass_level = 5  # always use max log level for now to capture all stderr output
+        return max(mass_level, AIRPLAY2_MIN_LOG_LEVEL)
+
+    async def start(self, start_ntp: int, skip: int = 0) -> None:
+        """Initialize CLI process for a player."""
+        cli_binary = await get_cli_binary(self.player.protocol)
+        assert self.player.discovery_info is not None
+
+        player_id = self.player.player_id
+        sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
+        assert isinstance(sync_adjust, int)
+        read_ahead = await self.mass.config.get_player_config_value(
+            player_id, CONF_READ_AHEAD_BUFFER
+        )
+
+        txt_kv: str = ""
+        for key, value in self.player.discovery_info.decoded_properties.items():
+            txt_kv += f'"{key}={value}" '
+
+        # Note: skip parameter is accepted for API compatibility with base class
+        # but is not currently used by the cliap2 binary (AirPlay2 handles late joiners differently)
+
+        # cliap2 is the binary that handles the actual streaming to the player
+        # this binary leverages from the AirPlay2 support in owntones
+        # https://github.com/music-assistant/cliairplay
+        cli_args = [
+            cli_binary,
+            "--config",
+            os.path.join(os.path.dirname(__file__), "bin", "cliap2.conf"),
+            "--name",
+            self.player.display_name,
+            "--hostname",
+            str(self.player.discovery_info.server),
+            "--address",
+            str(self.player.address),
+            "--port",
+            str(self.player.discovery_info.port),
+            "--txt",
+            txt_kv,
+            "--ntpstart",
+            str(start_ntp),
+            "--latency",
+            str(read_ahead),
+            "--volume",
+            str(self.player.volume_level),
+            "--loglevel",
+            str(self._cli_loglevel),
+            "--pipe",
+            self.audio_named_pipe,
+        ]
+        self.player.logger.debug(
+            "Starting cliap2 process for player %s with args: %s",
+            player_id,
+            cli_args,
+        )
+        self._cli_proc = AsyncProcess(cli_args, stdin=True, stderr=True, name="cliap2")
+        if platform.system() == "Darwin":
+            os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
+        await self._cli_proc.start()
+        # read up to first num_lines lines of stderr to get the initial status
+        num_lines: int = 50
+        if self.prov.logger.level > logging.INFO:
+            num_lines *= 10
+        for _ in range(num_lines):
+            line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
+            self.player.logger.debug(line)
+            if f"airplay: Adding AirPlay device '{self.player.display_name}'" in line:
+                self.player.logger.info("AirPlay device connected. Starting playback.")
+                self._started.set()
+                # Open pipes now that cliraop is ready
+                await self._open_pipes()
+                break
+            if f"The AirPlay 2 device '{self.player.display_name}' failed" in line:
+                raise PlayerCommandFailed("Cannot connect to AirPlay device")
+        # start reading the stderr of the cliap2 process from another task
+        self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
+
+    async def _stderr_reader(self) -> None:
+        """Monitor stderr for the running CLIap2 process."""
+        player = self.player
+        queue = self.mass.players.get_active_queue(player)
+        logger = player.logger
+        lost_packets = 0
+        if not self._cli_proc:
+            return
+        async for line in self._cli_proc.iter_stderr():
+            # TODO @bradkeifer make cliap2 work this way
+            if "elapsed milliseconds:" in line:
+                # this is received more or less every second while playing
+                # millis = int(line.split("elapsed milliseconds: ")[1])
+                # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
+                # self.player.elapsed_time_last_updated = time.time()
+                # NOTE: Metadata is now handled at the session level
+                pass
+            if "set pause" in line or "Pause at" in line:
+                player.set_state_from_stream(state=PlaybackState.PAUSED)
+            if "Restarted at" in line or "restarting w/ pause" in line:
+                player.set_state_from_stream(state=PlaybackState.PLAYING)
+            if "restarting w/o pause" in line:
+                # streaming has started
+                player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0)
+            if "lost packet out of backlog" in line:
+                lost_packets += 1
+                if lost_packets == 100 and queue:
+                    logger.error("High packet loss detected, restarting playback...")
+                    self.mass.create_task(self.mass.player_queues.resume(queue.queue_id, False))
+                else:
+                    logger.warning("Packet loss detected!")
+            if "end of stream reached" in line:
+                logger.debug("End of stream reached")
+                break
+
+            # log cli stderr output in alignment with mass logging level
+            if "[FATAL]" in line:
+                logger.critical(line)
+            elif "[  LOG]" in line:
+                logger.error(line)
+            elif "[ INFO]" in line:
+                logger.info(line)
+            elif "[ WARN]" in line:
+                logger.warning(line)
+            elif "[DEBUG]" in line:
+                logger.debug(line)
+            elif "[ SPAM]" in line:
+                logger.log(VERBOSE_LOG_LEVEL, line)
+            else:  # for now, log unknown lines as error
+                logger.error(line)
+
+        # ensure we're cleaned up afterwards (this also logs the returncode)
+        await self.stop()
diff --git a/music_assistant/providers/airplay/protocols/raop.py b/music_assistant/providers/airplay/protocols/raop.py
new file mode 100644 (file)
index 0000000..805a6d2
--- /dev/null
@@ -0,0 +1,200 @@
+"""Logic for RAOP audio streaming to AirPlay devices."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+
+from music_assistant_models.enums import PlaybackState
+from music_assistant_models.errors import PlayerCommandFailed
+
+from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.helpers.process import AsyncProcess, check_output
+from music_assistant.providers.airplay.constants import (
+    CONF_ALAC_ENCODE,
+    CONF_ENCRYPTION,
+    CONF_PASSWORD,
+    CONF_READ_AHEAD_BUFFER,
+)
+from music_assistant.providers.airplay.helpers import get_cli_binary
+
+from ._protocol import AirPlayProtocol
+
+
+class RaopStream(AirPlayProtocol):
+    """
+    RAOP (AirPlay 1) Audio Streamer.
+
+    Python is not suitable for realtime audio streaming so we do the actual streaming
+    of (RAOP) audio using a small executable written in C based on libraop to do
+    the actual timestamped playback, which reads pcm audio from stdin
+    and we can send some interactive commands using a named pipe.
+    """
+
+    _stderr_reader_task: asyncio.Task[None] | None = None
+
+    @property
+    def running(self) -> bool:
+        """Return boolean if this stream is running."""
+        return (
+            not self._stopped
+            and self._started.is_set()
+            and self._cli_proc is not None
+            and not self._cli_proc.closed
+        )
+
+    async def get_ntp(self) -> int:
+        """Get current NTP timestamp from the CLI binary."""
+        cli_binary = await get_cli_binary(self.player.protocol)
+        # TODO: we can potentially also just generate this ourselves?
+        self.prov.logger.debug("Getting NTP timestamp from %s CLI binary", self.player.protocol)
+        _, stdout = await check_output(cli_binary, "-ntp")
+        self.prov.logger.debug(f"Output from ntp check: {stdout.decode().strip()}")
+        return int(stdout.strip())
+
+    async def start(self, start_ntp: int, skip: int = 0) -> None:
+        """Initialize CLIRaop process for a player."""
+        assert self.player.discovery_info is not None  # for type checker
+        cli_binary = await get_cli_binary(self.player.protocol)
+
+        extra_args: list[str] = []
+        player_id = self.player.player_id
+        extra_args += ["-if", self.mass.streams.bind_ip]
+        if self.player.config.get_value(CONF_ENCRYPTION, True):
+            extra_args += ["-encrypt"]
+        if self.player.config.get_value(CONF_ALAC_ENCODE, True):
+            extra_args += ["-alac"]
+        for prop in ("et", "md", "am", "pk", "pw"):
+            if prop_value := self.player.discovery_info.decoded_properties.get(prop):
+                extra_args += [f"-{prop}", prop_value]
+        if skip > 0:
+            extra_args += ["-skip", str(skip)]
+        sync_adjust = self.player.config.get_value(CONF_SYNC_ADJUST, 0)
+        assert isinstance(sync_adjust, int)
+        if device_password := self.mass.config.get_raw_player_config_value(
+            player_id, CONF_PASSWORD, None
+        ):
+            extra_args += ["-password", str(device_password)]
+        # Add AirPlay credentials from pyatv pairing if available (for Apple devices)
+        # if raop_credentials := self.player.config.get_value(CONF_AP_CREDENTIALS):
+        #     # pyatv AirPlay credentials are in format "identifier:secret_key:other:data"
+        #     # cliraop expects just the secret_key (2nd part, 64-char hex string) for -secret
+        #     parts = str(raop_credentials).split(":")
+        #     if len(parts) >= 2:
+        #         # Take the second part (index 1) as the secret key
+        #         secret_key = parts[1]
+        #         self.prov.logger.debug(
+        #             "Using AirPlay credentials for %s: id=%s, secret_len=%d, parts=%d",
+        #             self.player.player_id,
+        #             parts[0],
+        #             len(secret_key),
+        #             len(parts),
+        #         )
+        #         extra_args += ["-secret", secret_key]
+        #     else:
+        #         # Fallback: assume it's already just the key
+        #         self.prov.logger.debug(
+        #             "Using AirPlay credentials for %s: single value, length=%d",
+        #             self.player.player_id,
+        #             len(str(raop_credentials)),
+        #         )
+        #         extra_args += ["-secret", str(raop_credentials)]
+        if self.prov.logger.isEnabledFor(logging.DEBUG):
+            extra_args += ["-debug", "5"]
+        elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+            extra_args += ["-debug", "10"]
+        read_ahead = await self.mass.config.get_player_config_value(
+            player_id, CONF_READ_AHEAD_BUFFER
+        )
+        self.player.logger.info("Starting cliraop with latency buffer: %dms", read_ahead)
+
+        # cliraop is the binary that handles the actual raop streaming to the player
+        # this is a slightly modified version of philippe44's libraop
+        # https://github.com/music-assistant/libraop
+        # we use this intermediate binary to do the actual streaming because attempts to do
+        # so using pure python (e.g. pyatv) were not successful due to the realtime nature
+        cliraop_args = [
+            cli_binary,
+            "-ntpstart",
+            str(start_ntp),
+            "-port",
+            str(self.player.discovery_info.port),
+            "-latency",
+            str(read_ahead),
+            "-volume",
+            str(self.player.volume_level),
+            *extra_args,
+            "-dacp",
+            self.prov.dacp_id,
+            "-activeremote",
+            self.active_remote_id,
+            "-cmdpipe",
+            self.commands_named_pipe,
+            "-udn",
+            self.player.discovery_info.name,
+            self.player.address,
+            self.audio_named_pipe,
+        ]
+        self.player.logger.debug(
+            "Starting cliraop process for player %s with args: %s",
+            self.player.player_id,
+            cliraop_args,
+        )
+        self._cli_proc = AsyncProcess(cliraop_args, stdin=False, stderr=True, name="cliraop")
+        await self._cli_proc.start()
+        # read up to first 50 lines of stderr to get the initial status
+        for _ in range(50):
+            line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
+            self.player.logger.debug(line)
+            if "connected to " in line:
+                self.player.logger.info("AirPlay device connected. Starting playback.")
+                self._started.set()
+                # Open pipes now that cliraop is ready
+                await self._open_pipes()
+                break
+            if "Cannot connect to AirPlay device" in line:
+                raise PlayerCommandFailed("Cannot connect to AirPlay device")
+        # repeat sending the volume level to the player because some players seem
+        # to ignore it the first time
+        # https://github.com/music-assistant/support/issues/3330
+        await self.send_cli_command(f"VOLUME={self.player.volume_level}\n")
+        # start reading the stderr of the cliraop process from another task
+        self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
+
+    async def _stderr_reader(self) -> None:
+        """Monitor stderr for the running CLIRaop process."""
+        player = self.player
+        logger = player.logger
+        lost_packets = 0
+        if not self._cli_proc:
+            return
+        async for line in self._cli_proc.iter_stderr():
+            if "elapsed milliseconds:" in line:
+                # this is received more or less every second while playing
+                # millis = int(line.split("elapsed milliseconds: ")[1])
+                # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
+                # self.player.elapsed_time_last_updated = time.time()
+                logger.log(VERBOSE_LOG_LEVEL, line)
+                continue
+            if "set pause" in line or "Pause at" in line:
+                player.set_state_from_stream(state=PlaybackState.PAUSED)
+            if "Restarted at" in line or "restarting w/ pause" in line:
+                player.set_state_from_stream(state=PlaybackState.PLAYING)
+            if "restarting w/o pause" in line:
+                # streaming has started
+                player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0)
+            if "lost packet out of backlog" in line:
+                lost_packets += 1
+                if lost_packets == 100:
+                    logger.error("High packet loss detected, restarting playback...")
+                    self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
+                else:
+                    logger.warning("Packet loss detected!")
+            if "end of stream reached" in line:
+                logger.debug("End of stream reached")
+                break
+            logger.debug(line)
+
+        # ensure we're cleaned up afterwards (this also logs the returncode)
+        logger.debug("CLIRaop stderr reader ended")
+        await self.stop()
index feaa9397204053b5d33607298bf051eae464eef5..73cdafa37cdc46ad72a47c4106c0b1f2f8eaff33 100644 (file)
@@ -12,16 +12,20 @@ from zeroconf import ServiceStateChange
 from zeroconf.asyncio import AsyncServiceInfo
 
 from music_assistant.helpers.datetime import utc
-from music_assistant.helpers.util import get_ip_pton, select_free_port
+from music_assistant.helpers.util import (
+    get_ip_pton,
+    get_primary_ip_address_from_zeroconf,
+    select_free_port,
+)
 from music_assistant.models.player_provider import PlayerProvider
 
-from .constants import CACHE_CATEGORY_PREV_VOLUME, CONF_IGNORE_VOLUME, FALLBACK_VOLUME
-from .helpers import (
-    convert_airplay_volume,
-    get_cliraop_binary,
-    get_model_info,
-    get_primary_ip_address_from_zeroconf,
+from .constants import (
+    CACHE_CATEGORY_PREV_VOLUME,
+    CONF_IGNORE_VOLUME,
+    DACP_DISCOVERY_TYPE,
+    FALLBACK_VOLUME,
 )
+from .helpers import convert_airplay_volume, get_model_info
 from .player import AirPlayPlayer
 
 # TODO: AirPlay provider
@@ -36,14 +40,11 @@ from .player import AirPlayPlayer
 class AirPlayProvider(PlayerProvider):
     """Player provider for AirPlay based players."""
 
-    cliraop_bin: str | None
     _dacp_server: asyncio.Server
     _dacp_info: AsyncServiceInfo
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
-        # we locate the cliraop binary here, so we can fail early if it is not available
-        self.cliraop_bin: str | None = await get_cliraop_binary()
         # register DACP zeroconf service
         dacp_port = await select_free_port(39831, 49831)
         self.dacp_id = dacp_id = f"{randrange(2**64):X}"
@@ -51,10 +52,9 @@ class AirPlayProvider(PlayerProvider):
         self._dacp_server = await asyncio.start_server(
             self._handle_dacp_request, "0.0.0.0", dacp_port
         )
-        zeroconf_type = "_dacp._tcp.local."
-        server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}"
+        server_id = f"iTunes_Ctrl_{dacp_id}.{DACP_DISCOVERY_TYPE}"
         self._dacp_info = AsyncServiceInfo(
-            zeroconf_type,
+            DACP_DISCOVERY_TYPE,
             name=server_id,
             addresses=[await get_ip_pton(str(self.mass.streams.publish_ip))],
             port=dacp_port,
@@ -101,7 +101,6 @@ class AirPlayProvider(PlayerProvider):
             # update the latest discovery info for existing player
             player.set_discovery_info(info, display_name)
             return
-        # handle new player
         await self._setup_player(player_id, display_name, info)
 
     async def unload(self, is_removed: bool = False) -> None:
@@ -117,11 +116,20 @@ class AirPlayProvider(PlayerProvider):
         self, player_id: str, display_name: str, discovery_info: AsyncServiceInfo
     ) -> None:
         """Handle setup of a new player that is discovered using mdns."""
-        # prefer airplay mdns info as it has more details
-        # fallback to raop info if airplay info is not available
-        airplay_info = AsyncServiceInfo(
-            "_airplay._tcp.local.", discovery_info.name.split("@")[-1].replace("_raop", "_airplay")
-        )
+        if discovery_info.type == "_raop._tcp.local.":
+            # RAOP service discovered
+            self.logger.debug("Discovered RAOP service for %s", display_name)
+            # always prefer airplay mdns info as it has more details
+            # fallback to raop info if airplay info is not available,
+            # (old device only announcing raop)
+            airplay_info = AsyncServiceInfo(
+                "_airplay._tcp.local.",
+                discovery_info.name.split("@")[-1].replace("_raop", "_airplay"),
+            )
+        else:
+            # AirPlay service discovered
+            self.logger.debug("Discovered AirPlay service for %s", display_name)
+            airplay_info = discovery_info
         if await airplay_info.async_request(self.mass.aiozc.zeroconf, 3000):
             manufacturer, model = get_model_info(airplay_info)
         else:
@@ -131,27 +139,15 @@ class AirPlayProvider(PlayerProvider):
             self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
             return
 
-        if "apple tv" in model.lower():
-            # For now, we ignore the Apple TV until we implement the authentication.
-            # maybe we can simply use pyatv only for this part?
-            # the cliraop application has already been prepared to accept the secret.
-            self.logger.info(
-                "Ignoring %s in discovery because it is not yet supported.", display_name
-            )
-            return
-
         address = get_primary_ip_address_from_zeroconf(discovery_info)
         if not address:
             return  # should not happen, but guard just in case
+        if not discovery_info:
+            return  # should not happen, but guard just in case
 
         # if we reach this point, all preflights are ok and we can create the player
         self.logger.debug("Discovered AirPlay device %s on %s", display_name, address)
 
-        # append airplay to the default display name for generic (non-apple) devices
-        # this makes it easier for users to distinguish between airplay and non-airplay devices
-        if manufacturer.lower() != "apple" and "airplay" not in display_name.lower():
-            display_name += " (AirPlay)"
-
         # Get volume from cache
         if not (
             volume := await self.mass.cache.get(
@@ -160,6 +156,21 @@ class AirPlayProvider(PlayerProvider):
         ):
             volume = FALLBACK_VOLUME
 
+        # Append airplay to the default name for non-apple devices
+        # to make it easier for users to distinguish
+        is_apple = manufacturer.lower() == "apple"
+        if not is_apple and "airplay" not in display_name.lower():
+            display_name += " (AirPlay)"
+
+        self.logger.debug(
+            "Setting up player %s: manufacturer=%s, model=%s",
+            display_name,
+            manufacturer,
+            model,
+        )
+
+        # Create single AirPlayPlayer for all devices
+        # Pairing config entries will be shown conditionally based on device type
         player = AirPlayPlayer(
             provider=self,
             player_id=player_id,
@@ -170,6 +181,15 @@ class AirPlayProvider(PlayerProvider):
             model=model,
             initial_volume=volume,
         )
+
+        # Final check before registration to handle race conditions
+        # (multiple MDNS events processed in parallel for same device)
+        if self.mass.players.get(player_id):
+            self.logger.debug(
+                "Player %s already registered during setup, skipping registration", player_id
+            )
+            return
+
         await self.mass.players.register(player)
 
     async def _handle_dacp_request(  # noqa: PLR0915
@@ -204,17 +224,17 @@ class AirPlayProvider(PlayerProvider):
             active_remote = headers.get("Active-Remote")
             _, path, _ = headers_split[0].split(" ")
             # lookup airplay player by active remote id
-            player = next(
+            player: AirPlayPlayer | None = next(
                 (
                     x
                     for x in self.get_players()
-                    if x.raop_stream and x.raop_stream.active_remote_id == active_remote
+                    if x.stream and x.stream.active_remote_id == active_remote
                 ),
                 None,
             )
             self.logger.debug(
                 "DACP request for %s (%s): %s -- %s",
-                player.discovery_info.name if player else "UNKNOWN PLAYER",
+                player.name if player else "UNKNOWN PLAYER",
                 active_remote,
                 path,
                 body,
@@ -269,8 +289,8 @@ class AirPlayProvider(PlayerProvider):
                 # we've sent or the device requesting a new volume itself.
                 # In case of a small rounding difference, we ignore this,
                 # to prevent an endless pingpong of volume changes
-                raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
-                volume = convert_airplay_volume(raop_volume)
+                airplay_volume = float(path.split("dmcp.device-volume=", 1)[-1])
+                volume = convert_airplay_volume(airplay_volume)
                 player.update_volume_from_device(volume)
             elif "dmcp.volume=" in path:
                 # volume change request from device (e.g. volume buttons)
@@ -278,13 +298,13 @@ class AirPlayProvider(PlayerProvider):
                 player.update_volume_from_device(volume)
             elif "device-prevent-playback=1" in path:
                 # device switched to another source (or is powered off)
-                if raop_stream := player.raop_stream:
-                    raop_stream.prevent_playback = True
-                    self.mass.create_task(player.raop_stream.session.remove_client(player))
+                if stream := player.stream:
+                    stream.prevent_playback = True
+                    self.mass.create_task(player.stream.session.remove_client(player))
             elif "device-prevent-playback=0" in path:
                 # device reports that its ready for playback again
-                if raop_stream := player.raop_stream:
-                    raop_stream.prevent_playback = False
+                if stream := player.stream:
+                    stream.prevent_playback = False
 
             # send response
             date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
diff --git a/music_assistant/providers/airplay/raop.py b/music_assistant/providers/airplay/raop.py
deleted file mode 100644 (file)
index 50ae76f..0000000
+++ /dev/null
@@ -1,473 +0,0 @@
-"""Logic for RAOP (AirPlay 1) audio streaming to AirPlay devices."""
-
-from __future__ import annotations
-
-import asyncio
-import logging
-import time
-from collections.abc import AsyncGenerator
-from contextlib import suppress
-from random import randint
-from typing import TYPE_CHECKING
-
-from music_assistant_models.enums import PlaybackState
-from music_assistant_models.errors import PlayerCommandFailed
-
-from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.helpers.audio import get_chunksize, get_player_filter_params
-from music_assistant.helpers.ffmpeg import FFMpeg
-from music_assistant.helpers.process import AsyncProcess, check_output
-from music_assistant.helpers.util import TaskManager, close_async_generator
-
-from .constants import (
-    AIRPLAY_PCM_FORMAT,
-    CONF_ALAC_ENCODE,
-    CONF_ENCRYPTION,
-    CONF_PASSWORD,
-    CONF_READ_AHEAD_BUFFER,
-)
-
-if TYPE_CHECKING:
-    from music_assistant_models.media_items import AudioFormat
-
-    from .player import AirPlayPlayer
-    from .provider import AirPlayProvider
-
-
-class RaopStreamSession:
-    """Object that holds the details of a (RAOP) stream session to one or more players."""
-
-    def __init__(
-        self,
-        airplay_provider: AirPlayProvider,
-        sync_clients: list[AirPlayPlayer],
-        input_format: AudioFormat,
-        audio_source: AsyncGenerator[bytes, None],
-    ) -> None:
-        """Initialize RaopStreamSession."""
-        assert sync_clients
-        self.prov = airplay_provider
-        self.mass = airplay_provider.mass
-        self.input_format = input_format
-        self.sync_clients = sync_clients
-        self._audio_source = audio_source
-        self._audio_source_task: asyncio.Task[None] | None = None
-        self._lock = asyncio.Lock()
-
-    async def start(self) -> None:
-        """Initialize RaopStreamSession."""
-        # initialize raop stream for all players
-
-        # get current ntp and start RaopStream per player
-        assert self.prov.cliraop_bin
-        _, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
-        start_ntp = int(stdout.strip())
-        wait_start = 1750 + (250 * len(self.sync_clients))
-
-        async def _start_client(raop_player: AirPlayPlayer) -> None:
-            # stop existing stream if running
-            if raop_player.raop_stream and raop_player.raop_stream.running:
-                await raop_player.raop_stream.stop()
-
-            raop_player.raop_stream = RaopStream(self, raop_player)
-            await raop_player.raop_stream.start(start_ntp, wait_start)
-
-        async with TaskManager(self.mass) as tm:
-            for _raop_player in self.sync_clients:
-                tm.create_task(_start_client(_raop_player))
-        self._audio_source_task = asyncio.create_task(self._audio_streamer())
-
-    async def stop(self) -> None:
-        """Stop playback and cleanup."""
-        if self._audio_source_task and not self._audio_source_task.done():
-            self._audio_source_task.cancel()
-            with suppress(asyncio.CancelledError):
-                await self._audio_source_task
-        await asyncio.gather(
-            *[self.remove_client(x) for x in self.sync_clients],
-            return_exceptions=True,
-        )
-
-    async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
-        """Remove a sync client from the session."""
-        if airplay_player not in self.sync_clients:
-            return
-        assert airplay_player.raop_stream
-        assert airplay_player.raop_stream.session == self
-        async with self._lock:
-            self.sync_clients.remove(airplay_player)
-        await airplay_player.raop_stream.stop()
-        airplay_player.raop_stream = None
-        # if this was the last client, stop the session
-        if not self.sync_clients:
-            await self.stop()
-            return
-
-    async def add_client(self, airplay_player: AirPlayPlayer) -> None:
-        """Add a sync client to the session."""
-        # TODO: Add the ability to add a new client to an existing session
-        # e.g. by counting the number of frames sent etc.
-
-        # temp solution: just restart the whole playback session when new client(s) join
-        sync_leader = self.sync_clients[0]
-        if not sync_leader.raop_stream or not sync_leader.raop_stream.running:
-            return
-
-        await self.stop()  # we need to stop the current session to add a new client
-        # this could potentially be called by multiple players at the exact same time
-        # so we debounce the resync a bit here with a timer
-        if sync_leader.current_media:
-            self.mass.call_later(
-                0.5,
-                self.mass.players.cmd_resume(sync_leader.player_id),
-                task_id=f"resync_session_{sync_leader.player_id}",
-            )
-
-    async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
-        """Replace the audio source of the stream."""
-        # cancel the current audio source task
-        assert self._audio_source_task  # for type checker
-        self._audio_source_task.cancel()
-        with suppress(asyncio.CancelledError, RuntimeError):
-            await self._audio_source_task
-        # set new audio source and restart the stream
-        self._audio_source = audio_source
-        self._audio_source_task = asyncio.create_task(self._audio_streamer())
-        # restart the (player-specific) ffmpeg stream for all players
-        # this is the easiest way to ensure the new audio source is used
-        # as quickly as possible, without waiting for the buffers to be drained
-        # it also allows to change the player settings such as DSP on the fly
-        for sync_client in self.sync_clients:
-            if not sync_client.raop_stream:
-                continue  # guard
-            sync_client.raop_stream.start_ffmpeg_stream()
-
-    async def _audio_streamer(self) -> None:
-        """Stream audio to all players."""
-        generator_exhausted = False
-        try:
-            async for chunk in self._audio_source:
-                async with self._lock:
-                    sync_clients = [
-                        x for x in self.sync_clients if x.raop_stream and x.raop_stream.running
-                    ]
-                    if not sync_clients:
-                        return
-                    await asyncio.gather(
-                        *[x.raop_stream.write_chunk(chunk) for x in sync_clients if x.raop_stream],
-                        return_exceptions=True,
-                    )
-            # entire stream consumed: send EOF
-            generator_exhausted = True
-            async with self._lock:
-                await asyncio.gather(
-                    *[
-                        x.raop_stream.write_eof()
-                        for x in self.sync_clients
-                        if x.raop_stream and x.raop_stream.running
-                    ],
-                    return_exceptions=True,
-                )
-        except Exception as err:
-            logger = self.prov.logger
-            logger.error(
-                "Stream error: %s",
-                str(err) or err.__class__.__name__,
-                exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
-            )
-            raise
-        finally:
-            if not generator_exhausted:
-                await close_async_generator(self._audio_source)
-
-
-class RaopStream:
-    """
-    RAOP (AirPlay 1) Audio Streamer.
-
-    Python is not suitable for realtime audio streaming so we do the actual streaming
-    of (RAOP) audio using a small executable written in C based on libraop to do
-    the actual timestamped playback, which reads pcm audio from stdin
-    and we can send some interactive commands using a named pipe.
-    """
-
-    def __init__(
-        self,
-        session: RaopStreamSession,
-        player: AirPlayPlayer,
-    ) -> None:
-        """Initialize RaopStream."""
-        self.session = session
-        self.prov = session.prov
-        self.mass = session.prov.mass
-        self.player = player
-
-        # always generate a new active remote id to prevent race conditions
-        # with the named pipe used to send audio
-        self.active_remote_id: str = str(randint(1000, 8000))
-        self.prevent_playback: bool = False
-        self._stderr_reader_task: asyncio.Task[None] | None = None
-        self._cliraop_proc: AsyncProcess | None = None
-        self._ffmpeg_proc: AsyncProcess | None = None
-        self._ffmpeg_reader_task: asyncio.Task[None] | None = None
-        self._started = asyncio.Event()
-        self._stopped = False
-        self._total_bytes_sent = 0
-        self._stream_bytes_sent = 0
-
-    @property
-    def running(self) -> bool:
-        """Return boolean if this stream is running."""
-        return (
-            not self._stopped
-            and self._started.is_set()
-            and self._cliraop_proc is not None
-            and not self._cliraop_proc.closed
-        )
-
-    async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
-        """Initialize CLIRaop process for a player."""
-        assert self.prov.cliraop_bin
-        extra_args: list[str] = []
-        player_id = self.player.player_id
-        extra_args += ["-if", self.mass.streams.bind_ip]
-        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
-            extra_args += ["-encrypt"]
-        if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
-            extra_args += ["-alac"]
-        for prop in ("et", "md", "am", "pk", "pw"):
-            if prop_value := self.player.discovery_info.decoded_properties.get(prop):
-                extra_args += [f"-{prop}", prop_value]
-        sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
-        assert isinstance(sync_adjust, int)
-        if device_password := self.mass.config.get_raw_player_config_value(
-            player_id, CONF_PASSWORD, None
-        ):
-            extra_args += ["-password", str(device_password)]
-        if self.prov.logger.isEnabledFor(logging.DEBUG):
-            extra_args += ["-debug", "5"]
-        elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
-            extra_args += ["-debug", "10"]
-        read_ahead = await self.mass.config.get_player_config_value(
-            player_id, CONF_READ_AHEAD_BUFFER
-        )
-        # ffmpeg handles the player specific stream + filters and pipes
-        # audio to the cliraop process
-        self.start_ffmpeg_stream()
-
-        # cliraop is the binary that handles the actual raop streaming to the player
-        # this is a slightly modified version of philippe44's libraop
-        # https://github.com/music-assistant/libraop
-        # we use this intermediate binary to do the actual streaming because attempts to do
-        # so using pure python (e.g. pyatv) were not successful due to the realtime nature
-        # TODO: Either enhance libraop with airplay 2 support or find a better alternative
-        cliraop_args = [
-            self.prov.cliraop_bin,
-            "-ntpstart",
-            str(start_ntp),
-            "-port",
-            str(self.player.discovery_info.port),
-            "-wait",
-            str(wait_start - sync_adjust),
-            "-latency",
-            str(read_ahead),
-            "-volume",
-            str(self.player.volume_level),
-            *extra_args,
-            "-dacp",
-            self.prov.dacp_id,
-            "-activeremote",
-            self.active_remote_id,
-            "-udn",
-            self.player.discovery_info.name,
-            self.player.address,
-            "-",
-        ]
-        self._cliraop_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
-        await self._cliraop_proc.start()
-        # read first 20 lines of stderr to get the initial status
-        for _ in range(20):
-            line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore")
-            self.player.logger.debug(line)
-            if "connected to " in line:
-                self._started.set()
-                break
-            if "Cannot connect to AirPlay device" in line:
-                if self._ffmpeg_reader_task:
-                    self._ffmpeg_reader_task.cancel()
-                raise PlayerCommandFailed("Cannot connect to AirPlay device")
-        # repeat sending the volume level to the player because some players seem
-        # to ignore it the first time
-        # https://github.com/music-assistant/support/issues/3330
-        await self.send_cli_command(f"VOLUME={self.player.volume_level}\n")
-        # start reading the stderr of the cliraop process from another task
-        self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
-
-    async def stop(self) -> None:
-        """Stop playback and cleanup."""
-        await self.send_cli_command("ACTION=STOP")
-        self._stopped = True
-        if self._stderr_reader_task and not self._stderr_reader_task.done():
-            self._stderr_reader_task.cancel()
-        if self._ffmpeg_reader_task and not self._ffmpeg_reader_task.done():
-            self._ffmpeg_reader_task.cancel()
-        if self._cliraop_proc and not self._cliraop_proc.closed:
-            await self._cliraop_proc.close(True)
-        if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
-            await self._ffmpeg_proc.close(True)
-        self.player.set_state_from_raop(state=PlaybackState.IDLE, elapsed_time=0)
-
-    async def write_chunk(self, chunk: bytes) -> None:
-        """Write a (pcm) audio chunk."""
-        if self._stopped:
-            raise RuntimeError("Stream is already stopped")
-        await self._started.wait()
-        assert self._ffmpeg_proc
-        await self._ffmpeg_proc.write(chunk)
-
-    async def write_eof(self) -> None:
-        """Write EOF."""
-        if self._stopped:
-            raise RuntimeError("Stream is already stopped")
-        await self._started.wait()
-        assert self._ffmpeg_proc
-        await self._ffmpeg_proc.write_eof()
-
-    async def send_cli_command(self, command: str) -> None:
-        """Send an interactive command to the running CLIRaop binary."""
-        if self._stopped or not self._cliraop_proc or self._cliraop_proc.closed:
-            return
-        await self._started.wait()
-
-        if not command.endswith("\n"):
-            command += "\n"
-
-        def send_data() -> None:
-            with suppress(BrokenPipeError), open(named_pipe, "w") as f:
-                f.write(command)
-
-        named_pipe = f"/tmp/raop-{self.active_remote_id}"  # noqa: S108
-        self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
-        self.player.last_command_sent = time.time()
-        await asyncio.to_thread(send_data)
-
-    def start_ffmpeg_stream(self) -> None:
-        """Start (or replace) the player-specific ffmpeg stream to feed cliraop."""
-        # cancel existing ffmpeg reader task
-        if self._ffmpeg_reader_task and not self._ffmpeg_reader_task.done():
-            self._ffmpeg_reader_task.cancel()
-        if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
-            self.mass.create_task(self._ffmpeg_proc.close(True))
-        # start new ffmpeg reader task
-        self._ffmpeg_reader_task = self.mass.create_task(self._ffmpeg_reader())
-
-    async def _ffmpeg_reader(self) -> None:
-        """Read audio from the audio source and pipe it to the CLIRaop process."""
-        self._ffmpeg_proc = FFMpeg(
-            audio_input="-",
-            input_format=self.session.input_format,
-            output_format=AIRPLAY_PCM_FORMAT,
-            filter_params=get_player_filter_params(
-                self.mass,
-                self.player.player_id,
-                self.session.input_format,
-                AIRPLAY_PCM_FORMAT,
-            ),
-        )
-        self._stream_bytes_sent = 0
-        await self._ffmpeg_proc.start()
-        chunksize = get_chunksize(AIRPLAY_PCM_FORMAT)
-        # wait for cliraop to be ready
-        await asyncio.wait_for(self._started.wait(), 20)
-        async for chunk in self._ffmpeg_proc.iter_chunked(chunksize):
-            if self._stopped:
-                break
-            if not self._cliraop_proc or self._cliraop_proc.closed:
-                break
-            await self._cliraop_proc.write(chunk)
-            self._stream_bytes_sent += len(chunk)
-            self._total_bytes_sent += len(chunk)
-            del chunk
-            # we base elapsed time on the amount of bytes sent
-            # so we can account for reusing the same session for multiple streams
-            self.player.set_state_from_raop(
-                elapsed_time=self._stream_bytes_sent / chunksize,
-            )
-        # if we reach this point, the process exited, most likely because the stream ended
-        if self._cliraop_proc and not self._cliraop_proc.closed:
-            await self._cliraop_proc.write_eof()
-
-    async def _stderr_reader(self) -> None:
-        """Monitor stderr for the running CLIRaop process."""
-        player = self.player
-        logger = player.logger
-        lost_packets = 0
-        prev_metadata_checksum: str = ""
-        prev_progress_report: float = 0
-        if not self._cliraop_proc:
-            return
-        async for line in self._cliraop_proc.iter_stderr():
-            if "elapsed milliseconds:" in line:
-                # this is received more or less every second while playing
-                # millis = int(line.split("elapsed milliseconds: ")[1])
-                # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
-                # self.player.elapsed_time_last_updated = time.time()
-                # send metadata to player(s) if needed
-                # NOTE: this must all be done in separate tasks to not disturb audio
-                now = time.time()
-                if (player.elapsed_time or 0) > 2 and player.current_media:
-                    metadata_checksum = f"{player.current_media.uri}.{player.current_media.title}.{player.current_media.image_url}"  # noqa: E501
-                    if prev_metadata_checksum != metadata_checksum:
-                        prev_metadata_checksum = metadata_checksum
-                        prev_progress_report = now
-                        self.mass.create_task(self._send_metadata())
-                    # send the progress report every 5 seconds
-                    elif now - prev_progress_report >= 5:
-                        prev_progress_report = now
-                        self.mass.create_task(self._send_progress())
-            if "set pause" in line or "Pause at" in line:
-                player.set_state_from_raop(state=PlaybackState.PAUSED)
-            if "Restarted at" in line or "restarting w/ pause" in line:
-                player.set_state_from_raop(state=PlaybackState.PLAYING)
-            if "restarting w/o pause" in line:
-                # streaming has started
-                player.set_state_from_raop(state=PlaybackState.PLAYING, elapsed_time=0)
-            if "lost packet out of backlog" in line:
-                lost_packets += 1
-                if lost_packets == 100:
-                    logger.error("High packet loss detected, restarting playback...")
-                    self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
-                else:
-                    logger.warning("Packet loss detected!")
-            if "end of stream reached" in line:
-                logger.debug("End of stream reached")
-                break
-            logger.log(VERBOSE_LOG_LEVEL, line)
-
-        # ensure we're cleaned up afterwards (this also logs the returncode)
-        await self.stop()
-
-    async def _send_metadata(self) -> None:
-        """Send metadata to player (and connected sync childs)."""
-        if not self.player or not self.player.current_media or self._stopped:
-            return
-        duration = min(self.player.current_media.duration or 0, 3600)
-        title = self.player.current_media.title or ""
-        artist = self.player.current_media.artist or ""
-        album = self.player.current_media.album or ""
-        cmd = f"TITLE={title}\nARTIST={artist}\nALBUM={album}\n"
-        cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
-
-        await self.send_cli_command(cmd)
-
-        # get image
-        if not self.player.current_media.image_url or self._stopped:
-            return
-        await self.send_cli_command(f"ARTWORK={self.player.current_media.image_url}\n")
-
-    async def _send_progress(self) -> None:
-        """Send progress report to player (and connected sync childs)."""
-        if not self.player.current_media or self._stopped:
-            return
-        progress = int(self.player.corrected_elapsed_time or 0)
-        await self.send_cli_command(f"PROGRESS={progress}\n")
diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py
new file mode 100644 (file)
index 0000000..5e89793
--- /dev/null
@@ -0,0 +1,418 @@
+"""Unified AirPlay/RAOP stream session logic for AirPlay devices."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import time
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+from typing import TYPE_CHECKING
+
+from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.helpers.ffmpeg import FFMpeg
+from music_assistant.helpers.util import TaskManager, close_async_generator
+from music_assistant.providers.airplay.helpers import unix_time_to_ntp
+
+from .constants import CONF_ENABLE_LATE_JOIN, StreamingProtocol
+from .protocols.airplay2 import AirPlay2Stream
+from .protocols.raop import RaopStream
+
+if TYPE_CHECKING:
+    from music_assistant_models.media_items import AudioFormat
+    from music_assistant_models.player import PlayerMedia
+
+    from .player import AirPlayPlayer
+    from .provider import AirPlayProvider
+
+
+class AirPlayStreamSession:
+    """Stream session (RAOP or AirPlay2) to one or more players."""
+
+    def __init__(
+        self,
+        airplay_provider: AirPlayProvider,
+        sync_clients: list[AirPlayPlayer],
+        pcm_format: AudioFormat,
+        audio_source: AsyncGenerator[bytes, None],
+    ) -> None:
+        """Initialize AirPlayStreamSession.
+
+        Args:
+            airplay_provider: The AirPlay provider instance
+            sync_clients: List of AirPlay players to stream to
+            pcm_format: PCM format of the input stream
+            audio_source: Async generator yielding audio chunks
+        """
+        assert sync_clients
+        self.prov = airplay_provider
+        self.mass = airplay_provider.mass
+        self.pcm_format = pcm_format
+        self.sync_clients = sync_clients
+        self._audio_source = audio_source
+        self._audio_source_task: asyncio.Task[None] | None = None
+        self._player_ffmpeg: dict[str, FFMpeg] = {}
+        self._player_start_chunk: dict[str, int] = {}  # Chunk number when player joined
+        self._lock = asyncio.Lock()
+        self.start_ntp: int = 0
+        self.start_time: float = 0.0
+        self.chunks_streamed: int = 0  # Total chunks sent to session (each chunk = 1 second)
+
+    async def start(self) -> None:
+        """Initialize stream session for all players."""
+        # Get current NTP timestamp and calculate wait time
+        cur_time = time.time()
+        wait_start = 1750 + (250 * len(self.sync_clients))  # in milliseconds
+        wait_start_seconds = wait_start / 1000
+        self.wait_start = wait_start_seconds  # in seconds
+        self.start_time = cur_time + wait_start_seconds
+        self.start_ntp = unix_time_to_ntp(self.start_time)
+
+        self.prov.logger.info(
+            "Starting stream session with %d clients",
+            len(self.sync_clients),
+        )
+
+        async def _start_client(airplay_player: AirPlayPlayer) -> None:
+            """Start stream for a single client."""
+            # Stop existing stream if running
+            if airplay_player.stream and airplay_player.stream.running:
+                await airplay_player.stream.stop()
+            if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+                await ffmpeg.close()
+                del ffmpeg
+
+            self._player_start_chunk[airplay_player.player_id] = 1
+
+            # Create appropriate stream type based on protocol
+            if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
+                airplay_player.stream = AirPlay2Stream(self, airplay_player)
+            else:
+                airplay_player.stream = RaopStream(self, airplay_player)
+
+            # create optional FFMpeg instance per player if needed
+            # this is used to do any optional DSP processing/filtering
+            filter_params = get_player_filter_params(
+                self.mass,
+                airplay_player.player_id,
+                self.pcm_format,
+                airplay_player.stream.pcm_format,
+            )
+            if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
+                ffmpeg = FFMpeg(
+                    audio_input="-",
+                    input_format=self.pcm_format,
+                    output_format=airplay_player.stream.pcm_format,
+                    filter_params=filter_params,
+                )
+                await ffmpeg.start()
+                self._player_ffmpeg[airplay_player.player_id] = ffmpeg
+
+            await airplay_player.stream.start(self.start_ntp)
+
+            # Tracking will be initialized on first write
+
+        async with TaskManager(self.mass) as tm:
+            for _airplay_player in self.sync_clients:
+                tm.create_task(_start_client(_airplay_player))
+        # Start audio source streamer task
+        # this will read from the audio source and distribute to all players
+        self._audio_source_task = asyncio.create_task(self._audio_streamer())
+
+    async def stop(self) -> None:
+        """Stop playback and cleanup."""
+        if self._audio_source_task and not self._audio_source_task.done():
+            self._audio_source_task.cancel()
+            with suppress(asyncio.CancelledError):
+                await self._audio_source_task
+        await asyncio.gather(
+            *[self.remove_client(x) for x in self.sync_clients],
+            return_exceptions=True,
+        )
+
+    async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
+        """Remove a sync client from the session."""
+        if airplay_player not in self.sync_clients:
+            return
+        assert airplay_player.stream
+        assert airplay_player.stream.session == self
+        async with self._lock:
+            self.sync_clients.remove(airplay_player)
+        if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+            await ffmpeg.close()
+            del ffmpeg
+        # Clean up player tracking
+        self._player_start_chunk.pop(airplay_player.player_id, None)
+        await airplay_player.stream.stop()
+        airplay_player.stream = None
+        # If this was the last client, stop the session
+        if not self.sync_clients:
+            await self.stop()
+            return
+
+    async def add_client(self, airplay_player: AirPlayPlayer) -> None:
+        """Add a sync client to the session as a late joiner.
+
+        The late joiner will:
+        1. Start playing at a compensated NTP timestamp (start_ntp + offset)
+        2. Receive silence calculated dynamically based on how much audio has been sent
+        3. Then receive real audio chunks in sync with other players
+        """
+        sync_leader = self.sync_clients[0]
+        if not sync_leader.stream or not sync_leader.stream.running:
+            return
+
+        allow_late_join = self.prov.config.get_value(CONF_ENABLE_LATE_JOIN, False)
+        if not allow_late_join:
+            # Late joining is not allowed - restart the session for all players
+            await self.stop()  # we need to stop the current session to add a new client
+            # this could potentially be called by multiple players at the exact same time
+            # so we debounce the resync a bit here with a timer
+            if sync_leader.current_media:
+                self.mass.call_later(
+                    0.5,
+                    self.mass.players.cmd_resume(sync_leader.player_id),
+                    task_id=f"resync_session_{sync_leader.player_id}",
+                )
+
+        # Stop existing stream if the player is already streaming
+        if airplay_player.stream and airplay_player.stream.running:
+            await airplay_player.stream.stop()
+
+        # Clean up any existing FFmpeg instance for this player
+        if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+            await ffmpeg.close()
+            del ffmpeg
+
+        # Create appropriate stream type based on protocol
+        if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
+            airplay_player.stream = AirPlay2Stream(self, airplay_player)
+        else:
+            airplay_player.stream = RaopStream(self, airplay_player)
+
+        # Create optional FFMpeg instance per player if needed
+        filter_params = get_player_filter_params(
+            self.mass,
+            airplay_player.player_id,
+            self.pcm_format,
+            airplay_player.stream.pcm_format,
+        )
+        if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
+            ffmpeg = FFMpeg(
+                audio_input="-",
+                input_format=self.pcm_format,
+                output_format=airplay_player.stream.pcm_format,
+                filter_params=filter_params,
+            )
+            await ffmpeg.start()
+            self._player_ffmpeg[airplay_player.player_id] = ffmpeg
+
+        # Snapshot chunks_streamed inside lock to prevent race conditions
+        # Keep lock held during stream.start() to ensure player doesn't miss any chunks
+        async with self._lock:
+            # Calculate skip_seconds based on how many chunks have been sent
+            skip_seconds = self.chunks_streamed
+
+            # Add player to sync clients list
+            if airplay_player not in self.sync_clients:
+                self.sync_clients.append(airplay_player)
+
+            await airplay_player.stream.start(self.start_ntp, skip_seconds)
+
+    async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
+        """Replace the audio source of the stream."""
+        # Cancel the current audio source task
+        assert self._audio_source_task  # for type checker
+        self._audio_source_task.cancel()
+        with suppress(asyncio.CancelledError):
+            await self._audio_source_task
+        # Set new audio source and restart the stream
+        self._audio_source = audio_source
+        self._audio_source_task = asyncio.create_task(self._audio_streamer())
+        # Restart the (player-specific) ffmpeg stream for all players
+        # This is the easiest way to ensure the new audio source is used
+        # as quickly as possible, without waiting for the buffers to be drained
+        # It also allows changing the player settings such as DSP on the fly
+        # for sync_client in self.sync_clients:
+        #     if not sync_client.stream:
+        #         continue  # guard
+        #     sync_client.stream.start_ffmpeg_stream()
+
+    async def _audio_streamer(self) -> None:
+        """Stream audio to all players."""
+        generator_exhausted = False
+        _last_metadata: str | None = None
+        try:
+            # each chunk is exactly one second of audio data based on the pcm format.
+            async for chunk in self._audio_source:
+                async with self._lock:
+                    sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
+                    if not sync_clients:
+                        self.prov.logger.error(
+                            "!!! AUDIO STREAMER EXITING: No running clients left! "
+                            "Total sync_clients: %d, Details: %s",
+                            len(self.sync_clients),
+                            [
+                                (x.player_id, x.stream.running if x.stream else None)
+                                for x in self.sync_clients
+                            ],
+                        )
+                        return
+
+                    # Write to all players with a timeout (10 seconds)
+                    # Timeout must account for player's internal latency buffer (1-4 seconds)
+                    # The player may legitimately not accept data while draining its buffer
+                    write_start = time.time()
+                    write_tasks = [
+                        asyncio.wait_for(self._write_chunk_to_player(x, chunk), timeout=10.0)
+                        for x in sync_clients
+                        if x.stream
+                    ]
+                    results = await asyncio.gather(*write_tasks, return_exceptions=True)
+                    write_elapsed = time.time() - write_start
+
+                    # Check for write errors or timeouts
+                    players_to_remove = []
+                    for i, result in enumerate(results):
+                        if i >= len(sync_clients):
+                            continue
+                        player = sync_clients[i]
+
+                        if isinstance(result, asyncio.TimeoutError):
+                            self.prov.logger.error(
+                                "!!! TIMEOUT writing chunk %d to player %s - "
+                                "REMOVING from sync group! Total write time=%.3fs",
+                                self.chunks_streamed,
+                                player.player_id,
+                                write_elapsed,
+                            )
+                            players_to_remove.append(player)
+                        elif isinstance(result, Exception):
+                            self.prov.logger.error(
+                                "!!! Error writing chunk %d to player %s: %s - "
+                                "REMOVING from sync group! Total write time=%.3fs",
+                                self.chunks_streamed,
+                                player.player_id,
+                                result,
+                                write_elapsed,
+                            )
+                            players_to_remove.append(player)
+
+                    # Remove failed/timed-out players from sync group
+                    for player in players_to_remove:
+                        if player in self.sync_clients:
+                            self.sync_clients.remove(player)
+                            self.prov.logger.warning(
+                                "Player %s removed from sync group due to write failure/timeout",
+                                player.player_id,
+                            )
+                            # Stop the player's stream
+                            if player.stream:
+                                self.mass.create_task(player.stream.stop())
+
+                    # Update chunk counter (each chunk is exactly one second of audio)
+                    self.chunks_streamed += 1
+
+                # send metadata if changed
+                # do this in a separate task to not disturb audio streaming
+                # NOTE: we should probably move this out of the audio stream task into it's own task
+                if (
+                    self.sync_clients
+                    and (_leader := self.sync_clients[0])
+                    and (_leader.corrected_elapsed_time or 0) > 2
+                    and (metadata := _leader.current_media) is not None
+                ):
+                    now = time.time()
+                    metadata_checksum = f"{metadata.uri}.{metadata.title}.{metadata.image_url}"
+                    progress = metadata.corrected_elapsed_time or 0
+                    if _last_metadata != metadata_checksum:
+                        _last_metadata = metadata_checksum
+                        prev_progress_report = now
+                        self.mass.create_task(self._send_metadata(progress, metadata))
+                    # send the progress report every 5 seconds
+                    elif now - prev_progress_report >= 5:
+                        prev_progress_report = now
+                        self.mass.create_task(self._send_metadata(progress, None))
+            # Entire stream consumed: send EOF
+            generator_exhausted = True
+            async with self._lock:
+                await asyncio.gather(
+                    *[
+                        self._write_eof_to_player(x)
+                        for x in self.sync_clients
+                        if x.stream and x.stream.running
+                    ],
+                    return_exceptions=True,
+                )
+        except Exception as err:
+            logger = self.prov.logger
+            logger.error(
+                "Stream error: %s",
+                str(err) or err.__class__.__name__,
+                exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
+            )
+            raise
+        finally:
+            if not generator_exhausted:
+                await close_async_generator(self._audio_source)
+
+    async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
+        """
+        Write audio chunk to a specific player.
+
+        each chunk is exactly one second of audio data based on the pcm format.
+        For late joiners, compensates for chunks sent between join time and actual chunk delivery.
+        Blocks (async) until the data has been written.
+        """
+        write_start = time.time()
+        chunk_number = self.chunks_streamed + 1
+        player_id = airplay_player.player_id
+
+        # Calculate chunk offset based on actual time vs start time
+        self._player_start_chunk.pop(player_id, None)
+
+        # if the player has an associated FFMpeg instance, use that first
+        if ffmpeg := self._player_ffmpeg.get(player_id):
+            await ffmpeg.write(chunk)
+            chunk_to_send = await ffmpeg.read(len(chunk))
+        else:
+            chunk_to_send = chunk
+
+        assert airplay_player.stream
+        stream_write_start = time.time()
+        await airplay_player.stream.write_chunk(chunk_to_send)
+        stream_write_elapsed = time.time() - stream_write_start
+
+        total_elapsed = time.time() - write_start
+
+        # Log only truly abnormal writes (>5s indicates a real stall)
+        # Can take up to ~4s if player's latency buffer is being drained
+        if total_elapsed > 5.0:
+            self.prov.logger.error(
+                "!!! STALLED WRITE: Player %s chunk %d took %.3fs total (stream write: %.3fs)",
+                player_id,
+                chunk_number,
+                total_elapsed,
+                stream_write_elapsed,
+            )
+
+    async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
+        """Write EOF to a specific player."""
+        # cleanup any associated FFMpeg instance first
+        if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+            await ffmpeg.write_eof()
+            await ffmpeg.close()
+        assert airplay_player.stream
+        await airplay_player.stream.write_eof()
+
+    async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
+        """Send metadata to all players."""
+        async with self._lock:
+            await asyncio.gather(
+                *[
+                    x.stream.send_metadata(progress, metadata)
+                    for x in self.sync_clients
+                    if x.stream and x.stream.running
+                ],
+                return_exceptions=True,
+            )