Replace Airplay provider (#1084)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 18 Feb 2024 01:09:30 +0000 (02:09 +0100)
committerGitHub <noreply@github.com>
Sun, 18 Feb 2024 01:09:30 +0000 (02:09 +0100)
15 files changed:
music_assistant/common/models/media_items.py
music_assistant/server/controllers/players.py
music_assistant/server/helpers/process.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 [new file with mode: 0755]
music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 [new file with mode: 0755]
music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 [new file with mode: 0755]
music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-arm64-static [deleted file]
music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-x86_64-static [deleted file]
music_assistant/server/providers/airplay/bin/squeeze2raop-linux-aarch64-static [deleted file]
music_assistant/server/providers/airplay/bin/squeeze2raop-linux-arm-static [deleted file]
music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86-static [deleted file]
music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86_64-static [deleted file]
music_assistant/server/providers/airplay/manifest.json
pyproject.toml

index 5085916fd0a3a40007e069718072fcd86a3cbe8a..c8b08a8736129a89f25ca2e307a55dcdfbe845e6 100644 (file)
@@ -9,11 +9,7 @@ from typing import Any, Self
 from mashumaro import DataClassDictMixin
 
 from music_assistant.common.helpers.uri import create_uri
-from music_assistant.common.helpers.util import (
-    create_sort_name,
-    is_valid_uuid,
-    merge_lists,
-)
+from music_assistant.common.helpers.util import create_sort_name, is_valid_uuid, merge_lists
 from music_assistant.common.models.enums import (
     AlbumType,
     ContentType,
@@ -355,6 +351,11 @@ class Track(MediaItem):
             return self.album.image
         return super().image
 
+    @property
+    def artist_str(self) -> str:
+        """Return (combined) artist string for track."""
+        return "/".join(x.name for x in self.artists)
+
 
 @dataclass(kw_only=True)
 class AlbumTrack(Track):
index 671f06bc357b6abcc25d19e713a4527cd384d7e3..04c7ebdd06e7a2760766915b45997270f7bd9d3f 100644 (file)
@@ -401,15 +401,6 @@ class PlayerController(CoreController):
         if player.powered == powered:
             return  # nothing to do
 
-        # inform (active) group player if needed
-        # NOTE: this must be on the top to prevent race conditions
-        if active_group_player := self._get_active_player_group(player):
-            if active_group_player.player_id.startswith(SYNCGROUP_PREFIX):
-                self._on_syncgroup_child_power(
-                    active_group_player.player_id, player.player_id, powered
-                )
-            elif player_prov := self.get_player_provider(active_group_player.player_id):
-                player_prov.on_child_power(active_group_player.player_id, player.player_id, powered)
         # stop player at power off
         if (
             not powered
@@ -437,8 +428,16 @@ class PlayerController(CoreController):
         # as fast as possible and prevent race conditions
         player.powered = powered
         self.update(player_id)
+        # handle actions when a syncgroup child turns on
+        if active_group_player := self._get_active_player_group(player):
+            if active_group_player.player_id.startswith(SYNCGROUP_PREFIX):
+                self._on_syncgroup_child_power(
+                    active_group_player.player_id, player.player_id, powered
+                )
+            elif player_prov := self.get_player_provider(active_group_player.player_id):
+                player_prov.on_child_power(active_group_player.player_id, player.player_id, powered)
         # handle 'auto play on power on'  feature
-        if (
+        elif (
             powered
             and self.mass.config.get_raw_player_config_value(player_id, CONF_AUTO_PLAY, False)
             and player.active_source in (None, player_id)
@@ -899,6 +898,16 @@ class PlayerController(CoreController):
                 continue
             elif child_player.group_childs:
                 return child_player
+        # select new sync leader: return the first playing player
+        for child_player in self.iter_group_members(
+            group_player, only_powered=True, only_playing=True
+        ):
+            return child_player
+        # fallback select new sync leader: return the first powered player
+        for child_player in self.iter_group_members(
+            group_player, only_powered=True, only_playing=False
+        ):
+            return child_player
         return None
 
     async def _sync_syncgroup(self, player_id: str) -> None:
index 3c725296ffc45c60aed67b60d27a79afa57be59c..ea6af71e4cea667f37682bf809153cc158486caa 100644 (file)
@@ -42,6 +42,15 @@ class AsyncProcess:
 
     async def __aenter__(self) -> AsyncProcess:
         """Enter context manager."""
+        await self.start()
+        return self
+
+    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
+        """Exit context manager."""
+        await self.close()
+
+    async def start(self) -> None:
+        """Perform Async init of process."""
         self._proc = await asyncio.create_subprocess_exec(
             *self._args,
             stdin=asyncio.subprocess.PIPE if self._enable_stdin else None,
@@ -49,25 +58,6 @@ class AsyncProcess:
             stderr=asyncio.subprocess.PIPE if self._enable_stderr else None,
             close_fds=True,
         )
-        return self
-
-    async def __aexit__(self, exc_type, exc_value, traceback) -> bool:
-        """Exit context manager."""
-        self.closed = True
-        # make sure the process is cleaned up
-        self.write_eof()
-        if self._proc.returncode is None:
-            try:
-                async with asyncio.timeout(10):
-                    await self._proc.communicate()
-            except TimeoutError:
-                self._proc.kill()
-                await self._proc.communicate()
-        if self._proc.returncode is None:
-            self._proc.kill()
-        if self._attached_task and not self._attached_task.done():
-            with suppress(asyncio.CancelledError):
-                self._attached_task.cancel()
 
     async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]:
         """Yield chunks of n size from the process stdout."""
@@ -113,6 +103,8 @@ class AsyncProcess:
 
     def write_eof(self) -> None:
         """Write end of file to to process stdin."""
+        if not self._enable_stdin:
+            return
         if self.closed or self._proc.stdin.is_closing():
             return
         try:
@@ -128,6 +120,30 @@ class AsyncProcess:
             # already exited, race condition
             pass
 
+    async def close(self) -> None:
+        """Close/terminate the process."""
+        self.closed = True
+        if self._attached_task and not self._attached_task.done():
+            with suppress(asyncio.CancelledError):
+                self._attached_task.cancel()
+        # make sure the process is cleaned up
+        self.write_eof()
+        if self._proc.returncode is None:
+            try:
+                async with asyncio.timeout(10):
+                    await self._proc.communicate()
+            except TimeoutError:
+                self._proc.kill()
+        await self.wait()
+
+    async def wait(self) -> int:
+        """Wait for the process and return the returncode."""
+        if self._proc.returncode is not None:
+            return self._proc.returncode
+        exitcode = await self._proc.wait()
+        self.closed = True
+        return exitcode
+
     async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]:
         """Write bytes to process and read back results."""
         return await self._proc.communicate(input_data)
index 2a82348c94b9d0a4ce95ed323d56ff40fa1602f5..a273870651380a4ce383fb4b43dc07a69702836a 100644 (file)
@@ -1,52 +1,68 @@
-"""Airplay Player provider.
-
-This is more like a "virtual" player provider, running on top of slimproto.
-It uses the amazing work of Philippe44 who created a bridge from airplay to slimproto.
-https://github.com/philippe44/LMS-Raop
-"""
-
+"""Airplay Player provider for Music Assistant."""
 from __future__ import annotations
 
 import asyncio
 import os
 import platform
-import xml.etree.ElementTree as ET  # noqa: N817
-from contextlib import suppress
-from typing import TYPE_CHECKING
+import socket
+import time
+from random import randint, randrange
+from typing import TYPE_CHECKING, cast
 
 import aiofiles
+from pyatv import connect, exceptions, interface, scan
+from pyatv.conf import AppleTV as ATVConf
+from pyatv.const import DeviceModel, DeviceState, PowerState, Protocol
+from pyatv.convert import model_str
+from pyatv.interface import AppleTV as AppleTVInterface
+from pyatv.interface import DeviceListener
+from pyatv.protocols.raop import RaopStream
+from zeroconf import ServiceInfo
 
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.helpers.datetime import utc
+from music_assistant.common.helpers.util import get_ip_pton
+from music_assistant.common.models.config_entries import (
+    CONF_ENTRY_CROSSFADE,
+    CONF_ENTRY_CROSSFADE_DURATION,
+    ConfigEntry,
+    ConfigValueType,
+)
+from music_assistant.common.models.enums import (
+    ConfigEntryType,
+    ContentType,
+    PlayerFeature,
+    PlayerState,
+    PlayerType,
+    ProviderFeature,
+)
+from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS
+from music_assistant.server.helpers.process import AsyncProcess, check_output
+from music_assistant.server.helpers.util import create_tempfile
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
-    from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
+    from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.common.models.queue_item import QueueItem
     from music_assistant.server import MusicAssistant
     from music_assistant.server.controllers.streams import MultiClientStreamJob
     from music_assistant.server.models import ProviderInstanceType
-    from music_assistant.server.providers.slimproto import SlimprotoProvider
-
 
+CONF_LATENCY = "latency"
+CONF_ENCRYPTION = "encryption"
+CONF_ALAC_ENCODE = "alac_encode"
+CONF_VOLUME_START = "volume_start"
+CONF_SYNC_ADJUST = "sync_adjust"
 PLAYER_CONFIG_ENTRIES = (
+    CONF_ENTRY_CROSSFADE,
+    CONF_ENTRY_CROSSFADE_DURATION,
     ConfigEntry(
-        key="airplay_header",
-        type=ConfigEntryType.DIVIDER,
-        label="Airplay specific settings",
-        description="Configure Airplay specific settings. "
-        "Note that changing any airplay specific setting, will reconnect all players.",
-        advanced=True,
-    ),
-    ConfigEntry(
-        key="read_ahead",
+        key=CONF_LATENCY,
         type=ConfigEntryType.INTEGER,
         range=(200, 3000),
-        default_value=1000,
-        label="Read ahead buffer",
+        default_value=1200,
+        label="Latency",
         description="Sets the number of milliseconds of audio buffer in the player. "
         "This is important to absorb network throughput jitter. "
         "Note that the resume after pause will be skipping that amount of time "
@@ -54,7 +70,7 @@ PLAYER_CONFIG_ENTRIES = (
         advanced=True,
     ),
     ConfigEntry(
-        key="encryption",
+        key=CONF_ENCRYPTION,
         type=ConfigEntryType.BOOLEAN,
         default_value=False,
         label="Enable encryption",
@@ -63,7 +79,7 @@ PLAYER_CONFIG_ENTRIES = (
         advanced=True,
     ),
     ConfigEntry(
-        key="alac_encode",
+        key=CONF_ALAC_ENCODE,
         type=ConfigEntryType.BOOLEAN,
         default_value=True,
         label="Enable compression",
@@ -72,27 +88,30 @@ PLAYER_CONFIG_ENTRIES = (
         advanced=True,
     ),
     ConfigEntry(
-        key="remove_timeout",
+        key=CONF_VOLUME_START,
+        type=ConfigEntryType.BOOLEAN,
+        default_value=False,
+        label="Send volume at playback start",
+        description="Some players require to send/confirm the volume when playback starts. \n"
+        "Enable this setting if the playback volume does not match the MA interface.",
+        advanced=True,
+    ),
+    ConfigEntry(
+        key=CONF_SYNC_ADJUST,
         type=ConfigEntryType.INTEGER,
+        range=(-500, 500),
         default_value=0,
-        range=(-1, 3600),
-        label="Remove timeout",
-        description="Player discovery is managed using mDNS protocol, "
-        "which means that a player sends regular keep-alive messages and a bye when "
-        "disconnecting. Some faulty mDNS stack implementations (e.g. Riva) do not always"
-        "send keep-alive messages, so the Airplay bridge is disconnecting them regularly. \n\n"
-        "As a workaround, a timer can be set so that the bridge does not immediately remove "
-        "the player from LMS when missing a keep-alive, waiting for it to reconnect. \n\n\n"
-        "A value of -1 will disable this feature and never remove the player. \n\n"
-        "A value of 0 (the default) disabled the player when keep-alive is missed or "
-        "when a bye message is received. \n\n"
-        "Any other value means to disable the player after missing keep-alive for "
-        "this number of seconds.",
+        label="Audio synchronization delay correction",
+        description="If this player is playing audio synced with other players "
+        "and you always hear the audio too early or late on this player, "
+        "you can shift the audio a bit.",
         advanced=True,
     ),
 )
+BACKOFF_TIME_LOWER_LIMIT = 15  # seconds
+BACKOFF_TIME_UPPER_LIMIT = 300  # Five minutes
 
-NEED_BRIDGE_RESTART = {"values/read_ahead", "values/encryption", "values/alac_encode", "enabled"}
+CONF_CREDENTIALS = "credentials"
 
 
 async def setup(
@@ -121,88 +140,505 @@ async def get_config_entries(
     return ()  # we do not have any config entries (yet)
 
 
+def convert_airplay_volume(value: float) -> int:
+    """Remap Airplay Volume to 0..100 scale."""
+    airplay_min = -30
+    airplay_max = 0
+    normal_min = 0
+    normal_max = 100
+    portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min)
+    return int(portion + normal_min)
+
+
+class AirPlayPlayer(DeviceListener):
+    """Holds the connection to the apyatv instance and the cliraop."""
+
+    def __init__(
+        self, mass: MusicAssistant, player_id: str, discovery_info: interface.BaseConfig
+    ) -> None:
+        """Initialize power manager."""
+        self.player_id = player_id
+        self.discovery_info = discovery_info
+        self.mass = mass
+        self.atv: AppleTVInterface | None = None
+        self.is_on = False
+        self.connected = False
+        self._connection_attempts = 0
+        self._connection_was_lost = False
+        self._task = None
+        self._playing: interface.Playing | None = None
+        self.logger = self.mass.players.logger.getChild("airplay").getChild(self.player_id)
+        self.cliraop_proc: AsyncProcess | None = None
+        self.active_remote_id = str(randint(1000, 8000))
+        self.optimistic_state: PlayerState = PlayerState.IDLE
+
+    def connection_lost(self, _):
+        """Device was unexpectedly disconnected.
+
+        This is a callback function from pyatv.interface.DeviceListener.
+        """
+        self.logger.warning('Connection lost to Apple TV "%s"', self.discovery_info.name)
+        self._connection_was_lost = True
+        self._handle_disconnect()
+
+    def connection_closed(self):
+        """Device connection was (intentionally) closed.
+
+        This is a callback function from pyatv.interface.DeviceListener.
+        """
+        self.connected = False
+        self._handle_disconnect()
+
+    def _handle_disconnect(self):
+        """Handle that the device disconnected and restart connect loop."""
+        self.connected = False
+        if self.atv:
+            self.atv.close()
+            self.atv = None
+        self._start_connect_loop()
+
+    async def connect(self):
+        """Connect to device."""
+        self.is_on = True
+        if self.connected:
+            return
+        self._start_connect_loop()
+        await asyncio.sleep(2)
+
+    async def disconnect(self):
+        """Disconnect from device."""
+        self.logger.debug("Disconnecting from device")
+        self.is_on = False
+        self.connected = False
+        try:
+            if self.atv:
+                self.atv.close()
+                self.atv = None
+            if self._task:
+                self._task.cancel()
+                self._task = None
+        except Exception:  # pylint: disable=broad-except
+            self.logger.exception("An error occurred while disconnecting")
+
+    async def stop(self):
+        """Stop playback and cleanup any running CLIRaop Process."""
+        if self.cliraop_proc and not self.cliraop_proc.closed:
+            # prefer interactive command to our streamer
+            await self.send_cli_command("ACTION=STOP")
+            await self.cliraop_proc.wait()
+            self.optimistic_state = PlayerState.IDLE
+            self.update_attributes()
+        elif atv := self.atv:
+            await atv.remote_control.stop()
+
+    async def send_cli_command(self, command: str) -> None:
+        """Send an interactive command to the running CLIRaop binary."""
+        if not self.cliraop_proc or self.cliraop_proc.closed:
+            return
+
+        named_pipe = f"/tmp/fifo-{self.active_remote_id}"  # noqa: S108
+        if not command.endswith("\n"):
+            command += "\n"
+
+        def send_data():
+            with open(named_pipe, "w") as f:
+                f.write(command)
+                f.flush()
+
+        self.logger.debug("sending command %s", command)
+        await self.mass.create_task(send_data)
+
+    def _start_connect_loop(self):
+        """Start background connect loop to device."""
+        if not self._task and self.atv is None and self.is_on:
+            self._task = asyncio.create_task(self._connect_loop())
+        else:
+            self.logger.debug("Not starting connect loop (%s, %s)", self.atv is None, self.is_on)
+
+    async def connect_once(self) -> None:
+        """Try to connect once."""
+        try:
+            if conf := await self._scan():
+                await self._connect(conf)
+        except exceptions.AuthenticationError:
+            await self.disconnect()
+            self.logger.exception(
+                "Authentication failed for %s, try reconfiguring device",
+                self.discovery_info.name,
+            )
+            return
+        except asyncio.CancelledError:
+            pass
+        except Exception:  # pylint: disable=broad-except
+            self.logger.exception("Failed to connect")
+            self.atv = None
+
+    async def _connect_loop(self):
+        """Connect loop background task function."""
+        self.logger.debug("Starting connect loop")
+
+        # Try to find device and connect as long as the user has said that
+        # we are allowed to connect and we are not already connected.
+        while self.is_on and self.atv is None:
+            await self.connect_once()
+            if self.atv is not None:
+                break
+            self._connection_attempts += 1
+            backoff = min(
+                max(
+                    BACKOFF_TIME_LOWER_LIMIT,
+                    randrange(2**self._connection_attempts),
+                ),
+                BACKOFF_TIME_UPPER_LIMIT,
+            )
+
+            self.logger.debug("Reconnecting in %d seconds", backoff)
+            await asyncio.sleep(backoff)
+
+        self.logger.debug("Connect loop ended")
+        self._task = None
+
+    async def _scan(self) -> ATVConf | None:
+        """Try to find device by scanning for it."""
+        address: str = self.discovery_info.address
+
+        self.logger.debug("Discovering device %s", self.discovery_info.name)
+        atvs = await scan(
+            self.mass.loop,
+            identifier=self.discovery_info.identifier,
+            hosts=[address],
+        )
+        if atvs:
+            return cast(ATVConf, atvs[0])
+
+        self.logger.debug(
+            "Failed to find device %s with address %s",
+            self.discovery_info.name,
+            address,
+        )
+        # We no longer multicast scan for the device since as soon as async_step_zeroconf runs,
+        # it will update the address and reload the config entry when the device is found.
+        return None
+
+    async def _connect(self, conf: ATVConf) -> None:
+        """Connect to device."""
+        credentials: dict[int, str | None] = self.mass.config.get_raw_player_config_value(
+            self.player_id, CONF_CREDENTIALS, {}
+        )
+        name: str = self.discovery_info.name
+        missing_protocols = []
+        for protocol_int, creds in credentials.items():
+            protocol = Protocol(int(protocol_int))
+            if conf.get_service(protocol) is not None:
+                conf.set_credentials(protocol, creds)  # type: ignore[arg-type]
+            else:
+                missing_protocols.append(protocol.name)
+
+        if missing_protocols:
+            missing_protocols_str = ", ".join(missing_protocols)
+            self.logger.info(
+                "Protocol(s) %s not yet found for %s, trying later",
+                missing_protocols_str,
+                name,
+            )
+            return
+
+        self.logger.debug("Connecting to device %s", name)
+        session = self.mass.http_session
+
+        self.atv = await connect(conf, self.mass.loop, session=session)
+        self.connected = True
+        self.atv.power.listener = self
+        self.atv.listener = self
+        self.atv.audio.listener = self
+        if self.atv.features.in_state(
+            interface.FeatureState.Available, interface.FeatureName.PushUpdates
+        ):
+            self.atv.push_updater.listener = self
+            self.atv.push_updater.start()
+
+        self.address_updated(str(conf.address))
+
+        self._setup_device()
+        self.update_attributes()
+
+        self._connection_attempts = 0
+        if self._connection_was_lost:
+            self.logger.info(
+                'Connection was re-established to device "%s"',
+                name,
+            )
+            self._connection_was_lost = False
+
+    def _setup_device(self):
+        if not (mass_player := self.mass.players.get(self.player_id)):
+            mass_player = Player(
+                player_id=self.player_id,
+                provider="airplay",
+                type=PlayerType.PLAYER,
+                name=self.discovery_info.name,
+                available=True,
+                powered=False,
+                device_info=DeviceInfo(
+                    model=self.discovery_info.device_info.raw_model,
+                    manufacturer="Apple",
+                    address=str(self.discovery_info.address),
+                ),
+                supported_features=(
+                    PlayerFeature.PAUSE,
+                    PlayerFeature.SYNC,
+                    PlayerFeature.VOLUME_SET,
+                    PlayerFeature.POWER,
+                ),
+                max_sample_rate=44100,
+                supports_24bit=False,
+            )
+        if self.atv:
+            dev_info = self.atv.device_info
+            mass_player.device_info = DeviceInfo(
+                model=(
+                    dev_info.raw_model
+                    if dev_info.model == DeviceModel.Unknown and dev_info.raw_model
+                    else model_str(dev_info.model)
+                ),
+                manufacturer="Apple",
+                address=str(self.discovery_info.address),
+            )
+        self.mass.players.register_or_update(mass_player)
+
+    def playstatus_update(self, _, playstatus: interface.Playing) -> None:
+        """Inform about changes to what is currently playing."""
+        self.logger.debug("Playstatus received: %s", playstatus)
+        self._playing = playstatus
+        self.update_attributes()
+
+    def playstatus_error(self, updater, exception: Exception) -> None:
+        """Inform about an error when updating play status."""
+        self.logger.debug("Playstatus error received", exc_info=exception)
+        self._playing = None
+        self.update_attributes()
+
+    def powerstate_update(self, old_state: PowerState, new_state: PowerState) -> None:
+        """Update power state when it changes."""
+        self.update_attributes()
+
+    def volume_update(self, old_level: float, new_level: float) -> None:
+        """Update volume when it changes."""
+        self.update_attributes()
+
+    def update_attributes(self) -> None:
+        """Update the player attributes."""
+        mass_player = self.mass.players.get(self.player_id)
+        mass_player.volume_level = int(self.atv.audio.volume)
+        mass_player.powered = self.is_on
+        if self.cliraop_proc and not self.cliraop_proc.closed:
+            mass_player.state = self.optimistic_state
+            # NOTE: alapsed time is pushed from cliraop
+        elif self.atv is None or not self.connected:
+            mass_player.powered = False
+            mass_player.state = PlayerState.IDLE
+        elif self._playing:
+            state = self._playing.device_state
+            if state in (DeviceState.Idle, DeviceState.Loading):
+                mass_player.state = PlayerState.IDLE
+            elif state == DeviceState.Playing:
+                mass_player.state = PlayerState.PLAYING
+            elif state in (DeviceState.Paused, DeviceState.Seeking, DeviceState.Stopped):
+                mass_player.state = PlayerState.PAUSED
+            else:
+                mass_player.state = PlayerState.IDLE
+            mass_player.elapsed_time = self._playing.position or 0
+            mass_player.elapsed_time_last_updated = time.time()
+            mass_player.current_item_id = self._playing.content_identifier
+        else:
+            mass_player.state = PlayerState.IDLE
+        self.mass.players.update(self.player_id)
+
+    @property
+    def is_connecting(self):
+        """Return true if connection is in progress."""
+        return self._task is not None
+
+    def address_updated(self, address):
+        """Update cached address in config entry."""
+        self.logger.debug("Changing address to %s", address)
+        self._setup_device()
+
+
 class AirplayProvider(PlayerProvider):
-    """Player provider for Airplay based players, using the slimproto bridge."""
+    """Player provider for Airplay based players."""
 
-    _bridge_bin: str | None = None
-    _bridge_proc: asyncio.subprocess.Process | None = None
-    _timer_handle: asyncio.TimerHandle | None = None
-    _closing: bool = False
-    _config_file: str | None = None
-    _log_reader_task: asyncio.Task | None = None
-    _removed_players: set[str] | None = None
+    _atv_players: dict[str, AirPlayPlayer]
+    _discovery_running: bool = False
+    _cliraop_bin: str | None = None
+    _stream_tasks: dict[str, asyncio.Task]
 
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
         """Return the features supported by this Provider."""
-        # for now do not allow creation of airplay groups
-        # in preparation of new airplay provider coming up soon
-        # return (ProviderFeature.SYNC_PLAYERS,)
-        return ()
+        return (ProviderFeature.SYNC_PLAYERS,)
 
     async def handle_setup(self) -> None:
         """Handle async initialization of the provider."""
-        self._removed_players = set()
-        self._config_file = os.path.join(self.mass.storage_path, "airplay_bridge.xml")
-        # locate the raopbridge binary (will raise if that fails)
-        self._bridge_bin = await self._get_bridge_binary()
-        # make sure that slimproto provider is loaded
-        slimproto_prov: SlimprotoProvider = self.mass.get_provider("slimproto")
-        assert slimproto_prov, "This provider depends on the SlimProto provider."
-        # register as virtual provider on slimproto provider
-        slimproto_prov.register_virtual_provider(
-            "RaopBridge",
-            self._handle_player_register_callback,
-            self._handle_player_update_callback,
+        self._atv_players = {}
+        self._stream_tasks = {}
+        self._cliraop_bin = await self.get_cliraop_binary()
+        self.mass.create_task(self._run_discovery())
+        dacp_port = 49831
+        self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}"
+        self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
+        await asyncio.start_server(self._handle_dacp_request, "0.0.0.0", dacp_port)
+        zeroconf_type = "_dacp._tcp.local."
+        server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}"
+        info = ServiceInfo(
+            zeroconf_type,
+            name=server_id,
+            addresses=[await get_ip_pton(self.mass.streams.publish_ip)],
+            port=dacp_port,
+            properties={
+                "txtvers": "1",
+                "Ver": "63B5E5C0C201542E",
+                "DbId": "63B5E5C0C201542E",
+                "OSsi": "0x1F5",
+            },
+            server=f"{socket.gethostname()}.local",
         )
-        await self._check_config_xml()
-        # start running the bridge
-        asyncio.create_task(self._bridge_process_runner(slimproto_prov))
+        await self.mass.zeroconf.async_register_service(info)
+
+    async def _handle_dacp_request(  # noqa: PLR0915
+        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+    ) -> None:
+        """Handle new connection on the socket."""
+        try:
+            raw_request = b""
+            while recv := await reader.read(1024):
+                raw_request += recv
+                if len(recv) < 1024:
+                    break
+
+            request = raw_request.decode("UTF-8")
+            headers_raw, body = request.split("\r\n\r\n", 1)
+            headers_raw = headers_raw.split("\r\n")
+            headers = {}
+            for line in headers_raw[1:]:
+                x, y = line.split(":", 1)
+                headers[x.strip()] = y.strip()
+            active_remote = headers.get("Active-Remote")
+            _, path, _ = headers_raw[0].split(" ")
+            atv_player = next(
+                (x for x in self._atv_players.values() if x.active_remote_id == active_remote), None
+            )
+            self.logger.debug(
+                "DACP request for %s (%s): %s -- %s",
+                atv_player.discovery_info.name if atv_player else "UNKNOWN PLAYER",
+                active_remote,
+                path,
+                body,
+            )
+            if not atv_player:
+                return
+
+            player_id = atv_player.player_id
+            if path == "/ctrl-int/1/nextitem":
+                self.mass.create_task(self.mass.player_queues.next(player_id))
+            elif path == "/ctrl-int/1/previtem":
+                self.mass.create_task(self.mass.player_queues.previous(player_id))
+            elif path == "/ctrl-int/1/play":
+                self.mass.create_task(self.mass.player_queues.play(player_id))
+            elif path == "/ctrl-int/1/playpause":
+                self.mass.create_task(self.mass.player_queues.play_pause(player_id))
+            elif path == "/ctrl-int/1/stop":
+                self.mass.create_task(self.cmd_stop(player_id))
+            elif path == "/ctrl-int/1/volumeup":
+                self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
+            elif path == "/ctrl-int/1/volumedown":
+                self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
+            elif path == "/ctrl-int/1/shuffle_songs":
+                queue = self.mass.player_queues.get(player_id)
+                self.mass.create_task(
+                    self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled)
+                )
+            elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
+                self.mass.create_task(self.mass.player_queues.pause(player_id))
+            elif "dmcp.device-volume=" in path:
+                raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
+                volume = convert_airplay_volume(raop_volume)
+                if abs(volume - int(atv_player.atv.audio.volume)) > 2:
+                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
+            elif "dmcp.volume=" in path:
+                volume = int(path.split("dmcp.volume=", 1)[-1])
+                if abs(volume - int(atv_player.atv.audio.volume)) > 2:
+                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
+            else:
+                self.logger.warning(
+                    "Unknown DACP request for %s: %s",
+                    atv_player.discovery_info.name,
+                    path,
+                )
 
-    async def unload(self) -> None:
-        """Handle close/cleanup of the provider."""
-        self._closing = True
-        if slimproto_prov := self.mass.get_provider("slimproto"):
-            slimproto_prov.unregister_virtual_provider("RaopBridge")
-        await self._stop_bridge()
+            # send response
+            date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
+            response = (
+                f"HTTP/1.0 204 No Content\r\nDate: {date_str} "
+                "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: "
+                "application/x-dmap-tagged\r\nContent-Length: 0\r\n"
+                "Connection: close\r\n\r\n"
+            )
+            writer.write(response.encode())
+            await writer.drain()
+        finally:
+            writer.close()
 
     async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
-        slimproto_prov = self.mass.get_provider("slimproto")
-        base_entries = await slimproto_prov.get_player_config_entries(player_id)
-        return base_entries + PLAYER_CONFIG_ENTRIES
-
-    def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
-        """Call (by config manager) when the configuration of a player changes."""
-        super().on_player_config_changed(config, changed_keys)
-        # forward to slimproto too
-        slimproto_prov = self.mass.get_provider("slimproto")
-        slimproto_prov.on_player_config_changed(config, changed_keys)
-
-        async def update_config() -> None:
-            # stop bridge (it will be auto restarted)
-            if changed_keys.intersection(NEED_BRIDGE_RESTART):
-                self.restart_bridge()
-
-        asyncio.create_task(update_config())
-
-    def on_player_config_removed(self, player_id: str) -> None:
-        """Call (by config manager) when the configuration of a player is removed."""
-        super().on_player_config_removed(player_id)
-        self._removed_players.add(player_id)
-        self.restart_bridge()
+        entries = await super().get_player_config_entries(player_id)
+        return entries + PLAYER_CONFIG_ENTRIES
 
     async def cmd_stop(self, player_id: str) -> None:
-        """Send STOP command to given player."""
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_stop(player_id)
+        """Send STOP command to given player.
+
+        - player_id: player_id of the player to handle the command.
+        """
+        if stream_task := self._stream_tasks.pop(player_id, None):
+            if not stream_task.done():
+                stream_task.cancel()
+
+        # forward command to player and any connected sync members
+        async with asyncio.TaskGroup() as tg:
+            for atv_player in self._get_sync_clients(player_id):
+                tg.create_task(atv_player.stop())
 
     async def cmd_play(self, player_id: str) -> None:
-        """Send PLAY command to given player."""
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_play(player_id)
+        """Send PLAY (unpause) command to given player.
+
+        - player_id: player_id of the player to handle the command.
+        """
+        # forward command to player and any connected sync members
+        async with asyncio.TaskGroup() as tg:
+            for atv_player in self._get_sync_clients(player_id):
+                if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+                    # prefer interactive command to our streamer
+                    tg.create_task(atv_player.send_cli_command("ACTION=PLAY"))
+                    atv_player.optimistic_state = PlayerState.PLAYING
+                    atv_player.update_attributes()
+                elif atv := atv_player.atv:
+                    tg.create_task(atv.remote_control.play())
+
+    async def cmd_pause(self, player_id: str) -> None:
+        """Send PAUSE command to given player.
+
+        - player_id: player_id of the player to handle the command.
+        """
+        # forward command to player and any connected sync members
+        async with asyncio.TaskGroup() as tg:
+            for atv_player in self._get_sync_clients(player_id):
+                if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+                    # prefer interactive command to our streamer
+                    tg.create_task(atv_player.send_cli_command("ACTION=PAUSE"))
+                    atv_player.optimistic_state = PlayerState.PAUSED
+                    atv_player.update_attributes()
+                elif atv := atv_player.atv:
+                    tg.create_task(atv.remote_control.pause())
 
     async def play_media(
         self,
@@ -221,300 +657,402 @@ class AirplayProvider(PlayerProvider):
             - seek_position: Optional seek to this position.
             - fade_in: Optionally fade in the item at playback start.
         """
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.play_media(
-            player_id,
-            queue_item=queue_item,
-            seek_position=seek_position,
-            fade_in=fade_in,
-        )
+        # stop existing streams first
+        await self.cmd_stop(player_id)
+        await self.cmd_power(player_id, True)
+        atv_player = self._atv_players[player_id]
+        player = self.mass.players.get(player_id)
+
+        if player.synced_to:
+            # should not happen, but just in case
+            raise RuntimeError("Player is synced")
+
+        # NOTE: Although the pyatv library is perfectly capable of playback
+        # to not only raop targets but also airplay 1 + 2, its not suitable
+        # for synced playback to multiple clients at once.
+        # Also the performance is horrible. Python is not suitable for realtime
+        # audio streaming.
+        # So, I've decided to a combined route here. I've created a small binary
+        # written in C based on libraop to do the actual timestamped playback.
+        # the raw pcm audio is fed to the stdin of this cliraop binary and we can
+        # send some commands over a named pipe.
+
+        # get current ntp before we start
+        _, stdout = await check_output(f"{self._cliraop_bin} -ntp")
+        ntp = int(stdout.strip())
+
+        # setup Raop process for player and its sync childs
+        async with asyncio.TaskGroup() as tg:
+            for atv_player in self._get_sync_clients(player_id):
+                if not atv_player.atv:
+                    # just in case...
+                    await atv_player.connect()
+                tg.create_task(self._init_cliraop(atv_player, ntp))
+
+        async def _streamer() -> None:
+            queue = self.mass.player_queues.get(queue_item.queue_id)
+            player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
+            player.elapsed_time = 0
+            player.elapsed_time_last_updated = time.time()
+            player.state = PlayerState.PLAYING
+            self.mass.players.register_or_update(player)
+            prev_metadata_checksum: str = ""
+            pcm_format = AudioFormat(
+                content_type=ContentType.PCM_S16LE,
+                sample_rate=44100,
+                bit_depth=16,
+                channels=2,
+            )
+            try:
+                async for pcm_chunk in self.mass.streams.get_flow_stream(
+                    queue,
+                    start_queue_item=queue_item,
+                    pcm_format=pcm_format,
+                    seek_position=seek_position,
+                    fade_in=fade_in,
+                ):
+                    # send metadata to player(s) if needed
+                    # NOTE: this must all be done in separate tasks to not disturb audio
+                    if queue and queue.current_item and queue.current_item.streamdetails:
+                        metadata_checksum = (
+                            queue.current_item.streamdetails.stream_title
+                            or queue.current_item.queue_item_id
+                        )
+                        if prev_metadata_checksum != metadata_checksum:
+                            prev_metadata_checksum = metadata_checksum
+                            self.mass.create_task(self._send_metadata(player_id))
+                    # send progress metadata
+                    for atv_player in self._get_sync_clients(player_id):
+                        self.mass.create_task(
+                            atv_player.send_cli_command(f"PROGRESS={int(queue.elapsed_time)}\n")
+                        )
+
+                    # send audio chunk to player(s)
+                    async with asyncio.TaskGroup() as tg:
+                        available_clients = 0
+                        for atv_player in self._get_sync_clients(player_id):
+                            if not atv_player.cliraop_proc or atv_player.cliraop_proc.closed:
+                                # this may not happen, but just in case
+                                continue
+                            available_clients += 1
+                            tg.create_task(atv_player.cliraop_proc.write(pcm_chunk))
+                        if not available_clients:
+                            return
+
+            finally:
+                self.logger.debug("Streamer task ended for player %s", queue.display_name)
+                for atv_player in self._get_sync_clients(player_id):
+                    if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+                        atv_player.cliraop_proc.write_eof()
+
+        # start streaming the queue (pcm) audio in a background task
+        self._stream_tasks[player_id] = asyncio.create_task(_streamer())
 
     async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
         """Handle PLAY STREAM on given player.
 
         This is a special feature from the Universal Group provider.
         """
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.play_stream(player_id, stream_job)
-
-    async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
-        """Handle enqueuing of the next queue item on the player."""
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.enqueue_next_queue_item(player_id, queue_item)
-
-    async def cmd_pause(self, player_id: str) -> None:
-        """Send PAUSE command to given player."""
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_pause(player_id)
+        raise NotImplementedError
 
     async def cmd_power(self, player_id: str, powered: bool) -> None:
-        """Send POWER command to given player."""
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_power(player_id, powered)
+        """Send POWER command to given player.
+
+        - player_id: player_id of the player to handle the command.
+        - powered: bool if player should be powered on or off.
+        """
+        atv_player = self._atv_players[player_id]
+        mass_player = self.mass.players.get(player_id)
+        if powered:
+            await atv_player.connect()
+        elif not powered:
+            await self.cmd_stop(player_id)
+            await atv_player.disconnect()
+        mass_player.powered = powered
+        self.mass.players.update(player_id)
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
-        """Send VOLUME_SET command to given player."""
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_volume_set(player_id, volume_level)
+        """Send VOLUME_SET command to given player.
 
-    async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
-        """Send VOLUME MUTE command to given player."""
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_volume_mute(player_id, muted)
+        - player_id: player_id of the player to handle the command.
+        - volume_level: volume level (0..100) to set on the player.
+        """
+        atv_player = self._atv_players[player_id]
+        if atv_player.cliraop_proc:
+            # prefer interactive command to our streamer
+            await atv_player.send_cli_command(f"VOLUME={volume_level}")
+        elif atv := atv_player.atv:
+            await atv.audio.set_volume(volume_level)
 
     async def cmd_sync(self, player_id: str, target_player: str) -> None:
-        """Handle SYNC command for given player."""
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_sync(player_id, target_player)
+        """Handle SYNC command for given player.
+
+        Join/add the given player(id) to the given (master) player/sync group.
+
+            - player_id: player_id of the player to handle the command.
+            - target_player: player_id of the syncgroup master or group player.
+        """
+        player = self.mass.players.get(player_id, raise_unavailable=True)
+        group_leader = self.mass.players.get(target_player, raise_unavailable=True)
+        if group_leader.synced_to:
+            raise RuntimeError("Player is already synced")
+        player.synced_to = target_player
+        group_leader.group_childs.add(player_id)
+        self.mass.players.update(target_player)
+        if group_leader.powered:
+            await self.cmd_power(player_id, True)
+        active_queue = self.mass.player_queues.get_active_queue(group_leader.player_id)
+        if active_queue.state == PlayerState.PLAYING:
+            self.mass.create_task(self.mass.player_queues.resume(active_queue.queue_id))
 
     async def cmd_unsync(self, player_id: str) -> None:
-        """Handle UNSYNC command for given player."""
-        # simply forward to underlying slimproto player
-        slimproto_prov = self.mass.get_provider("slimproto")
-        await slimproto_prov.cmd_unsync(player_id)
-
-    async def _handle_player_register_callback(self, player: Player) -> None:
-        """Handle player register callback from slimproto source player."""
-        # TODO: Can we get better device info from mDNS ?
-        player.provider = self.instance_id
-        player.device_info = DeviceInfo(
-            model="Airplay device",
-            address=player.device_info.address,
-            manufacturer="Generic",
+        """Handle UNSYNC command for given player.
+
+        Remove the given player from any syncgroups it currently is synced to.
+
+            - player_id: player_id of the player to handle the command.
+        """
+        player = self.mass.players.get(player_id, raise_unavailable=True)
+        if not player.synced_to:
+            return
+        group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
+        group_leader.group_childs.remove(player_id)
+        player.synced_to = None
+        if player.state == PlayerState.PLAYING:
+            await self.cmd_stop(player_id)
+        self.mass.players.update(player_id)
+
+    async def _run_discovery(self) -> None:
+        """Discover Airplay players on the network."""
+        if self._discovery_running:
+            return
+        try:
+            self._discovery_running = True
+            self.logger.debug("Airplay discovery started...")
+            discovered_devices = await scan(self.mass.loop, timeout=5)
+
+            if not discovered_devices:
+                self.logger.debug("No devices found")
+                return
+
+            for dev in discovered_devices:
+                self.mass.create_task(self._player_discovered(dev))
+
+        finally:
+            self._discovery_running = False
+
+        def reschedule():
+            self.mass.create_task(self._run_discovery())
+
+        # reschedule self once finished
+        self.mass.loop.call_later(300, reschedule)
+
+    async def _player_discovered(self, discovery_info: interface.BaseConfig) -> None:
+        """Handle discovered Airplay player on mdns."""
+        player_id = f"ap{discovery_info.identifier.lower().replace(':', '')}"
+        if player_id in self._atv_players:
+            atv_player = self._atv_players[player_id]
+            if discovery_info.address != atv_player.discovery_info.address:
+                atv_player.address_updated(discovery_info.address)
+            return
+        self.logger.info(f"Connecting to {discovery_info.address}")
+        self._atv_players[player_id] = atv_player = AirPlayPlayer(
+            self.mass, player_id, discovery_info
         )
-        player.supports_24bit = False
-        # disable sonos by default
-        if "sonos" in player.name.lower() or "rincon" in player.name.lower():
-            player.enabled_by_default = False
-
-        # extend info from the discovery xml
-        async with aiofiles.open(self._config_file, "r") as _file:
-            xml_data = await _file.read()
-            with suppress(ET.ParseError):
-                xml_root = ET.XML(xml_data)
-                for device_elem in xml_root.findall("device"):
-                    player_id = device_elem.find("mac").text
-                    if player_id != player.player_id:
-                        continue
-                    # prefer name from UDN because default name is often wrong
-                    udn = device_elem.find("udn").text
-                    udn_name = udn.split("@")[1].split("._")[0]
-                    player.name = udn_name
-                    break
+        atv_player._setup_device()
+        for player in self.players:
+            player.can_sync_with = tuple(x for x in self._atv_players if x != player.player_id)
+            self.mass.players.update(player.player_id)
 
-    def _handle_player_update_callback(self, player: Player) -> None:
-        """Handle player update callback from slimproto source player."""
+    def _is_feature_available(
+        self, atv_player: interface.AppleTV, feature: interface.FeatureName
+    ) -> bool:
+        """Return if a feature is available."""
+        mass_player = self.mass.players.get(atv_player.device_info.output_device_id)
+        if atv_player and mass_player.state == PlayerState.PLAYING:
+            return atv_player.features.in_state(interface.FeatureState.Available, feature)
+        return False
 
-    async def _get_bridge_binary(self):
-        """Find the correct bridge binary belonging to the platform."""
+    async def get_cliraop_binary(self):
+        """Find the correct raop/airplay binary belonging to the platform."""
         # ruff: noqa: SIM102
+        if self._cliraop_bin is not None:
+            return self._cliraop_bin
 
-        async def check_bridge_binary(bridge_binary_path: str) -> str | None:
+        async def check_binary(cliraop_path: str) -> str | None:
             try:
-                bridge_binary = await asyncio.create_subprocess_exec(
-                    *[bridge_binary_path, "-t", "-x", self._config_file],
+                cliraop = await asyncio.create_subprocess_exec(
+                    *[cliraop_path, "-check"],
                     stdout=asyncio.subprocess.PIPE,
+                    stderr=asyncio.subprocess.STDOUT,
                 )
-                stdout, _ = await bridge_binary.communicate()
-                if (
-                    bridge_binary.returncode == 1
-                    and b"This program is free software: you can redistribute it and/or modify"
-                    in stdout
-                ):
-                    self._bridge_bin = bridge_binary_path
-                    return bridge_binary_path
-            except OSError as err:
-                self.logger.exception(err)
+                stdout, _ = await cliraop.communicate()
+                stdout = stdout.strip().decode()
+                if cliraop.returncode == 0 and stdout == "cliraop check":
+                    self._cliraop_bin = cliraop_path
+                    return cliraop_path
+            except OSError:
                 return None
 
         base_path = os.path.join(os.path.dirname(__file__), "bin")
-
-        system = platform.system().lower()
+        system = platform.system().lower().replace("darwin", "macos")
         architecture = platform.machine().lower()
 
-        if bridge_binary := await check_bridge_binary(
-            os.path.join(base_path, f"squeeze2raop-{system}-{architecture}-static")
+        if bridge_binary := await check_binary(
+            os.path.join(base_path, f"cliraop-{system}-{architecture}")
         ):
             return bridge_binary
 
-        msg = f"Unable to locate RaopBridge for {system}/{architecture}"
+        msg = f"Unable to locate RAOP Play binary for {system}/{architecture}"
         raise RuntimeError(msg)
 
-    async def _bridge_process_runner(self, slimproto_prov: SlimprotoProvider) -> None:
-        """Run the bridge binary in the background."""
-        self.logger.debug(
-            "Starting Airplay bridge using config file %s",
-            self._config_file,
+    def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]:
+        """Get all sync clients for a player."""
+        mass_player = self.mass.players.get(player_id, True)
+        sync_clients: list[AirPlayPlayer] = []
+        # we need to return the player itself too
+        group_child_ids = {player_id}
+        group_child_ids.update(mass_player.group_childs)
+        for child_id in group_child_ids:
+            if client := self._atv_players.get(child_id):
+                sync_clients.append(client)
+        return sync_clients
+
+    async def _init_cliraop(self, atv_player: AirPlayPlayer, ntp: int) -> None:
+        """Initiatlize CLIRaop process for a player."""
+        stream: RaopStream | None = next(
+            (x for x in atv_player.atv.stream.instances if isinstance(x, RaopStream)), None
         )
-        conf_log_level = self.config.get_value(CONF_LOG_LEVEL)
-        enable_debug_log = conf_log_level == "DEBUG"
-        args = [
-            self._bridge_bin,
-            "-s",
-            f"localhost:{slimproto_prov.port}",
-            "-x",
-            self._config_file,
-            "-I",
-            "-Z",
-            "-d",
-            f'all={"debug" if enable_debug_log else "warn"}',
-            # filter out apple tv's for now until we fix auth
-            "-m",
-            "apple-tv,appletv",
-        ]
-        start_success = False
-        while True:
-            try:
-                self._bridge_proc = await asyncio.create_subprocess_shell(
-                    " ".join(args),
-                    stdout=asyncio.subprocess.PIPE,
-                    stderr=asyncio.subprocess.STDOUT,
-                )
-                self._log_reader_task = asyncio.create_task(self._log_reader())
-                await self._bridge_proc.wait()
-            except Exception as err:
-                if not start_success:
-                    raise
-                self.logger.exception("Error in Airplay bridge", exc_info=err)
-            if self._closing:
-                break
-            await asyncio.sleep(10)
+        if stream is None:
+            raise RuntimeError("RAOP Not available")
 
-    async def _stop_bridge(self) -> None:
-        """Stop the bridge process."""
-        if self._bridge_proc:
-            try:
-                self.logger.info("Stopping bridge process...")
-                self._bridge_proc.terminate()
-                await self._bridge_proc.wait()
-                self.logger.info("Bridge process stopped.")
-                await asyncio.sleep(5)
-            except ProcessLookupError:
-                pass
-        if self._log_reader_task and not self._log_reader_task.done():
-            self._log_reader_task.cancel()
-
-    async def _check_config_xml(self, recreate: bool = False) -> None:
-        """Check the bridge config XML file."""
-        # ruff: noqa: PLR0915
-        if recreate or not os.path.isfile(self._config_file):
-            if os.path.isfile(self._config_file):
-                os.remove(self._config_file)
-            # discover players and create default config file
-            args = [
-                self._bridge_bin,
-                "-i",
-                self._config_file,
-            ]
-            proc = await asyncio.create_subprocess_shell(
-                " ".join(args),
-                stdout=asyncio.subprocess.DEVNULL,
-                stderr=asyncio.subprocess.DEVNULL,
+        async def log_watcher(cliraop_proc: AsyncProcess) -> None:
+            """Monitor stderr for a running CLIRaop process."""
+            mass_player = self.mass.players.get(atv_player.player_id)
+            logger = self.logger.getChild(atv_player.player_id)
+            async for line in cliraop_proc._proc.stderr:
+                line = line.decode().strip()  # noqa: PLW2901
+                if "set pause" in line:
+                    atv_player.optimistic_state = PlayerState.PAUSED
+                    atv_player.update_attributes()
+                if "Restarted at" in line:
+                    atv_player.optimistic_state = PlayerState.PLAYING
+                    atv_player.update_attributes()
+                elif "after start), played" in line:
+                    millis = int(line.split("played ")[1].split(" ")[0])
+                    mass_player.elapsed_time = millis / 1000
+                    mass_player.elapsed_time_last_updated = time.time()
+                else:
+                    logger.debug(line)
+            # if we reach this point, the process exited
+            if cliraop_proc._proc.returncode is not None:
+                cliraop_proc.closed = True
+            logger.debug(
+                "CLIRaop process stopped with errorcode %s",
+                cliraop_proc._proc.returncode,
             )
-            await proc.wait()
 
-        # read xml file's data
-        async with aiofiles.open(self._config_file, "r") as _file:
-            xml_data = await _file.read()
+        extra_args = []
+        latency = self.mass.config.get_raw_player_config_value(
+            atv_player.player_id, CONF_LATENCY, 1200
+        )
+        extra_args += ["-l", str(latency)]
+        if self.mass.config.get_raw_player_config_value(
+            atv_player.player_id, CONF_ENCRYPTION, False
+        ):
+            extra_args += ["-et"]
+        if self.mass.config.get_raw_player_config_value(
+            atv_player.player_id, CONF_ALAC_ENCODE, False
+        ):
+            extra_args += ["-a"]
+        if self.mass.config.get_raw_player_config_value(
+            atv_player.player_id, CONF_VOLUME_START, False
+        ):
+            extra_args += ["-v", str(int(atv_player.atv.audio.volume))]
+        sync_adjust = self.mass.config.get_raw_player_config_value(
+            atv_player.player_id, CONF_SYNC_ADJUST, 0
+        )
 
-        try:
-            xml_root = ET.XML(xml_data)
-        except ET.ParseError:
-            if recreate:
-                raise
-            await self._check_config_xml(True)
-            return
+        atv_player.optimistic_state = PlayerState.PLAYING
+        # always generate a new active remote id to prevent race conditions
+        # with the named pipe used to send commands
+        atv_player.active_remote_id = str(randint(1000, 8000))
+        args = [
+            self._cliraop_bin,
+            "-n",
+            str(ntp),
+            "-p",
+            str(stream.core.service.port),
+            "-w",
+            str(2000 + sync_adjust),
+            *extra_args,
+            "-dacp",
+            self.dacp_id,
+            "-ar",
+            atv_player.active_remote_id,
+            "-md",
+            atv_player.discovery_info.properties["_raop._tcp.local"]["md"],
+            "-et",
+            atv_player.discovery_info.properties["_raop._tcp.local"]["et"],
+            str(atv_player.discovery_info.address),
+            "-",
+        ]
+        if platform.system() == "Darwin":
+            os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
+        atv_player.cliraop_proc = AsyncProcess(
+            args, enable_stdin=True, enable_stdout=False, enable_stderr=True
+        )
+        await atv_player.cliraop_proc.start()
+        # send empty command to unblock named pipe
+        await atv_player.send_cli_command("\n")
+        atv_player.cliraop_proc.attach_task(log_watcher(atv_player.cliraop_proc))
 
-        # set common/global values
-        common_elem = xml_root.find("common")
-        for key, value in {
-            "codecs": "flc,pcm",
-            "sample_rate": "44100",
-            "resample": "0",
-        }.items():
-            xml_elem = common_elem.find(key)
-            xml_elem.text = value
-
-        # default values for players
-        for conf_entry in PLAYER_CONFIG_ENTRIES:
-            if conf_entry.type == ConfigEntryType.LABEL:
-                continue
-            conf_val = conf_entry.default_value
-            xml_elem = common_elem.find(conf_entry.key)
-            if xml_elem is None:
-                xml_elem = ET.SubElement(common_elem, conf_entry.key)
-            if conf_entry.type == ConfigEntryType.BOOLEAN:
-                xml_elem.text = "1" if conf_val else "0"
+    async def _send_metadata(self, player_id: str) -> None:
+        """Send metadata to player (and connected sync childs)."""
+        queue = self.mass.player_queues.get_active_queue(player_id)
+        if not queue or not queue.current_item:
+            return
+        duration = min(queue.current_item.duration or 0, 3600)
+        title = queue.current_item.name
+        artist = ""
+        album = ""
+        if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
+            # stream title from radio station
+            stream_title = queue.current_item.streamdetails.stream_title
+            if " - " in stream_title:
+                artist, title = stream_title.split(" - ", 1)
             else:
-                xml_elem.text = str(conf_val)
-
-        # get/set all device configs
-        for device_elem in xml_root.findall("device"):
-            player_id = device_elem.find("mac").text
-            if player_id in self._removed_players:
-                xml_root.remove(device_elem)
-                self._removed_players.remove(player_id)
-                continue
-            # use raw config values because players are not
-            # yet available at startup/init (race condition)
-            raw_player_conf = self.mass.config.get(f"{CONF_PLAYERS}/{player_id}")
-            if not raw_player_conf:
-                continue
-            device_elem.find("enabled").text = "1" if raw_player_conf["enabled"] else "0"
-
-            # set some values that are not (yet) configurable
-            for key, value in {
-                "player_volume": "-1",
-                "prevent_playback": "off",
-            }.items():
-                xml_elem = device_elem.find(key)
-                if xml_elem is None:
-                    xml_elem = ET.SubElement(device_elem, key)
-                xml_elem.text = value
-
-            # set values based on config entries
-            for conf_entry in PLAYER_CONFIG_ENTRIES:
-                if conf_entry.type == ConfigEntryType.LABEL:
-                    continue
-                conf_val = raw_player_conf["values"].get(conf_entry.key, conf_entry.default_value)
-                xml_elem = device_elem.find(conf_entry.key)
-                if xml_elem is None:
-                    xml_elem = ET.SubElement(device_elem, conf_entry.key)
-                if conf_entry.type == ConfigEntryType.BOOLEAN:
-                    xml_elem.text = "1" if conf_val else "0"
-                else:
-                    xml_elem.text = str(conf_val)
-
-        # save config file
-        async with aiofiles.open(self._config_file, "w") as _file:
-            xml_str = ET.tostring(xml_root)
-            await _file.write(xml_str.decode())
-
-    async def _log_reader(self) -> None:
-        """Read log output from bridge process."""
-        bridge_logger = self.logger.getChild("squeeze2raop")
-        while self._bridge_proc.returncode is None:
-            async for line in self._bridge_proc.stdout:
-                bridge_logger.debug(line.decode().strip())
-
-    def restart_bridge(self) -> None:
-        """Schedule restart of bridge process."""
-        if self._timer_handle is not None:
-            self._timer_handle.cancel()
-            self._timer_handle = None
-
-        async def restart_bridge() -> None:
-            self.logger.info("Restarting Airplay bridge (due to config changes)")
-            await self._stop_bridge()
-            await self._check_config_xml()
-
-        # schedule the action for later
-        self._timer_handle = self.mass.loop.call_later(10, self.mass.create_task, restart_bridge)
+                title = stream_title
+            # set album to radio station name
+            album = queue.current_item.name
+        if media_item := queue.current_item.media_item:
+            if artist_str := getattr(media_item, "artist_str", None):
+                artist = artist_str
+            if _album := getattr(media_item, "album", None):
+                album = _album.name
+
+        cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
+        cmd += f"DURATION={duration}\nACTION=SENDMETA\n"
+
+        async with asyncio.TaskGroup() as tg:
+            for atv_player in self._get_sync_clients(player_id):
+                tg.create_task(atv_player.send_cli_command(cmd))
+
+        # get image
+        if not queue.current_item.image:
+            return
+        image_path = create_tempfile()
+        image_data = await self.mass.metadata.get_thumbnail(
+            queue.current_item.image.path,
+            512,
+            queue.current_item.image.provider,
+        )
+        async with aiofiles.open(image_path.name, "wb") as outfile:
+            await outfile.write(image_data)
+            async with asyncio.TaskGroup() as tg:
+                for atv_player in self._get_sync_clients(player_id):
+                    if image_path:
+                        tg.create_task(atv_player.send_cli_command(f"ARTWORK={image_path.name}\n"))
+        # make sure the temp file gets deleted again
+        await asyncio.sleep(5)
+        image_path.close()
diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64
new file mode 100755 (executable)
index 0000000..ef4c4eb
Binary files /dev/null and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ
diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64
new file mode 100755 (executable)
index 0000000..ba25f08
Binary files /dev/null and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ
diff --git a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64
new file mode 100755 (executable)
index 0000000..4a91d90
Binary files /dev/null and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ
diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-arm64-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-arm64-static
deleted file mode 100755 (executable)
index ca6cb7a..0000000
Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-arm64-static and /dev/null differ
diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-x86_64-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-x86_64-static
deleted file mode 100755 (executable)
index 69c2027..0000000
Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-x86_64-static and /dev/null differ
diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-aarch64-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-aarch64-static
deleted file mode 100755 (executable)
index ace5d07..0000000
Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-aarch64-static and /dev/null differ
diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-arm-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-arm-static
deleted file mode 100755 (executable)
index 0b3da4d..0000000
Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-arm-static and /dev/null differ
diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86-static
deleted file mode 100755 (executable)
index 0593e70..0000000
Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86-static and /dev/null differ
diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86_64-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86_64-static
deleted file mode 100755 (executable)
index 0e2e4da..0000000
Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86_64-static and /dev/null differ
index 9460f5e0c8da704f2bc5f4ed4831b001dd505a75..9e5fb135160fdb94b7e7995a9c9ff762cfbcf114 100644 (file)
@@ -9,6 +9,5 @@
   "multi_instance": false,
   "builtin": false,
   "load_by_default": true,
-  "depends_on": "slimproto",
-  "icon":"cast-variant"
+  "icon": "cast-variant"
 }
index 14e124e2d54846cc06bcf3b50bf1bdf69d6b175b..272370559856d21532671be3c772bacd1e8a7941 100644 (file)
@@ -1,13 +1,8 @@
 [project]
 name = "music_assistant"
 # The version is set by GH action on release
-version = "0.0.0"
-license = { text = "Apache-2.0" }
-description = "Music Assistant"
-readme = "README.md"
-requires-python = ">=3.11"
 authors = [
-  { name = "The Music Assistant Authors", email = "marcelveldt@users.noreply.github.com" },
+  {name = "The Music Assistant Authors", email = "marcelveldt@users.noreply.github.com"},
 ]
 classifiers = [
   "Environment :: Console",
@@ -15,6 +10,11 @@ classifiers = [
   "Programming Language :: Python :: 3.12",
 ]
 dependencies = ["aiohttp", "orjson", "mashumaro"]
+description = "Music Assistant"
+license = {text = "Apache-2.0"}
+readme = "README.md"
+requires-python = ">=3.11"
+version = "0.0.0"
 
 [project.optional-dependencies]
 server = [
@@ -63,10 +63,10 @@ mass = "music_assistant.__main__:main"
 ignore-words-list = "provid,hass,followings,childs"
 
 [tool.setuptools]
+include-package-data = true
+packages = ["music_assistant"]
 platforms = ["any"]
 zip-safe = false
-packages = ["music_assistant"]
-include-package-data = true
 
 [tool.setuptools.package-data]
 music_assistant = ["py.typed"]
@@ -78,19 +78,17 @@ show-fixes = true
 line-length = 100
 target-version = "py311"
 
-
 [tool.ruff.lint.pydocstyle]
 # Use Google-style docstrings.
 convention = "pep257"
 
 [tool.ruff.lint.pylint]
 
+max-args = 10
 max-branches = 25
 max-returns = 15
-max-args = 10
 max-statements = 50
 
-
 [tool.mypy]
 platform = "linux"
 python_version = "3.11"
@@ -134,13 +132,10 @@ disable = [
   "duplicate-code",
   "format",
   "unsubscriptable-object",
-  "unused-argument",                           # handled by ruff
-  "unspecified-encoding",                      # handled by ruff
+  "unused-argument", # handled by ruff
+  "unspecified-encoding", # handled by ruff
   "isinstance-second-argument-not-valid-type", # conflict with ruff
-  "fixme",                                     # we're still developing
-
-  # TEMPORARY DISABLED rules
-  # The below rules must be enabled later one-by-one !
+  "fixme", # we're still developing  # TEMPORARY DISABLED rules  # The below rules must be enabled later one-by-one !
   "too-many-return-statements",
   "unsupported-assignment-operation",
   "invalid-name",
@@ -165,12 +160,10 @@ disable = [
   "no-else-raise",
   "undefined-loop-variable",
   "too-many-nested-blocks",
-  "too-many-public-methods",          # unavoidable?
-  "too-many-arguments",               # unavoidable?
-  "too-many-branches",                # unavoidable?
-  "too-many-instance-attributes",     # unavoidable?
-
-
+  "too-many-public-methods", # unavoidable?
+  "too-many-arguments", # unavoidable?
+  "too-many-branches", # unavoidable?
+  "too-many-instance-attributes", # unavoidable?
 ]
 
 [tool.pylint.SIMILARITIES]
@@ -185,27 +178,24 @@ asyncio_mode = "auto"
 
 [tool.ruff.lint]
 ignore = [
-  "ANN002",  # Just annoying, not really useful
-  "ANN003",  # Just annoying, not really useful
-  "ANN101",  # Self... explanatory
-  "ANN401",  # Opinioated warning on disallowing dynamically typed expressions
-  "D203",    # Conflicts with other rules
-  "D213",    # Conflicts with other rules
-  "D417",    # False positives in some occasions
-  "FIX002",  # Just annoying, not really useful
+  "ANN002", # Just annoying, not really useful
+  "ANN003", # Just annoying, not really useful
+  "ANN101", # Self... explanatory
+  "ANN401", # Opinioated warning on disallowing dynamically typed expressions
+  "D203", # Conflicts with other rules
+  "D213", # Conflicts with other rules
+  "D417", # False positives in some occasions
+  "EM101", # Just annoying, not really useful
+  "FIX002", # Just annoying, not really useful
   "PLR2004", # Just annoying, not really useful
-  "PD011",   # Just annoying, not really useful
-  "S101",    # assert is often used to satisfy type checking
-  "TD002",   # Just annoying, not really useful
-  "TD003",   # Just annoying, not really useful
-  "TD004",   # Just annoying, not really useful
-
-  # Conflicts with the Ruff formatter
-  "COM812",
-  "ISC001",
-
-  # TEMPORARY DISABLED rules
-  # The below rules must be enabled later one-by-one !
+  "PD011", # Just annoying, not really useful
+  "S101", # assert is often used to satisfy type checking
+  "TD002", # Just annoying, not really useful
+  "TD003", # Just annoying, not really useful
+  "TD004", # Just annoying, not really useful
+  "TRY003", # Just annoying, not really useful
+  "COM812", # Conflicts with the Ruff formatter
+  "ISC001", # TEMPORARY DISABLED rules  # The below rules must be enabled later one-by-one !
   "BLE001",
   "FBT001",
   "FBT002",