From: Marcel van der Veldt Date: Fri, 31 Oct 2025 01:16:46 +0000 (+0100) Subject: Optimizations to the AirPlay provider X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=80128a37d617e23cdae09df407ae9c36ae734463;p=music-assistant-server.git Optimizations to the AirPlay provider - Some refactoring for code maintability - Add late join support - Prepare for AirPlay 2 (split up protocols) - Various fixes for buffering --- diff --git a/music_assistant/providers/airplay/__init__.py b/music_assistant/providers/airplay/__init__.py index 2bbd0283..b6121c97 100644 --- a/music_assistant/providers/airplay/__init__.py +++ b/music_assistant/providers/airplay/__init__.py @@ -5,10 +5,11 @@ from __future__ import annotations from typing import TYPE_CHECKING from music_assistant_models.config_entries import ProviderConfig -from music_assistant_models.enums import ProviderFeature +from music_assistant_models.enums import ConfigEntryType, ProviderFeature from music_assistant_models.provider import ProviderManifest from music_assistant.mass import MusicAssistant +from music_assistant.providers.airplay.constants import CONF_ENABLE_LATE_JOIN from .provider import AirPlayProvider @@ -37,7 +38,20 @@ async def get_config_entries( values: the (intermediate) raw values for config entries sent with the action. """ # ruff: noqa: ARG001 - return () + return ( + ConfigEntry( + key=CONF_ENABLE_LATE_JOIN, + type=ConfigEntryType.BOOLEAN, + default_value=False, + label="Enable late joining", + description=( + "Allow the player to join an existing AirPlay stream instead of " + "starting a new one. \n NOTE: may not work in all conditions. " + "If you experience issues or players are not fully in sync, disable this option." + ), + category="airplay", + ), + ) async def setup( diff --git a/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 index 2266b5da..4eae6a13 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/providers/airplay/bin/cliraop-linux-aarch64 differ diff --git a/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 index 0b6faf9e..dc6cedb9 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/providers/airplay/bin/cliraop-linux-x86_64 differ diff --git a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 index c62298b8..8de2c4cc 100755 Binary files a/music_assistant/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/providers/airplay/bin/cliraop-macos-arm64 differ diff --git a/music_assistant/providers/airplay/constants.py b/music_assistant/providers/airplay/constants.py index e02a5982..473604b1 100644 --- a/music_assistant/providers/airplay/constants.py +++ b/music_assistant/providers/airplay/constants.py @@ -2,6 +2,7 @@ from __future__ import annotations +from enum import Enum from typing import Final from music_assistant_models.enums import ContentType @@ -11,6 +12,14 @@ from music_assistant.constants import INTERNAL_PCM_FORMAT DOMAIN = "airplay" + +class StreamingProtocol(Enum): + """AirPlay streaming protocol versions.""" + + RAOP = 1 # AirPlay 1 (RAOP) + AIRPLAY2 = 2 # AirPlay 2 + + CACHE_CATEGORY_PREV_VOLUME: Final[int] = 1 CONF_ENCRYPTION: Final[str] = "encryption" @@ -20,6 +29,22 @@ CONF_PASSWORD: Final[str] = "password" CONF_READ_AHEAD_BUFFER: Final[str] = "read_ahead_buffer" CONF_IGNORE_VOLUME: Final[str] = "ignore_volume" CONF_CREDENTIALS: Final[str] = "credentials" +CONF_AIRPLAY_PROTOCOL: Final[str] = "airplay_protocol" + +AIRPLAY_DISCOVERY_TYPE: Final[str] = "_airplay._tcp.local." +RAOP_DISCOVERY_TYPE: Final[str] = "_raop._tcp.local." +DACP_DISCOVERY_TYPE: Final[str] = "_dacp._tcp.local." + +AIRPLAY2_MIN_LOG_LEVEL: Final[int] = 3 # Min loglevel to ensure stderr output contains what we need +CONF_AP_CREDENTIALS: Final[str] = "ap_credentials" +CONF_MRP_CREDENTIALS: Final[str] = "mrp_credentials" +CONF_ACTION_START_PAIRING: Final[str] = "start_ap_pairing" +CONF_ACTION_FINISH_PAIRING: Final[str] = "finish_ap_pairing" +CONF_ACTION_START_MRP_PAIRING: Final[str] = "start_mrp_pairing" +CONF_ACTION_FINISH_MRP_PAIRING: Final[str] = "finish_mrp_pairing" +CONF_PAIRING_PIN: Final[str] = "pairing_pin" +CONF_MRP_PAIRING_PIN: Final[str] = "mrp_pairing_pin" +CONF_ENABLE_LATE_JOIN: Final[str] = "enable_late_join" BACKOFF_TIME_LOWER_LIMIT: Final[int] = 15 # seconds BACKOFF_TIME_UPPER_LIMIT: Final[int] = 300 # Five minutes @@ -47,4 +72,6 @@ BROKEN_RAOP_MODELS = ( ("Sonos", "Arc Ultra"), # Samsung has been repeatedly being reported as having issues with AirPlay 1/raop ("Samsung", "*"), + ("Ubiquiti Inc.", "*"), + ("Juke Audio", "*"), ) diff --git a/music_assistant/providers/airplay/helpers.py b/music_assistant/providers/airplay/helpers.py index 3089990a..1b4f0333 100644 --- a/music_assistant/providers/airplay/helpers.py +++ b/music_assistant/providers/airplay/helpers.py @@ -4,16 +4,20 @@ from __future__ import annotations import os import platform +import time from typing import TYPE_CHECKING from zeroconf import IPVersion from music_assistant.helpers.process import check_output -from music_assistant.providers.airplay.constants import BROKEN_RAOP_MODELS +from music_assistant.providers.airplay.constants import BROKEN_RAOP_MODELS, StreamingProtocol if TYPE_CHECKING: from zeroconf.asyncio import AsyncServiceInfo +# NTP epoch delta: difference between Unix epoch (1970) and NTP epoch (1900) +NTP_EPOCH_DELTA = 0x83AA7E80 # 2208988800 seconds + def convert_airplay_volume(value: float) -> int: """Remap AirPlay Volume to 0..100 scale.""" @@ -25,7 +29,7 @@ def convert_airplay_volume(value: float) -> int: return int(portion + normal_min) -def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]: +def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]: # noqa: PLR0911 """Return Manufacturer and Model name from mdns info.""" manufacturer = info.decoded_properties.get("manufacturer") model = info.decoded_properties.get("model") @@ -57,6 +61,8 @@ def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]: return ("Apple", "Apple TV 4K Gen2") if model == "AppleTV14,1": return ("Apple", "Apple TV 4K Gen3") + if model == "UPL-AMP": + return ("Ubiquiti Inc.", "UPL-AMP") if "AirPort" in model: return ("Apple", "AirPort Express") if "AudioAccessory" in model: @@ -64,6 +70,26 @@ def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]: if "AppleTV" in model: model = "Apple TV" manufacturer = "Apple" + # Detect Mac devices (Mac mini, MacBook, iMac, etc.) + # Model identifiers like: Mac16,11, MacBookPro18,3, iMac21,1 + if model.startswith(("Mac", "iMac")): + # Parse Mac model to friendly name + if model.startswith("MacBookPro"): + return ("Apple", f"MacBook Pro ({model})") + if model.startswith("MacBookAir"): + return ("Apple", f"MacBook Air ({model})") + if model.startswith("MacBook"): + return ("Apple", f"MacBook ({model})") + if model.startswith("iMac"): + return ("Apple", f"iMac ({model})") + if model.startswith("Macmini"): + return ("Apple", f"Mac mini ({model})") + if model.startswith("MacPro"): + return ("Apple", f"Mac Pro ({model})") + if model.startswith("MacStudio"): + return ("Apple", f"Mac Studio ({model})") + # Generic Mac device (e.g. Mac16,11 for Mac mini M4) + return ("Apple", f"Mac ({model})") return (manufacturer or "AirPlay", model) @@ -89,17 +115,43 @@ def is_broken_raop_model(manufacturer: str, model: str) -> bool: return False -async def get_cliraop_binary() -> str: - """Find the correct raop/airplay binary belonging to the platform.""" +async def get_cli_binary(protocol: StreamingProtocol) -> str: + """Find the correct raop/airplay binary belonging to the platform. + + Args: + protocol: The streaming protocol (RAOP or AIRPLAY2) + + Returns: + Path to the CLI binary - async def check_binary(cliraop_path: str) -> str | None: + Raises: + RuntimeError: If the binary cannot be found + """ + + async def check_binary(cli_path: str) -> str | None: try: - returncode, output = await check_output( - cliraop_path, - "-check", - ) - if returncode == 0 and output.strip().decode() == "cliraop check": - return cliraop_path + if protocol == StreamingProtocol.RAOP: + args = [ + cli_path, + "-check", + ] + passing_output = "cliraop check" + else: + config_file = os.path.join(os.path.dirname(__file__), "bin", "cliap2.conf") + args = [ + cli_path, + "--testrun", + "--config", + config_file, + ] + + returncode, output = await check_output(*args) + if ( + protocol == StreamingProtocol.RAOP + and returncode == 0 + and output.strip().decode() == passing_output + ) or (protocol == StreamingProtocol.AIRPLAY2 and returncode == 0): + return cli_path except OSError: pass return None @@ -108,10 +160,125 @@ async def get_cliraop_binary() -> str: system = platform.system().lower().replace("darwin", "macos") architecture = platform.machine().lower() + if protocol == StreamingProtocol.RAOP: + package = "cliraop" + elif protocol == StreamingProtocol.AIRPLAY2: + package = "cliap2" + else: + raise RuntimeError(f"Unsupported streaming protocol requested: {protocol}") + if bridge_binary := await check_binary( - os.path.join(base_path, f"cliraop-{system}-{architecture}") + os.path.join(base_path, f"{package}-{system}-{architecture}") ): return bridge_binary - msg = f"Unable to locate RAOP Play binary for {system}/{architecture}" + msg = ( + f"Unable to locate {protocol.name} CLI stream binary {package} for {system}/{architecture}" + ) raise RuntimeError(msg) + + +def get_ntp_timestamp() -> int: + """ + Get current NTP timestamp (64-bit). + + Returns: + int: 64-bit NTP timestamp (upper 32 bits = seconds, lower 32 bits = fraction) + """ + # Get current Unix timestamp with microsecond precision + current_time = time.time() + + # Split into seconds and microseconds + seconds = int(current_time) + microseconds = int((current_time - seconds) * 1_000_000) + + # Convert to NTP epoch (add offset from 1970 to 1900) + ntp_seconds = seconds + NTP_EPOCH_DELTA + + # Convert microseconds to NTP fraction (2^32 parts per second) + # fraction = (microseconds * 2^32) / 1_000_000 + ntp_fraction = int((microseconds << 32) / 1_000_000) + + # Combine into 64-bit value + return (ntp_seconds << 32) | ntp_fraction + + +def ntp_to_seconds_fraction(ntp_timestamp: int) -> tuple[int, int]: + """ + Split NTP timestamp into seconds and fraction components. + + Args: + ntp_timestamp: 64-bit NTP timestamp + + Returns: + tuple: (seconds, fraction) + """ + seconds = ntp_timestamp >> 32 + fraction = ntp_timestamp & 0xFFFFFFFF + return seconds, fraction + + +def ntp_to_unix_time(ntp_timestamp: int) -> float: + """ + Convert NTP timestamp to Unix timestamp (float). + + Args: + ntp_timestamp: 64-bit NTP timestamp + + Returns: + float: Unix timestamp (seconds since 1970-01-01) + """ + seconds = ntp_timestamp >> 32 + fraction = ntp_timestamp & 0xFFFFFFFF + + # Convert back to Unix epoch + unix_seconds = seconds - NTP_EPOCH_DELTA + + # Convert fraction to microseconds + microseconds = (fraction * 1_000_000) >> 32 + + return unix_seconds + (microseconds / 1_000_000) + + +def unix_time_to_ntp(unix_timestamp: float) -> int: + """ + Convert Unix timestamp (float) to NTP timestamp. + + Args: + unix_timestamp: Unix timestamp (seconds since 1970-01-01) + + Returns: + int: 64-bit NTP timestamp + """ + seconds = int(unix_timestamp) + microseconds = int((unix_timestamp - seconds) * 1_000_000) + + # Convert to NTP epoch + ntp_seconds = seconds + NTP_EPOCH_DELTA + + # Convert microseconds to NTP fraction + ntp_fraction = int((microseconds << 32) / 1_000_000) + + return (ntp_seconds << 32) | ntp_fraction + + +def add_seconds_to_ntp(ntp_timestamp: int, seconds: float) -> int: + """ + Add seconds to an NTP timestamp. + + Args: + ntp_timestamp: 64-bit NTP timestamp + seconds: Number of seconds to add (can be fractional) + + Returns: + int: New NTP timestamp with seconds added + """ + # Extract whole seconds and fraction + whole_seconds = int(seconds) + fraction = seconds - whole_seconds + + # Convert to NTP format (upper 32 bits = seconds, lower 32 bits = fraction) + ntp_seconds = whole_seconds << 32 + ntp_fraction = int(fraction * (1 << 32)) + + return ntp_timestamp + ntp_seconds + ntp_fraction diff --git a/music_assistant/providers/airplay/manifest.json b/music_assistant/providers/airplay/manifest.json index 9f5724c2..278a70f8 100644 --- a/music_assistant/providers/airplay/manifest.json +++ b/music_assistant/providers/airplay/manifest.json @@ -10,5 +10,5 @@ "multi_instance": false, "builtin": false, "icon": "cast-variant", - "mdns_discovery": ["_raop._tcp.local."] + "mdns_discovery": ["_airplay._tcp.local.", "_raop._tcp.local."] } diff --git a/music_assistant/providers/airplay/player.py b/music_assistant/providers/airplay/player.py index c11b7a07..40aa721f 100644 --- a/music_assistant/providers/airplay/player.py +++ b/music_assistant/providers/airplay/player.py @@ -1,4 +1,4 @@ -"""AirPlay Player implementation.""" +"""AirPlay Player implementations.""" from __future__ import annotations @@ -6,7 +6,7 @@ import asyncio import time from typing import TYPE_CHECKING, cast -from music_assistant_models.config_entries import ConfigEntry, ConfigValueType +from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType from music_assistant_models.enums import ( ConfigEntryType, ContentType, @@ -16,6 +16,7 @@ from music_assistant_models.enums import ( PlayerType, ) from music_assistant_models.media_items import AudioFormat +from propcache import under_cached_property as cached_property from music_assistant.constants import ( CONF_ENTRY_DEPRECATED_EQ_BASS, @@ -31,26 +32,35 @@ from music_assistant.models.player import DeviceInfo, Player, PlayerMedia from music_assistant.providers.universal_group.constants import UGP_PREFIX from .constants import ( + AIRPLAY_DISCOVERY_TYPE, AIRPLAY_FLOW_PCM_FORMAT, AIRPLAY_PCM_FORMAT, CACHE_CATEGORY_PREV_VOLUME, + CONF_ACTION_FINISH_PAIRING, + CONF_ACTION_START_PAIRING, + CONF_AIRPLAY_PROTOCOL, CONF_ALAC_ENCODE, + CONF_AP_CREDENTIALS, CONF_ENCRYPTION, CONF_IGNORE_VOLUME, + CONF_PAIRING_PIN, CONF_PASSWORD, CONF_READ_AHEAD_BUFFER, FALLBACK_VOLUME, + RAOP_DISCOVERY_TYPE, + StreamingProtocol, ) from .helpers import get_primary_ip_address_from_zeroconf, is_broken_raop_model -from .raop import RaopStreamSession +from .stream_session import AirPlayStreamSession if TYPE_CHECKING: from zeroconf.asyncio import AsyncServiceInfo from music_assistant.providers.universal_group import UniversalGroupPlayer + from .protocols.airplay2 import AirPlay2Stream + from .protocols.raop import RaopStream from .provider import AirPlayProvider - from .raop import RaopStream BROKEN_RAOP_WARN = ConfigEntry( @@ -70,7 +80,7 @@ class AirPlayPlayer(Player): self, provider: AirPlayProvider, player_id: str, - discovery_info: AsyncServiceInfo, + discovery_info: AsyncServiceInfo | None, address: str, display_name: str, manufacturer: str, @@ -81,10 +91,9 @@ class AirPlayPlayer(Player): super().__init__(provider, player_id) self.discovery_info = discovery_info self.address = address - self.raop_stream: RaopStream | None = None + self.stream: RaopStream | AirPlay2Stream | None = None self.last_command_sent = 0.0 self._lock = asyncio.Lock() - # Set (static) player attributes self._attr_type = PlayerType.PLAYER self._attr_name = display_name @@ -104,12 +113,31 @@ class AirPlayPlayer(Player): self._attr_can_group_with = {provider.lookup_key} self._attr_enabled_by_default = not is_broken_raop_model(manufacturer, model) + @cached_property + def protocol(self) -> StreamingProtocol: + """Get the streaming protocol to use for this player.""" + protocol_value = self.config.get_value(CONF_AIRPLAY_PROTOCOL) + # Convert integer value to StreamingProtocol enum + if protocol_value == 2: + return StreamingProtocol.AIRPLAY2 + return StreamingProtocol.RAOP + async def get_config_entries( self, action: str | None = None, values: dict[str, ConfigValueType] | None = None, ) -> list[ConfigEntry]: """Return all (provider/player specific) Config Entries for the given player (if any).""" + base_entries = await super().get_config_entries() + + # Handle pairing actions + if action and self._requires_pairing(): + await self._handle_pairing_action(action=action, values=values) + + # Add pairing config entries for Apple TV and macOS devices + if self._requires_pairing(): + base_entries = [*self._get_pairing_config_entries(), *base_entries] + base_entries = await super().get_config_entries(action=action, values=values) base_entries += [ CONF_ENTRY_FLOW_MODE_ENFORCED, @@ -117,6 +145,23 @@ class AirPlayPlayer(Player): CONF_ENTRY_DEPRECATED_EQ_MID, CONF_ENTRY_DEPRECATED_EQ_TREBLE, CONF_ENTRY_OUTPUT_CODEC_HIDDEN, + ConfigEntry( + key=CONF_AIRPLAY_PROTOCOL, + type=ConfigEntryType.INTEGER, + default_value=StreamingProtocol.RAOP.value, + required=False, + label="AirPlay version to use for streaming", + description="AirPlay version 1 protocol uses RAOP.\n" + "AirPlay version 2 is an extension of RAOP.\n" + "Some newer devices do not fully support RAOP and " + "will only work with AirPlay version 2.", + category="airplay", + options=[ + ConfigValueOption("AirPlay 1 (RAOP)", StreamingProtocol.RAOP.value), + ConfigValueOption("AirPlay 2", StreamingProtocol.AIRPLAY2.value), + ], + hidden=True, + ), ConfigEntry( key=CONF_ENCRYPTION, type=ConfigEntryType.BOOLEAN, @@ -153,9 +198,10 @@ class AirPlayPlayer(Player): label="Audio buffer (ms)", description="Amount of buffer (in milliseconds), " "the player should keep to absorb network throughput jitter. " - "If you experience audio dropouts, try increasing this value.", + "Lower values reduce latency but may cause dropouts. " + "Recommended: 1000ms for stable playback.", category="airplay", - range=(500, 3000), + range=(500, 2000), ), # airplay has fixed sample rate/bit depth so make this config entry static and hidden create_sample_rates_config_entry( @@ -177,16 +223,212 @@ class AirPlayPlayer(Player): ), ] + # Regular AirPlay config entries + base_entries.extend( + [ + ConfigEntry( + key=CONF_ENCRYPTION, + type=ConfigEntryType.BOOLEAN, + default_value=True, + label="Enable encryption", + description="Enable encrypted communication with the player, " + "should by default be enabled for most devices.", + category="airplay", + ), + ConfigEntry( + key=CONF_ALAC_ENCODE, + type=ConfigEntryType.BOOLEAN, + default_value=True, + label="Enable compression", + description="Save some network bandwidth by sending the audio as " + "(lossless) ALAC at the cost of a bit CPU.", + category="airplay", + ), + CONF_ENTRY_SYNC_ADJUST, + ConfigEntry( + key=CONF_PASSWORD, + type=ConfigEntryType.SECURE_STRING, + default_value=None, + required=False, + label="Device password", + description="Some devices require a password to connect/play.", + category="airplay", + ), + ConfigEntry( + key=CONF_READ_AHEAD_BUFFER, + type=ConfigEntryType.INTEGER, + default_value=1000, + required=False, + label="Audio buffer (ms)", + description="Amount of buffer (in milliseconds), " + "the player should keep to absorb network throughput jitter. " + "If you experience audio dropouts, try increasing this value.", + category="airplay", + range=(500, 3000), + ), + # airplay has fixed sample rate/bit depth so make this config entry + # static and hidden + create_sample_rates_config_entry( + supported_sample_rates=[44100], supported_bit_depths=[16], hidden=True + ), + ConfigEntry( + key=CONF_IGNORE_VOLUME, + type=ConfigEntryType.BOOLEAN, + default_value=False, + label="Ignore volume reports sent by the device itself", + description=( + "The AirPlay protocol allows devices to report their own volume " + "level. \n" + "For some devices this is not reliable and can cause unexpected " + "volume changes. \n" + "Enable this option to ignore these reports." + ), + category="airplay", + ), + ] + ) + if is_broken_raop_model(self.device_info.manufacturer, self.device_info.model): base_entries.insert(-1, BROKEN_RAOP_WARN) return base_entries + def _requires_pairing(self) -> bool: + """Check if this device requires pairing (Apple TV or macOS).""" + if self.device_info.manufacturer.lower() != "apple": + return False + + model = self.device_info.model + # Apple TV devices + if "appletv" in model.lower() or "apple tv" in model.lower(): + return True + # Mac devices (including iMac, MacBook, Mac mini, Mac Pro, Mac Studio) + return model.startswith(("Mac", "iMac")) + + def _get_pairing_config_entries(self) -> list[ConfigEntry]: + """Return pairing config entries for Apple TV and macOS devices. + + Uses cliraop for AirPlay/RAOP pairing. + """ + entries: list[ConfigEntry] = [] + + # Check if we have credentials stored + has_credentials = bool(self.config.get_value(CONF_AP_CREDENTIALS)) + + if not has_credentials: + # Show pairing instructions and start button + entries.append( + ConfigEntry( + key="pairing_instructions", + type=ConfigEntryType.LABEL, + label="AirPlay Pairing Required", + description=( + "This device requires pairing before it can be used. " + "Click the button below to start the pairing process." + ), + ) + ) + entries.append( + ConfigEntry( + key=CONF_ACTION_START_PAIRING, + type=ConfigEntryType.ACTION, + label="Start Pairing", + description="Start the AirPlay pairing process", + action=CONF_ACTION_START_PAIRING, + ) + ) + else: + # Show paired status + entries.append( + ConfigEntry( + key="pairing_status", + type=ConfigEntryType.LABEL, + label="AirPlay Pairing Status", + description="Device is paired and ready to use.", + ) + ) + + # If pairing was started, show PIN entry + if self.config.get_value("_pairing_in_progress"): + entries.append( + ConfigEntry( + key=CONF_PAIRING_PIN, + type=ConfigEntryType.STRING, + label="Enter PIN", + description="Enter the 4-digit PIN shown on the device", + required=True, + ) + ) + entries.append( + ConfigEntry( + key=CONF_ACTION_FINISH_PAIRING, + type=ConfigEntryType.ACTION, + label="Finish Pairing", + description="Complete the pairing process with the PIN", + action=CONF_ACTION_FINISH_PAIRING, + ) + ) + + # Store credentials (hidden from UI) + entries.append( + ConfigEntry( + key=CONF_AP_CREDENTIALS, + type=ConfigEntryType.SECURE_STRING, + label="AirPlay Credentials", + default_value=None, + required=False, + hidden=True, + ) + ) + + return entries + + async def _handle_pairing_action( + self, action: str, values: dict[str, ConfigValueType] | None + ) -> None: + """Handle pairing actions using cliraop. + + TODO: Implement actual cliraop-based pairing. + """ + if action == CONF_ACTION_START_PAIRING: + # TODO: Start pairing using cliraop + # For now, just set a flag to show the PIN entry + self.mass.config.set_raw_player_config_value( + self.player_id, "_pairing_in_progress", True + ) + self.logger.info("Started AirPlay pairing for %s", self.display_name) + + elif action == CONF_ACTION_FINISH_PAIRING: + # TODO: Finish pairing using cliraop with the provided PIN + if not values: + return + + pin = values.get(CONF_PAIRING_PIN) + if not pin: + self.logger.warning("No PIN provided for pairing") + return + + # TODO: Use cliraop to complete pairing with the PIN + # For now, just clear the pairing in progress flag + self.mass.config.set_raw_player_config_value( + self.player_id, "_pairing_in_progress", False + ) + + # TODO: Store the actual credentials obtained from cliraop + # self.mass.config.set_raw_player_config_value( + # self.player_id, CONF_AP_CREDENTIALS, credentials_from_cliraop + # ) + + self.logger.info( + "Finished AirPlay pairing for %s (TODO: implement actual pairing)", + self.display_name, + ) + async def stop(self) -> None: """Send STOP command to player.""" - if self.raop_stream and self.raop_stream.session: + if self.stream and self.stream.session: # forward stop to the entire stream session - await self.raop_stream.session.stop() + await self.stream.session.stop() self._attr_active_source = None self._attr_current_media = None self.update_state() @@ -194,8 +436,8 @@ class AirPlayPlayer(Player): async def play(self) -> None: """Send PLAY (unpause) command to player.""" async with self._lock: - if self.raop_stream and self.raop_stream.running: - await self.raop_stream.send_cli_command("ACTION=PLAY") + if self.stream and self.stream.running: + await self.stream.send_cli_command("ACTION=PLAY") async def pause(self) -> None: """Send PAUSE command to player.""" @@ -206,9 +448,9 @@ class AirPlayPlayer(Player): return async with self._lock: - if not self.raop_stream or not self.raop_stream.running: + if not self.stream or not self.stream.running: return - await self.raop_stream.send_cli_command("ACTION=PAUSE") + await self.stream.send_cli_command("ACTION=PAUSE") async def play_media(self, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player.""" @@ -225,7 +467,7 @@ class AirPlayPlayer(Player): if media.media_type == MediaType.ANNOUNCEMENT: # special case: stream announcement assert media.custom_data - input_format = AIRPLAY_PCM_FORMAT + pcm_format = AIRPLAY_PCM_FORMAT audio_source = self.mass.streams.get_announcement_stream( media.custom_data["announcement_url"], output_format=AIRPLAY_PCM_FORMAT, @@ -234,7 +476,7 @@ class AirPlayPlayer(Player): ) elif media.media_type == MediaType.PLUGIN_SOURCE: # special case: plugin source stream - input_format = AIRPLAY_PCM_FORMAT + pcm_format = AIRPLAY_PCM_FORMAT assert media.custom_data audio_source = self.mass.streams.get_plugin_source_stream( plugin_source_id=media.custom_data["source_id"], @@ -248,11 +490,11 @@ class AirPlayPlayer(Player): ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id)) ugp_stream = ugp_player.stream assert ugp_stream is not None # for type checker - input_format = ugp_stream.base_pcm_format + pcm_format = ugp_stream.base_pcm_format audio_source = ugp_stream.subscribe_raw() elif media.source_id and media.queue_item_id: # regular queue (flow) stream request - input_format = AIRPLAY_FLOW_PCM_FORMAT + pcm_format = AIRPLAY_FLOW_PCM_FORMAT queue = self.mass.player_queues.get(media.source_id) assert queue start_queue_item = self.mass.player_queues.get_item( @@ -262,12 +504,12 @@ class AirPlayPlayer(Player): audio_source = self.mass.streams.get_queue_flow_stream( queue=queue, start_queue_item=start_queue_item, - pcm_format=input_format, + pcm_format=pcm_format, ) else: # assume url or some other direct path # NOTE: this will fail if its an uri not playable by ffmpeg - input_format = AIRPLAY_PCM_FORMAT + pcm_format = AIRPLAY_PCM_FORMAT audio_source = get_ffmpeg_stream( audio_input=media.uri, input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)), @@ -275,25 +517,25 @@ class AirPlayPlayer(Player): ) # if an existing stream session is running, we could replace it with the new stream - if self.raop_stream and self.raop_stream.running: + if self.stream and self.stream.running: # check if we need to replace the stream - if self.raop_stream.prevent_playback: + if self.stream.prevent_playback: # player is in prevent playback mode, we need to stop the stream await self.stop() else: - await self.raop_stream.session.replace_stream(audio_source) + await self.stream.session.replace_stream(audio_source) return - # setup RaopStreamSession for player (and its sync childs if any) + # setup StreamSession for player (and its sync childs if any) sync_clients = self._get_sync_clients() provider = cast("AirPlayProvider", self.provider) - raop_stream_session = RaopStreamSession(provider, sync_clients, input_format, audio_source) - await raop_stream_session.start() + stream_session = AirPlayStreamSession(provider, sync_clients, pcm_format, audio_source) + await stream_session.start() async def volume_set(self, volume_level: int) -> None: """Send VOLUME_SET command to given player.""" - if self.raop_stream and self.raop_stream.running: - await self.raop_stream.send_cli_command(f"VOLUME={volume_level}\n") + if self.stream and self.stream.running: + await self.stream.send_cli_command(f"VOLUME={volume_level}\n") self._attr_volume_level = volume_level self.update_state() # store last state in cache @@ -317,22 +559,22 @@ class AirPlayPlayer(Player): # nothing to do return - raop_session = self.raop_stream.session if self.raop_stream else None + stream_session = self.stream.session if self.stream else None # handle removals first if player_ids_to_remove: if self.player_id in player_ids_to_remove: # dissolve the entire sync group - if self.raop_stream and self.raop_stream.running: + if self.stream and self.stream.running: # stop the stream session if it is running - await self.raop_stream.session.stop() + await self.stream.session.stop() self._attr_group_members = [] self.update_state() return for child_player in self._get_sync_clients(): if child_player.player_id in player_ids_to_remove: - if raop_session: - await raop_session.remove_client(child_player) + if stream_session: + await stream_session.remove_client(child_player) self._attr_group_members.remove(child_player.player_id) # handle additions @@ -354,16 +596,16 @@ class AirPlayPlayer(Player): "AirPlayPlayer | None", self.mass.players.get(player_id) ): if ( - child_player_to_add.raop_stream - and child_player_to_add.raop_stream.running - and child_player_to_add.raop_stream.session != raop_session + child_player_to_add.stream + and child_player_to_add.stream.running + and child_player_to_add.stream.session != stream_session ): - await child_player_to_add.raop_stream.session.remove_client(child_player_to_add) + await child_player_to_add.stream.session.remove_client(child_player_to_add) - # add new child to the existing raop session (if any) + # add new child to the existing stream (RAOP or AirPlay2) session (if any) self._attr_group_members.append(player_id) - if raop_session: - await raop_session.add_client(child_player_to_add) + if stream_session: + await stream_session.add_client(child_player_to_add) # always update the state after modifying group members self.update_state() @@ -371,7 +613,7 @@ class AirPlayPlayer(Player): def update_volume_from_device(self, volume: int) -> None: """Update volume from device feedback.""" ignore_volume_report = ( - self.mass.config.get_raw_player_config_value(self.player_id, CONF_IGNORE_VOLUME, False) + self.config.get_value(CONF_IGNORE_VOLUME) or self.device_info.manufacturer.lower() == "apple" ) @@ -388,7 +630,12 @@ class AirPlayPlayer(Player): def set_discovery_info(self, discovery_info: AsyncServiceInfo, display_name: str) -> None: """Set/update the discovery info for the player.""" self._attr_name = display_name - self.discovery_info = discovery_info + if discovery_info.type == AIRPLAY_DISCOVERY_TYPE: + self.airplay_discovery_info = discovery_info + elif discovery_info.type == RAOP_DISCOVERY_TYPE: + self.raop_discovery_info = discovery_info + else: # guard + return cur_address = self.address new_address = get_primary_ip_address_from_zeroconf(discovery_info) if new_address is None: @@ -400,10 +647,10 @@ class AirPlayPlayer(Player): self._attr_device_info.ip_address = new_address self.update_state() - def set_state_from_raop( + def set_state_from_stream( self, state: PlaybackState | None = None, elapsed_time: float | None = None ) -> None: - """Set the playback state from RAOP.""" + """Set the playback state from stream (RAOP or AirPlay2).""" if state is not None: self._attr_playback_state = state if elapsed_time is not None: @@ -414,11 +661,11 @@ class AirPlayPlayer(Player): async def on_unload(self) -> None: """Handle logic when the player is unloaded from the Player controller.""" await super().on_unload() - if self.raop_stream: + if self.stream: # stop the stream session if it is running - if self.raop_stream.running: - self.mass.create_task(self.raop_stream.session.stop()) - self.raop_stream = None + if self.stream.running: + self.mass.create_task(self.stream.session.stop()) + self.stream = None def _get_sync_clients(self) -> list[AirPlayPlayer]: """Get all sync clients for a player.""" diff --git a/music_assistant/providers/airplay/protocols/__init__.py b/music_assistant/providers/airplay/protocols/__init__.py new file mode 100644 index 00000000..459329e4 --- /dev/null +++ b/music_assistant/providers/airplay/protocols/__init__.py @@ -0,0 +1 @@ +"""AirPlay (streaming) Protocols.""" diff --git a/music_assistant/providers/airplay/protocols/_protocol.py b/music_assistant/providers/airplay/protocols/_protocol.py new file mode 100644 index 00000000..52a5f7d1 --- /dev/null +++ b/music_assistant/providers/airplay/protocols/_protocol.py @@ -0,0 +1,218 @@ +"""Base protocol class for AirPlay streaming implementations.""" + +from __future__ import annotations + +import asyncio +import os +import time +from abc import ABC, abstractmethod +from contextlib import suppress +from random import randint +from typing import TYPE_CHECKING + +from music_assistant_models.enums import ContentType, PlaybackState +from music_assistant_models.media_items import AudioFormat + +from music_assistant.constants import VERBOSE_LOG_LEVEL +from music_assistant.helpers.namedpipe import AsyncNamedPipeWriter + +if TYPE_CHECKING: + from music_assistant_models.player import PlayerMedia + + from music_assistant.helpers.process import AsyncProcess + from music_assistant.providers.airplay.player import AirPlayPlayer + from music_assistant.providers.airplay.stream_session import AirPlayStreamSession + + +class AirPlayProtocol(ABC): + """Base class for AirPlay streaming protocols (RAOP and AirPlay2). + + This class contains common logic shared between protocol implementations, + with abstract methods for protocol-specific behavior. + """ + + # the pcm audio format used for streaming to this protocol + pcm_format = AudioFormat( + content_type=ContentType.PCM_S16LE, sample_rate=44100, bit_depth=16, channels=2 + ) + + def __init__( + self, + session: AirPlayStreamSession, + player: AirPlayPlayer, + ) -> None: + """Initialize base AirPlay protocol. + + Args: + session: The stream session managing this protocol instance + player: The player to stream to + """ + self.session = session + self.prov = session.prov + self.mass = session.prov.mass + self.player = player + # Generate unique ID to prevent race conditions with named pipes + self.active_remote_id: str = str(randint(1000, 8000)) + self.prevent_playback: bool = False + self._cli_proc: AsyncProcess | None = None + # State tracking + self._started = asyncio.Event() + self._stopped = False + self._total_bytes_sent = 0 + self._stream_bytes_sent = 0 + self.audio_named_pipe = ( + f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio" # noqa: S108 + ) + self.commands_named_pipe = ( + f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd" # noqa: S108 + ) + # Async named pipe writers (kept open for session duration) + self._audio_pipe: AsyncNamedPipeWriter | None = None + self._commands_pipe: AsyncNamedPipeWriter | None = None + + @property + def running(self) -> bool: + """Return boolean if this stream is running.""" + return ( + not self._stopped + and self._started.is_set() + and self._cli_proc is not None + and not self._cli_proc.closed + ) + + @abstractmethod + async def get_ntp(self) -> int: + """Get current NTP timestamp from the CLI binary.""" + + @abstractmethod + async def start(self, start_ntp: int, skip: int = 0) -> None: + """Initialize streaming process for the player. + + Args: + start_ntp: NTP timestamp to start streaming + skip: Number of seconds to skip (for late joiners) + """ + + async def _open_pipes(self) -> None: + """Open both named pipes in non-blocking mode for async I/O.""" + # Open audio pipe with buffer size optimization + self._audio_pipe = AsyncNamedPipeWriter(self.audio_named_pipe, logger=self.player.logger) + await self._audio_pipe.open(increase_buffer=True) + + # Open command pipe (no need to increase buffer for small commands) + self._commands_pipe = AsyncNamedPipeWriter( + self.commands_named_pipe, logger=self.player.logger + ) + await self._commands_pipe.open(increase_buffer=False) + + self.player.logger.debug("Named pipes opened in non-blocking mode for streaming session") + + async def stop(self) -> None: + """Stop playback and cleanup.""" + # Send stop command before setting _stopped flag + await self.send_cli_command("ACTION=STOP") + + self._stopped = True + + # Close named pipes (sends EOF to C side, triggering graceful shutdown) + if self._audio_pipe is not None: + await self._audio_pipe.close() + self._audio_pipe = None + + if self._commands_pipe is not None: + await self._commands_pipe.close() + self._commands_pipe = None + + # Close the CLI process (wait for it to terminate) + if self._cli_proc and not self._cli_proc.closed: + await self._cli_proc.close(True) + + self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0) + + # Remove named pipes from filesystem + with suppress(Exception): + await asyncio.to_thread(os.remove, self.audio_named_pipe) + with suppress(Exception): + await asyncio.to_thread(os.remove, self.commands_named_pipe) + + async def write_chunk(self, chunk: bytes) -> None: + """ + Write a (pcm) audio chunk to the stream. + + Writes one second worth of audio data based on the pcm format. + Uses non-blocking I/O with asyncio event loop (no thread pool consumption). + """ + if self._audio_pipe is None or not self._audio_pipe.is_open: + return + + pipe_write_start = time.time() + + try: + await self._audio_pipe.write(chunk) + except TimeoutError as e: + # Re-raise with player context + raise TimeoutError(f"Player {self.player.player_id}: {e}") from e + + pipe_write_elapsed = time.time() - pipe_write_start + + # Log only truly abnormal pipe writes (>5s indicates a real stall) + # Normal writes take ~1s due to pipe rate-limiting to playback speed + # Can take up to ~4s if player's latency buffer is full + if pipe_write_elapsed > 5.0: + self.player.logger.error( + "!!! STALLED PIPE WRITE: Player %s took %.3fs to write %d bytes to pipe", + self.player.player_id, + pipe_write_elapsed, + len(chunk), + ) + + async def write_eof(self) -> None: + """Write EOF to signal end of stream.""" + # default implementation simply closes the named pipe + # can be overridden with protocol specific implementation if needed + if self._audio_pipe is not None: + await self._audio_pipe.close() + self._audio_pipe = None + + async def send_cli_command(self, command: str) -> None: + """Send an interactive command to the running CLI binary using non-blocking I/O.""" + if self._stopped or not self._cli_proc or self._cli_proc.closed: + return + if self._commands_pipe is None or not self._commands_pipe.is_open: + return + + await self._started.wait() + + if not command.endswith("\n"): + command += "\n" + + self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) + self.player.last_command_sent = time.time() + + # Write command to pipe + data = command.encode("utf-8") + + with suppress(BrokenPipeError): + try: + # Use shorter timeout for commands (1 second per wait iteration) + await self._commands_pipe.write(data, timeout_per_wait=1.0) + except TimeoutError: + self.player.logger.warning("Command pipe write timeout for %s", command.strip()) + + async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None: + """Send metadata to player.""" + if self._stopped: + return + if metadata: + duration = min(metadata.duration or 0, 3600) + title = metadata.title or "" + artist = metadata.artist or "" + album = metadata.album or "" + cmd = f"TITLE={title}\nARTIST={artist}\nALBUM={album}\n" + cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n" + await self.send_cli_command(cmd) + # get image + if metadata.image_url: + await self.send_cli_command(f"ARTWORK={metadata.image_url}\n") + if progress is not None: + await self.send_cli_command(f"PROGRESS={progress}\n") diff --git a/music_assistant/providers/airplay/protocols/airplay2.py b/music_assistant/providers/airplay/protocols/airplay2.py new file mode 100644 index 00000000..1e670702 --- /dev/null +++ b/music_assistant/providers/airplay/protocols/airplay2.py @@ -0,0 +1,193 @@ +"""Logic for AirPlay 2 audio streaming to AirPlay devices.""" + +from __future__ import annotations + +import asyncio +import logging +import os +import platform + +from music_assistant_models.enums import PlaybackState +from music_assistant_models.errors import PlayerCommandFailed + +from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL +from music_assistant.helpers.process import AsyncProcess +from music_assistant.providers.airplay.constants import ( + AIRPLAY2_MIN_LOG_LEVEL, + CONF_READ_AHEAD_BUFFER, +) +from music_assistant.providers.airplay.helpers import get_cli_binary, get_ntp_timestamp + +from ._protocol import AirPlayProtocol + + +class AirPlay2Stream(AirPlayProtocol): + """ + AirPlay 2 Audio Streamer. + + Python is not suitable for realtime audio streaming so we do the actual streaming + of audio using a small executable written in C based on owntones to do + the actual timestamped playback. It reads pcm audio from a named pipe + and we can send some interactive commands using another named pipe. + """ + + _stderr_reader_task: asyncio.Task[None] | None = None + + async def get_ntp(self) -> int: + """Get current NTP timestamp.""" + # TODO! + return get_ntp_timestamp() + + @property + def _cli_loglevel(self) -> int: + """ + Return a cliap2 aligned loglevel. + + Ensures that minimum level required for required cliap2 stderr output is respected. + """ + force_verbose: bool = False # just for now + mass_level: int = 0 + match self.prov.logger.level: + case logging.CRITICAL: + mass_level = 0 + case logging.ERROR: + mass_level = 1 + case logging.WARNING: + mass_level = 2 + case logging.INFO: + mass_level = 3 + case logging.DEBUG: + mass_level = 4 + if self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): + mass_level = 5 + if force_verbose: + mass_level = 5 # always use max log level for now to capture all stderr output + return max(mass_level, AIRPLAY2_MIN_LOG_LEVEL) + + async def start(self, start_ntp: int, skip: int = 0) -> None: + """Initialize CLI process for a player.""" + cli_binary = await get_cli_binary(self.player.protocol) + assert self.player.discovery_info is not None + + player_id = self.player.player_id + sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0) + assert isinstance(sync_adjust, int) + read_ahead = await self.mass.config.get_player_config_value( + player_id, CONF_READ_AHEAD_BUFFER + ) + + txt_kv: str = "" + for key, value in self.player.discovery_info.decoded_properties.items(): + txt_kv += f'"{key}={value}" ' + + # Note: skip parameter is accepted for API compatibility with base class + # but is not currently used by the cliap2 binary (AirPlay2 handles late joiners differently) + + # cliap2 is the binary that handles the actual streaming to the player + # this binary leverages from the AirPlay2 support in owntones + # https://github.com/music-assistant/cliairplay + cli_args = [ + cli_binary, + "--config", + os.path.join(os.path.dirname(__file__), "bin", "cliap2.conf"), + "--name", + self.player.display_name, + "--hostname", + str(self.player.discovery_info.server), + "--address", + str(self.player.address), + "--port", + str(self.player.discovery_info.port), + "--txt", + txt_kv, + "--ntpstart", + str(start_ntp), + "--latency", + str(read_ahead), + "--volume", + str(self.player.volume_level), + "--loglevel", + str(self._cli_loglevel), + "--pipe", + self.audio_named_pipe, + ] + self.player.logger.debug( + "Starting cliap2 process for player %s with args: %s", + player_id, + cli_args, + ) + self._cli_proc = AsyncProcess(cli_args, stdin=True, stderr=True, name="cliap2") + if platform.system() == "Darwin": + os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" + await self._cli_proc.start() + # read up to first num_lines lines of stderr to get the initial status + num_lines: int = 50 + if self.prov.logger.level > logging.INFO: + num_lines *= 10 + for _ in range(num_lines): + line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore") + self.player.logger.debug(line) + if f"airplay: Adding AirPlay device '{self.player.display_name}'" in line: + self.player.logger.info("AirPlay device connected. Starting playback.") + self._started.set() + # Open pipes now that cliraop is ready + await self._open_pipes() + break + if f"The AirPlay 2 device '{self.player.display_name}' failed" in line: + raise PlayerCommandFailed("Cannot connect to AirPlay device") + # start reading the stderr of the cliap2 process from another task + self._stderr_reader_task = self.mass.create_task(self._stderr_reader()) + + async def _stderr_reader(self) -> None: + """Monitor stderr for the running CLIap2 process.""" + player = self.player + queue = self.mass.players.get_active_queue(player) + logger = player.logger + lost_packets = 0 + if not self._cli_proc: + return + async for line in self._cli_proc.iter_stderr(): + # TODO @bradkeifer make cliap2 work this way + if "elapsed milliseconds:" in line: + # this is received more or less every second while playing + # millis = int(line.split("elapsed milliseconds: ")[1]) + # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction + # self.player.elapsed_time_last_updated = time.time() + # NOTE: Metadata is now handled at the session level + pass + if "set pause" in line or "Pause at" in line: + player.set_state_from_stream(state=PlaybackState.PAUSED) + if "Restarted at" in line or "restarting w/ pause" in line: + player.set_state_from_stream(state=PlaybackState.PLAYING) + if "restarting w/o pause" in line: + # streaming has started + player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0) + if "lost packet out of backlog" in line: + lost_packets += 1 + if lost_packets == 100 and queue: + logger.error("High packet loss detected, restarting playback...") + self.mass.create_task(self.mass.player_queues.resume(queue.queue_id, False)) + else: + logger.warning("Packet loss detected!") + if "end of stream reached" in line: + logger.debug("End of stream reached") + break + + # log cli stderr output in alignment with mass logging level + if "[FATAL]" in line: + logger.critical(line) + elif "[ LOG]" in line: + logger.error(line) + elif "[ INFO]" in line: + logger.info(line) + elif "[ WARN]" in line: + logger.warning(line) + elif "[DEBUG]" in line: + logger.debug(line) + elif "[ SPAM]" in line: + logger.log(VERBOSE_LOG_LEVEL, line) + else: # for now, log unknown lines as error + logger.error(line) + + # ensure we're cleaned up afterwards (this also logs the returncode) + await self.stop() diff --git a/music_assistant/providers/airplay/protocols/raop.py b/music_assistant/providers/airplay/protocols/raop.py new file mode 100644 index 00000000..805a6d22 --- /dev/null +++ b/music_assistant/providers/airplay/protocols/raop.py @@ -0,0 +1,200 @@ +"""Logic for RAOP audio streaming to AirPlay devices.""" + +from __future__ import annotations + +import asyncio +import logging + +from music_assistant_models.enums import PlaybackState +from music_assistant_models.errors import PlayerCommandFailed + +from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL +from music_assistant.helpers.process import AsyncProcess, check_output +from music_assistant.providers.airplay.constants import ( + CONF_ALAC_ENCODE, + CONF_ENCRYPTION, + CONF_PASSWORD, + CONF_READ_AHEAD_BUFFER, +) +from music_assistant.providers.airplay.helpers import get_cli_binary + +from ._protocol import AirPlayProtocol + + +class RaopStream(AirPlayProtocol): + """ + RAOP (AirPlay 1) Audio Streamer. + + Python is not suitable for realtime audio streaming so we do the actual streaming + of (RAOP) audio using a small executable written in C based on libraop to do + the actual timestamped playback, which reads pcm audio from stdin + and we can send some interactive commands using a named pipe. + """ + + _stderr_reader_task: asyncio.Task[None] | None = None + + @property + def running(self) -> bool: + """Return boolean if this stream is running.""" + return ( + not self._stopped + and self._started.is_set() + and self._cli_proc is not None + and not self._cli_proc.closed + ) + + async def get_ntp(self) -> int: + """Get current NTP timestamp from the CLI binary.""" + cli_binary = await get_cli_binary(self.player.protocol) + # TODO: we can potentially also just generate this ourselves? + self.prov.logger.debug("Getting NTP timestamp from %s CLI binary", self.player.protocol) + _, stdout = await check_output(cli_binary, "-ntp") + self.prov.logger.debug(f"Output from ntp check: {stdout.decode().strip()}") + return int(stdout.strip()) + + async def start(self, start_ntp: int, skip: int = 0) -> None: + """Initialize CLIRaop process for a player.""" + assert self.player.discovery_info is not None # for type checker + cli_binary = await get_cli_binary(self.player.protocol) + + extra_args: list[str] = [] + player_id = self.player.player_id + extra_args += ["-if", self.mass.streams.bind_ip] + if self.player.config.get_value(CONF_ENCRYPTION, True): + extra_args += ["-encrypt"] + if self.player.config.get_value(CONF_ALAC_ENCODE, True): + extra_args += ["-alac"] + for prop in ("et", "md", "am", "pk", "pw"): + if prop_value := self.player.discovery_info.decoded_properties.get(prop): + extra_args += [f"-{prop}", prop_value] + if skip > 0: + extra_args += ["-skip", str(skip)] + sync_adjust = self.player.config.get_value(CONF_SYNC_ADJUST, 0) + assert isinstance(sync_adjust, int) + if device_password := self.mass.config.get_raw_player_config_value( + player_id, CONF_PASSWORD, None + ): + extra_args += ["-password", str(device_password)] + # Add AirPlay credentials from pyatv pairing if available (for Apple devices) + # if raop_credentials := self.player.config.get_value(CONF_AP_CREDENTIALS): + # # pyatv AirPlay credentials are in format "identifier:secret_key:other:data" + # # cliraop expects just the secret_key (2nd part, 64-char hex string) for -secret + # parts = str(raop_credentials).split(":") + # if len(parts) >= 2: + # # Take the second part (index 1) as the secret key + # secret_key = parts[1] + # self.prov.logger.debug( + # "Using AirPlay credentials for %s: id=%s, secret_len=%d, parts=%d", + # self.player.player_id, + # parts[0], + # len(secret_key), + # len(parts), + # ) + # extra_args += ["-secret", secret_key] + # else: + # # Fallback: assume it's already just the key + # self.prov.logger.debug( + # "Using AirPlay credentials for %s: single value, length=%d", + # self.player.player_id, + # len(str(raop_credentials)), + # ) + # extra_args += ["-secret", str(raop_credentials)] + if self.prov.logger.isEnabledFor(logging.DEBUG): + extra_args += ["-debug", "5"] + elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): + extra_args += ["-debug", "10"] + read_ahead = await self.mass.config.get_player_config_value( + player_id, CONF_READ_AHEAD_BUFFER + ) + self.player.logger.info("Starting cliraop with latency buffer: %dms", read_ahead) + + # cliraop is the binary that handles the actual raop streaming to the player + # this is a slightly modified version of philippe44's libraop + # https://github.com/music-assistant/libraop + # we use this intermediate binary to do the actual streaming because attempts to do + # so using pure python (e.g. pyatv) were not successful due to the realtime nature + cliraop_args = [ + cli_binary, + "-ntpstart", + str(start_ntp), + "-port", + str(self.player.discovery_info.port), + "-latency", + str(read_ahead), + "-volume", + str(self.player.volume_level), + *extra_args, + "-dacp", + self.prov.dacp_id, + "-activeremote", + self.active_remote_id, + "-cmdpipe", + self.commands_named_pipe, + "-udn", + self.player.discovery_info.name, + self.player.address, + self.audio_named_pipe, + ] + self.player.logger.debug( + "Starting cliraop process for player %s with args: %s", + self.player.player_id, + cliraop_args, + ) + self._cli_proc = AsyncProcess(cliraop_args, stdin=False, stderr=True, name="cliraop") + await self._cli_proc.start() + # read up to first 50 lines of stderr to get the initial status + for _ in range(50): + line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore") + self.player.logger.debug(line) + if "connected to " in line: + self.player.logger.info("AirPlay device connected. Starting playback.") + self._started.set() + # Open pipes now that cliraop is ready + await self._open_pipes() + break + if "Cannot connect to AirPlay device" in line: + raise PlayerCommandFailed("Cannot connect to AirPlay device") + # repeat sending the volume level to the player because some players seem + # to ignore it the first time + # https://github.com/music-assistant/support/issues/3330 + await self.send_cli_command(f"VOLUME={self.player.volume_level}\n") + # start reading the stderr of the cliraop process from another task + self._stderr_reader_task = self.mass.create_task(self._stderr_reader()) + + async def _stderr_reader(self) -> None: + """Monitor stderr for the running CLIRaop process.""" + player = self.player + logger = player.logger + lost_packets = 0 + if not self._cli_proc: + return + async for line in self._cli_proc.iter_stderr(): + if "elapsed milliseconds:" in line: + # this is received more or less every second while playing + # millis = int(line.split("elapsed milliseconds: ")[1]) + # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction + # self.player.elapsed_time_last_updated = time.time() + logger.log(VERBOSE_LOG_LEVEL, line) + continue + if "set pause" in line or "Pause at" in line: + player.set_state_from_stream(state=PlaybackState.PAUSED) + if "Restarted at" in line or "restarting w/ pause" in line: + player.set_state_from_stream(state=PlaybackState.PLAYING) + if "restarting w/o pause" in line: + # streaming has started + player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0) + if "lost packet out of backlog" in line: + lost_packets += 1 + if lost_packets == 100: + logger.error("High packet loss detected, restarting playback...") + self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id)) + else: + logger.warning("Packet loss detected!") + if "end of stream reached" in line: + logger.debug("End of stream reached") + break + logger.debug(line) + + # ensure we're cleaned up afterwards (this also logs the returncode) + logger.debug("CLIRaop stderr reader ended") + await self.stop() diff --git a/music_assistant/providers/airplay/provider.py b/music_assistant/providers/airplay/provider.py index feaa9397..73cdafa3 100644 --- a/music_assistant/providers/airplay/provider.py +++ b/music_assistant/providers/airplay/provider.py @@ -12,16 +12,20 @@ from zeroconf import ServiceStateChange from zeroconf.asyncio import AsyncServiceInfo from music_assistant.helpers.datetime import utc -from music_assistant.helpers.util import get_ip_pton, select_free_port +from music_assistant.helpers.util import ( + get_ip_pton, + get_primary_ip_address_from_zeroconf, + select_free_port, +) from music_assistant.models.player_provider import PlayerProvider -from .constants import CACHE_CATEGORY_PREV_VOLUME, CONF_IGNORE_VOLUME, FALLBACK_VOLUME -from .helpers import ( - convert_airplay_volume, - get_cliraop_binary, - get_model_info, - get_primary_ip_address_from_zeroconf, +from .constants import ( + CACHE_CATEGORY_PREV_VOLUME, + CONF_IGNORE_VOLUME, + DACP_DISCOVERY_TYPE, + FALLBACK_VOLUME, ) +from .helpers import convert_airplay_volume, get_model_info from .player import AirPlayPlayer # TODO: AirPlay provider @@ -36,14 +40,11 @@ from .player import AirPlayPlayer class AirPlayProvider(PlayerProvider): """Player provider for AirPlay based players.""" - cliraop_bin: str | None _dacp_server: asyncio.Server _dacp_info: AsyncServiceInfo async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" - # we locate the cliraop binary here, so we can fail early if it is not available - self.cliraop_bin: str | None = await get_cliraop_binary() # register DACP zeroconf service dacp_port = await select_free_port(39831, 49831) self.dacp_id = dacp_id = f"{randrange(2**64):X}" @@ -51,10 +52,9 @@ class AirPlayProvider(PlayerProvider): self._dacp_server = await asyncio.start_server( self._handle_dacp_request, "0.0.0.0", dacp_port ) - zeroconf_type = "_dacp._tcp.local." - server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}" + server_id = f"iTunes_Ctrl_{dacp_id}.{DACP_DISCOVERY_TYPE}" self._dacp_info = AsyncServiceInfo( - zeroconf_type, + DACP_DISCOVERY_TYPE, name=server_id, addresses=[await get_ip_pton(str(self.mass.streams.publish_ip))], port=dacp_port, @@ -101,7 +101,6 @@ class AirPlayProvider(PlayerProvider): # update the latest discovery info for existing player player.set_discovery_info(info, display_name) return - # handle new player await self._setup_player(player_id, display_name, info) async def unload(self, is_removed: bool = False) -> None: @@ -117,11 +116,20 @@ class AirPlayProvider(PlayerProvider): self, player_id: str, display_name: str, discovery_info: AsyncServiceInfo ) -> None: """Handle setup of a new player that is discovered using mdns.""" - # prefer airplay mdns info as it has more details - # fallback to raop info if airplay info is not available - airplay_info = AsyncServiceInfo( - "_airplay._tcp.local.", discovery_info.name.split("@")[-1].replace("_raop", "_airplay") - ) + if discovery_info.type == "_raop._tcp.local.": + # RAOP service discovered + self.logger.debug("Discovered RAOP service for %s", display_name) + # always prefer airplay mdns info as it has more details + # fallback to raop info if airplay info is not available, + # (old device only announcing raop) + airplay_info = AsyncServiceInfo( + "_airplay._tcp.local.", + discovery_info.name.split("@")[-1].replace("_raop", "_airplay"), + ) + else: + # AirPlay service discovered + self.logger.debug("Discovered AirPlay service for %s", display_name) + airplay_info = discovery_info if await airplay_info.async_request(self.mass.aiozc.zeroconf, 3000): manufacturer, model = get_model_info(airplay_info) else: @@ -131,27 +139,15 @@ class AirPlayProvider(PlayerProvider): self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name) return - if "apple tv" in model.lower(): - # For now, we ignore the Apple TV until we implement the authentication. - # maybe we can simply use pyatv only for this part? - # the cliraop application has already been prepared to accept the secret. - self.logger.info( - "Ignoring %s in discovery because it is not yet supported.", display_name - ) - return - address = get_primary_ip_address_from_zeroconf(discovery_info) if not address: return # should not happen, but guard just in case + if not discovery_info: + return # should not happen, but guard just in case # if we reach this point, all preflights are ok and we can create the player self.logger.debug("Discovered AirPlay device %s on %s", display_name, address) - # append airplay to the default display name for generic (non-apple) devices - # this makes it easier for users to distinguish between airplay and non-airplay devices - if manufacturer.lower() != "apple" and "airplay" not in display_name.lower(): - display_name += " (AirPlay)" - # Get volume from cache if not ( volume := await self.mass.cache.get( @@ -160,6 +156,21 @@ class AirPlayProvider(PlayerProvider): ): volume = FALLBACK_VOLUME + # Append airplay to the default name for non-apple devices + # to make it easier for users to distinguish + is_apple = manufacturer.lower() == "apple" + if not is_apple and "airplay" not in display_name.lower(): + display_name += " (AirPlay)" + + self.logger.debug( + "Setting up player %s: manufacturer=%s, model=%s", + display_name, + manufacturer, + model, + ) + + # Create single AirPlayPlayer for all devices + # Pairing config entries will be shown conditionally based on device type player = AirPlayPlayer( provider=self, player_id=player_id, @@ -170,6 +181,15 @@ class AirPlayProvider(PlayerProvider): model=model, initial_volume=volume, ) + + # Final check before registration to handle race conditions + # (multiple MDNS events processed in parallel for same device) + if self.mass.players.get(player_id): + self.logger.debug( + "Player %s already registered during setup, skipping registration", player_id + ) + return + await self.mass.players.register(player) async def _handle_dacp_request( # noqa: PLR0915 @@ -204,17 +224,17 @@ class AirPlayProvider(PlayerProvider): active_remote = headers.get("Active-Remote") _, path, _ = headers_split[0].split(" ") # lookup airplay player by active remote id - player = next( + player: AirPlayPlayer | None = next( ( x for x in self.get_players() - if x.raop_stream and x.raop_stream.active_remote_id == active_remote + if x.stream and x.stream.active_remote_id == active_remote ), None, ) self.logger.debug( "DACP request for %s (%s): %s -- %s", - player.discovery_info.name if player else "UNKNOWN PLAYER", + player.name if player else "UNKNOWN PLAYER", active_remote, path, body, @@ -269,8 +289,8 @@ class AirPlayProvider(PlayerProvider): # we've sent or the device requesting a new volume itself. # In case of a small rounding difference, we ignore this, # to prevent an endless pingpong of volume changes - raop_volume = float(path.split("dmcp.device-volume=", 1)[-1]) - volume = convert_airplay_volume(raop_volume) + airplay_volume = float(path.split("dmcp.device-volume=", 1)[-1]) + volume = convert_airplay_volume(airplay_volume) player.update_volume_from_device(volume) elif "dmcp.volume=" in path: # volume change request from device (e.g. volume buttons) @@ -278,13 +298,13 @@ class AirPlayProvider(PlayerProvider): player.update_volume_from_device(volume) elif "device-prevent-playback=1" in path: # device switched to another source (or is powered off) - if raop_stream := player.raop_stream: - raop_stream.prevent_playback = True - self.mass.create_task(player.raop_stream.session.remove_client(player)) + if stream := player.stream: + stream.prevent_playback = True + self.mass.create_task(player.stream.session.remove_client(player)) elif "device-prevent-playback=0" in path: # device reports that its ready for playback again - if raop_stream := player.raop_stream: - raop_stream.prevent_playback = False + if stream := player.stream: + stream.prevent_playback = False # send response date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S") diff --git a/music_assistant/providers/airplay/raop.py b/music_assistant/providers/airplay/raop.py deleted file mode 100644 index 50ae76f4..00000000 --- a/music_assistant/providers/airplay/raop.py +++ /dev/null @@ -1,473 +0,0 @@ -"""Logic for RAOP (AirPlay 1) audio streaming to AirPlay devices.""" - -from __future__ import annotations - -import asyncio -import logging -import time -from collections.abc import AsyncGenerator -from contextlib import suppress -from random import randint -from typing import TYPE_CHECKING - -from music_assistant_models.enums import PlaybackState -from music_assistant_models.errors import PlayerCommandFailed - -from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL -from music_assistant.helpers.audio import get_chunksize, get_player_filter_params -from music_assistant.helpers.ffmpeg import FFMpeg -from music_assistant.helpers.process import AsyncProcess, check_output -from music_assistant.helpers.util import TaskManager, close_async_generator - -from .constants import ( - AIRPLAY_PCM_FORMAT, - CONF_ALAC_ENCODE, - CONF_ENCRYPTION, - CONF_PASSWORD, - CONF_READ_AHEAD_BUFFER, -) - -if TYPE_CHECKING: - from music_assistant_models.media_items import AudioFormat - - from .player import AirPlayPlayer - from .provider import AirPlayProvider - - -class RaopStreamSession: - """Object that holds the details of a (RAOP) stream session to one or more players.""" - - def __init__( - self, - airplay_provider: AirPlayProvider, - sync_clients: list[AirPlayPlayer], - input_format: AudioFormat, - audio_source: AsyncGenerator[bytes, None], - ) -> None: - """Initialize RaopStreamSession.""" - assert sync_clients - self.prov = airplay_provider - self.mass = airplay_provider.mass - self.input_format = input_format - self.sync_clients = sync_clients - self._audio_source = audio_source - self._audio_source_task: asyncio.Task[None] | None = None - self._lock = asyncio.Lock() - - async def start(self) -> None: - """Initialize RaopStreamSession.""" - # initialize raop stream for all players - - # get current ntp and start RaopStream per player - assert self.prov.cliraop_bin - _, stdout = await check_output(self.prov.cliraop_bin, "-ntp") - start_ntp = int(stdout.strip()) - wait_start = 1750 + (250 * len(self.sync_clients)) - - async def _start_client(raop_player: AirPlayPlayer) -> None: - # stop existing stream if running - if raop_player.raop_stream and raop_player.raop_stream.running: - await raop_player.raop_stream.stop() - - raop_player.raop_stream = RaopStream(self, raop_player) - await raop_player.raop_stream.start(start_ntp, wait_start) - - async with TaskManager(self.mass) as tm: - for _raop_player in self.sync_clients: - tm.create_task(_start_client(_raop_player)) - self._audio_source_task = asyncio.create_task(self._audio_streamer()) - - async def stop(self) -> None: - """Stop playback and cleanup.""" - if self._audio_source_task and not self._audio_source_task.done(): - self._audio_source_task.cancel() - with suppress(asyncio.CancelledError): - await self._audio_source_task - await asyncio.gather( - *[self.remove_client(x) for x in self.sync_clients], - return_exceptions=True, - ) - - async def remove_client(self, airplay_player: AirPlayPlayer) -> None: - """Remove a sync client from the session.""" - if airplay_player not in self.sync_clients: - return - assert airplay_player.raop_stream - assert airplay_player.raop_stream.session == self - async with self._lock: - self.sync_clients.remove(airplay_player) - await airplay_player.raop_stream.stop() - airplay_player.raop_stream = None - # if this was the last client, stop the session - if not self.sync_clients: - await self.stop() - return - - async def add_client(self, airplay_player: AirPlayPlayer) -> None: - """Add a sync client to the session.""" - # TODO: Add the ability to add a new client to an existing session - # e.g. by counting the number of frames sent etc. - - # temp solution: just restart the whole playback session when new client(s) join - sync_leader = self.sync_clients[0] - if not sync_leader.raop_stream or not sync_leader.raop_stream.running: - return - - await self.stop() # we need to stop the current session to add a new client - # this could potentially be called by multiple players at the exact same time - # so we debounce the resync a bit here with a timer - if sync_leader.current_media: - self.mass.call_later( - 0.5, - self.mass.players.cmd_resume(sync_leader.player_id), - task_id=f"resync_session_{sync_leader.player_id}", - ) - - async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None: - """Replace the audio source of the stream.""" - # cancel the current audio source task - assert self._audio_source_task # for type checker - self._audio_source_task.cancel() - with suppress(asyncio.CancelledError, RuntimeError): - await self._audio_source_task - # set new audio source and restart the stream - self._audio_source = audio_source - self._audio_source_task = asyncio.create_task(self._audio_streamer()) - # restart the (player-specific) ffmpeg stream for all players - # this is the easiest way to ensure the new audio source is used - # as quickly as possible, without waiting for the buffers to be drained - # it also allows to change the player settings such as DSP on the fly - for sync_client in self.sync_clients: - if not sync_client.raop_stream: - continue # guard - sync_client.raop_stream.start_ffmpeg_stream() - - async def _audio_streamer(self) -> None: - """Stream audio to all players.""" - generator_exhausted = False - try: - async for chunk in self._audio_source: - async with self._lock: - sync_clients = [ - x for x in self.sync_clients if x.raop_stream and x.raop_stream.running - ] - if not sync_clients: - return - await asyncio.gather( - *[x.raop_stream.write_chunk(chunk) for x in sync_clients if x.raop_stream], - return_exceptions=True, - ) - # entire stream consumed: send EOF - generator_exhausted = True - async with self._lock: - await asyncio.gather( - *[ - x.raop_stream.write_eof() - for x in self.sync_clients - if x.raop_stream and x.raop_stream.running - ], - return_exceptions=True, - ) - except Exception as err: - logger = self.prov.logger - logger.error( - "Stream error: %s", - str(err) or err.__class__.__name__, - exc_info=err if logger.isEnabledFor(logging.DEBUG) else None, - ) - raise - finally: - if not generator_exhausted: - await close_async_generator(self._audio_source) - - -class RaopStream: - """ - RAOP (AirPlay 1) Audio Streamer. - - Python is not suitable for realtime audio streaming so we do the actual streaming - of (RAOP) audio using a small executable written in C based on libraop to do - the actual timestamped playback, which reads pcm audio from stdin - and we can send some interactive commands using a named pipe. - """ - - def __init__( - self, - session: RaopStreamSession, - player: AirPlayPlayer, - ) -> None: - """Initialize RaopStream.""" - self.session = session - self.prov = session.prov - self.mass = session.prov.mass - self.player = player - - # always generate a new active remote id to prevent race conditions - # with the named pipe used to send audio - self.active_remote_id: str = str(randint(1000, 8000)) - self.prevent_playback: bool = False - self._stderr_reader_task: asyncio.Task[None] | None = None - self._cliraop_proc: AsyncProcess | None = None - self._ffmpeg_proc: AsyncProcess | None = None - self._ffmpeg_reader_task: asyncio.Task[None] | None = None - self._started = asyncio.Event() - self._stopped = False - self._total_bytes_sent = 0 - self._stream_bytes_sent = 0 - - @property - def running(self) -> bool: - """Return boolean if this stream is running.""" - return ( - not self._stopped - and self._started.is_set() - and self._cliraop_proc is not None - and not self._cliraop_proc.closed - ) - - async def start(self, start_ntp: int, wait_start: int = 1000) -> None: - """Initialize CLIRaop process for a player.""" - assert self.prov.cliraop_bin - extra_args: list[str] = [] - player_id = self.player.player_id - extra_args += ["-if", self.mass.streams.bind_ip] - if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False): - extra_args += ["-encrypt"] - if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True): - extra_args += ["-alac"] - for prop in ("et", "md", "am", "pk", "pw"): - if prop_value := self.player.discovery_info.decoded_properties.get(prop): - extra_args += [f"-{prop}", prop_value] - sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0) - assert isinstance(sync_adjust, int) - if device_password := self.mass.config.get_raw_player_config_value( - player_id, CONF_PASSWORD, None - ): - extra_args += ["-password", str(device_password)] - if self.prov.logger.isEnabledFor(logging.DEBUG): - extra_args += ["-debug", "5"] - elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL): - extra_args += ["-debug", "10"] - read_ahead = await self.mass.config.get_player_config_value( - player_id, CONF_READ_AHEAD_BUFFER - ) - # ffmpeg handles the player specific stream + filters and pipes - # audio to the cliraop process - self.start_ffmpeg_stream() - - # cliraop is the binary that handles the actual raop streaming to the player - # this is a slightly modified version of philippe44's libraop - # https://github.com/music-assistant/libraop - # we use this intermediate binary to do the actual streaming because attempts to do - # so using pure python (e.g. pyatv) were not successful due to the realtime nature - # TODO: Either enhance libraop with airplay 2 support or find a better alternative - cliraop_args = [ - self.prov.cliraop_bin, - "-ntpstart", - str(start_ntp), - "-port", - str(self.player.discovery_info.port), - "-wait", - str(wait_start - sync_adjust), - "-latency", - str(read_ahead), - "-volume", - str(self.player.volume_level), - *extra_args, - "-dacp", - self.prov.dacp_id, - "-activeremote", - self.active_remote_id, - "-udn", - self.player.discovery_info.name, - self.player.address, - "-", - ] - self._cliraop_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop") - await self._cliraop_proc.start() - # read first 20 lines of stderr to get the initial status - for _ in range(20): - line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore") - self.player.logger.debug(line) - if "connected to " in line: - self._started.set() - break - if "Cannot connect to AirPlay device" in line: - if self._ffmpeg_reader_task: - self._ffmpeg_reader_task.cancel() - raise PlayerCommandFailed("Cannot connect to AirPlay device") - # repeat sending the volume level to the player because some players seem - # to ignore it the first time - # https://github.com/music-assistant/support/issues/3330 - await self.send_cli_command(f"VOLUME={self.player.volume_level}\n") - # start reading the stderr of the cliraop process from another task - self._stderr_reader_task = self.mass.create_task(self._stderr_reader()) - - async def stop(self) -> None: - """Stop playback and cleanup.""" - await self.send_cli_command("ACTION=STOP") - self._stopped = True - if self._stderr_reader_task and not self._stderr_reader_task.done(): - self._stderr_reader_task.cancel() - if self._ffmpeg_reader_task and not self._ffmpeg_reader_task.done(): - self._ffmpeg_reader_task.cancel() - if self._cliraop_proc and not self._cliraop_proc.closed: - await self._cliraop_proc.close(True) - if self._ffmpeg_proc and not self._ffmpeg_proc.closed: - await self._ffmpeg_proc.close(True) - self.player.set_state_from_raop(state=PlaybackState.IDLE, elapsed_time=0) - - async def write_chunk(self, chunk: bytes) -> None: - """Write a (pcm) audio chunk.""" - if self._stopped: - raise RuntimeError("Stream is already stopped") - await self._started.wait() - assert self._ffmpeg_proc - await self._ffmpeg_proc.write(chunk) - - async def write_eof(self) -> None: - """Write EOF.""" - if self._stopped: - raise RuntimeError("Stream is already stopped") - await self._started.wait() - assert self._ffmpeg_proc - await self._ffmpeg_proc.write_eof() - - async def send_cli_command(self, command: str) -> None: - """Send an interactive command to the running CLIRaop binary.""" - if self._stopped or not self._cliraop_proc or self._cliraop_proc.closed: - return - await self._started.wait() - - if not command.endswith("\n"): - command += "\n" - - def send_data() -> None: - with suppress(BrokenPipeError), open(named_pipe, "w") as f: - f.write(command) - - named_pipe = f"/tmp/raop-{self.active_remote_id}" # noqa: S108 - self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command) - self.player.last_command_sent = time.time() - await asyncio.to_thread(send_data) - - def start_ffmpeg_stream(self) -> None: - """Start (or replace) the player-specific ffmpeg stream to feed cliraop.""" - # cancel existing ffmpeg reader task - if self._ffmpeg_reader_task and not self._ffmpeg_reader_task.done(): - self._ffmpeg_reader_task.cancel() - if self._ffmpeg_proc and not self._ffmpeg_proc.closed: - self.mass.create_task(self._ffmpeg_proc.close(True)) - # start new ffmpeg reader task - self._ffmpeg_reader_task = self.mass.create_task(self._ffmpeg_reader()) - - async def _ffmpeg_reader(self) -> None: - """Read audio from the audio source and pipe it to the CLIRaop process.""" - self._ffmpeg_proc = FFMpeg( - audio_input="-", - input_format=self.session.input_format, - output_format=AIRPLAY_PCM_FORMAT, - filter_params=get_player_filter_params( - self.mass, - self.player.player_id, - self.session.input_format, - AIRPLAY_PCM_FORMAT, - ), - ) - self._stream_bytes_sent = 0 - await self._ffmpeg_proc.start() - chunksize = get_chunksize(AIRPLAY_PCM_FORMAT) - # wait for cliraop to be ready - await asyncio.wait_for(self._started.wait(), 20) - async for chunk in self._ffmpeg_proc.iter_chunked(chunksize): - if self._stopped: - break - if not self._cliraop_proc or self._cliraop_proc.closed: - break - await self._cliraop_proc.write(chunk) - self._stream_bytes_sent += len(chunk) - self._total_bytes_sent += len(chunk) - del chunk - # we base elapsed time on the amount of bytes sent - # so we can account for reusing the same session for multiple streams - self.player.set_state_from_raop( - elapsed_time=self._stream_bytes_sent / chunksize, - ) - # if we reach this point, the process exited, most likely because the stream ended - if self._cliraop_proc and not self._cliraop_proc.closed: - await self._cliraop_proc.write_eof() - - async def _stderr_reader(self) -> None: - """Monitor stderr for the running CLIRaop process.""" - player = self.player - logger = player.logger - lost_packets = 0 - prev_metadata_checksum: str = "" - prev_progress_report: float = 0 - if not self._cliraop_proc: - return - async for line in self._cliraop_proc.iter_stderr(): - if "elapsed milliseconds:" in line: - # this is received more or less every second while playing - # millis = int(line.split("elapsed milliseconds: ")[1]) - # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction - # self.player.elapsed_time_last_updated = time.time() - # send metadata to player(s) if needed - # NOTE: this must all be done in separate tasks to not disturb audio - now = time.time() - if (player.elapsed_time or 0) > 2 and player.current_media: - metadata_checksum = f"{player.current_media.uri}.{player.current_media.title}.{player.current_media.image_url}" # noqa: E501 - if prev_metadata_checksum != metadata_checksum: - prev_metadata_checksum = metadata_checksum - prev_progress_report = now - self.mass.create_task(self._send_metadata()) - # send the progress report every 5 seconds - elif now - prev_progress_report >= 5: - prev_progress_report = now - self.mass.create_task(self._send_progress()) - if "set pause" in line or "Pause at" in line: - player.set_state_from_raop(state=PlaybackState.PAUSED) - if "Restarted at" in line or "restarting w/ pause" in line: - player.set_state_from_raop(state=PlaybackState.PLAYING) - if "restarting w/o pause" in line: - # streaming has started - player.set_state_from_raop(state=PlaybackState.PLAYING, elapsed_time=0) - if "lost packet out of backlog" in line: - lost_packets += 1 - if lost_packets == 100: - logger.error("High packet loss detected, restarting playback...") - self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id)) - else: - logger.warning("Packet loss detected!") - if "end of stream reached" in line: - logger.debug("End of stream reached") - break - logger.log(VERBOSE_LOG_LEVEL, line) - - # ensure we're cleaned up afterwards (this also logs the returncode) - await self.stop() - - async def _send_metadata(self) -> None: - """Send metadata to player (and connected sync childs).""" - if not self.player or not self.player.current_media or self._stopped: - return - duration = min(self.player.current_media.duration or 0, 3600) - title = self.player.current_media.title or "" - artist = self.player.current_media.artist or "" - album = self.player.current_media.album or "" - cmd = f"TITLE={title}\nARTIST={artist}\nALBUM={album}\n" - cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n" - - await self.send_cli_command(cmd) - - # get image - if not self.player.current_media.image_url or self._stopped: - return - await self.send_cli_command(f"ARTWORK={self.player.current_media.image_url}\n") - - async def _send_progress(self) -> None: - """Send progress report to player (and connected sync childs).""" - if not self.player.current_media or self._stopped: - return - progress = int(self.player.corrected_elapsed_time or 0) - await self.send_cli_command(f"PROGRESS={progress}\n") diff --git a/music_assistant/providers/airplay/stream_session.py b/music_assistant/providers/airplay/stream_session.py new file mode 100644 index 00000000..5e89793c --- /dev/null +++ b/music_assistant/providers/airplay/stream_session.py @@ -0,0 +1,418 @@ +"""Unified AirPlay/RAOP stream session logic for AirPlay devices.""" + +from __future__ import annotations + +import asyncio +import logging +import time +from collections.abc import AsyncGenerator +from contextlib import suppress +from typing import TYPE_CHECKING + +from music_assistant.helpers.audio import get_player_filter_params +from music_assistant.helpers.ffmpeg import FFMpeg +from music_assistant.helpers.util import TaskManager, close_async_generator +from music_assistant.providers.airplay.helpers import unix_time_to_ntp + +from .constants import CONF_ENABLE_LATE_JOIN, StreamingProtocol +from .protocols.airplay2 import AirPlay2Stream +from .protocols.raop import RaopStream + +if TYPE_CHECKING: + from music_assistant_models.media_items import AudioFormat + from music_assistant_models.player import PlayerMedia + + from .player import AirPlayPlayer + from .provider import AirPlayProvider + + +class AirPlayStreamSession: + """Stream session (RAOP or AirPlay2) to one or more players.""" + + def __init__( + self, + airplay_provider: AirPlayProvider, + sync_clients: list[AirPlayPlayer], + pcm_format: AudioFormat, + audio_source: AsyncGenerator[bytes, None], + ) -> None: + """Initialize AirPlayStreamSession. + + Args: + airplay_provider: The AirPlay provider instance + sync_clients: List of AirPlay players to stream to + pcm_format: PCM format of the input stream + audio_source: Async generator yielding audio chunks + """ + assert sync_clients + self.prov = airplay_provider + self.mass = airplay_provider.mass + self.pcm_format = pcm_format + self.sync_clients = sync_clients + self._audio_source = audio_source + self._audio_source_task: asyncio.Task[None] | None = None + self._player_ffmpeg: dict[str, FFMpeg] = {} + self._player_start_chunk: dict[str, int] = {} # Chunk number when player joined + self._lock = asyncio.Lock() + self.start_ntp: int = 0 + self.start_time: float = 0.0 + self.chunks_streamed: int = 0 # Total chunks sent to session (each chunk = 1 second) + + async def start(self) -> None: + """Initialize stream session for all players.""" + # Get current NTP timestamp and calculate wait time + cur_time = time.time() + wait_start = 1750 + (250 * len(self.sync_clients)) # in milliseconds + wait_start_seconds = wait_start / 1000 + self.wait_start = wait_start_seconds # in seconds + self.start_time = cur_time + wait_start_seconds + self.start_ntp = unix_time_to_ntp(self.start_time) + + self.prov.logger.info( + "Starting stream session with %d clients", + len(self.sync_clients), + ) + + async def _start_client(airplay_player: AirPlayPlayer) -> None: + """Start stream for a single client.""" + # Stop existing stream if running + if airplay_player.stream and airplay_player.stream.running: + await airplay_player.stream.stop() + if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): + await ffmpeg.close() + del ffmpeg + + self._player_start_chunk[airplay_player.player_id] = 1 + + # Create appropriate stream type based on protocol + if airplay_player.protocol == StreamingProtocol.AIRPLAY2: + airplay_player.stream = AirPlay2Stream(self, airplay_player) + else: + airplay_player.stream = RaopStream(self, airplay_player) + + # create optional FFMpeg instance per player if needed + # this is used to do any optional DSP processing/filtering + filter_params = get_player_filter_params( + self.mass, + airplay_player.player_id, + self.pcm_format, + airplay_player.stream.pcm_format, + ) + if filter_params or self.pcm_format != airplay_player.stream.pcm_format: + ffmpeg = FFMpeg( + audio_input="-", + input_format=self.pcm_format, + output_format=airplay_player.stream.pcm_format, + filter_params=filter_params, + ) + await ffmpeg.start() + self._player_ffmpeg[airplay_player.player_id] = ffmpeg + + await airplay_player.stream.start(self.start_ntp) + + # Tracking will be initialized on first write + + async with TaskManager(self.mass) as tm: + for _airplay_player in self.sync_clients: + tm.create_task(_start_client(_airplay_player)) + # Start audio source streamer task + # this will read from the audio source and distribute to all players + self._audio_source_task = asyncio.create_task(self._audio_streamer()) + + async def stop(self) -> None: + """Stop playback and cleanup.""" + if self._audio_source_task and not self._audio_source_task.done(): + self._audio_source_task.cancel() + with suppress(asyncio.CancelledError): + await self._audio_source_task + await asyncio.gather( + *[self.remove_client(x) for x in self.sync_clients], + return_exceptions=True, + ) + + async def remove_client(self, airplay_player: AirPlayPlayer) -> None: + """Remove a sync client from the session.""" + if airplay_player not in self.sync_clients: + return + assert airplay_player.stream + assert airplay_player.stream.session == self + async with self._lock: + self.sync_clients.remove(airplay_player) + if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): + await ffmpeg.close() + del ffmpeg + # Clean up player tracking + self._player_start_chunk.pop(airplay_player.player_id, None) + await airplay_player.stream.stop() + airplay_player.stream = None + # If this was the last client, stop the session + if not self.sync_clients: + await self.stop() + return + + async def add_client(self, airplay_player: AirPlayPlayer) -> None: + """Add a sync client to the session as a late joiner. + + The late joiner will: + 1. Start playing at a compensated NTP timestamp (start_ntp + offset) + 2. Receive silence calculated dynamically based on how much audio has been sent + 3. Then receive real audio chunks in sync with other players + """ + sync_leader = self.sync_clients[0] + if not sync_leader.stream or not sync_leader.stream.running: + return + + allow_late_join = self.prov.config.get_value(CONF_ENABLE_LATE_JOIN, False) + if not allow_late_join: + # Late joining is not allowed - restart the session for all players + await self.stop() # we need to stop the current session to add a new client + # this could potentially be called by multiple players at the exact same time + # so we debounce the resync a bit here with a timer + if sync_leader.current_media: + self.mass.call_later( + 0.5, + self.mass.players.cmd_resume(sync_leader.player_id), + task_id=f"resync_session_{sync_leader.player_id}", + ) + + # Stop existing stream if the player is already streaming + if airplay_player.stream and airplay_player.stream.running: + await airplay_player.stream.stop() + + # Clean up any existing FFmpeg instance for this player + if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): + await ffmpeg.close() + del ffmpeg + + # Create appropriate stream type based on protocol + if airplay_player.protocol == StreamingProtocol.AIRPLAY2: + airplay_player.stream = AirPlay2Stream(self, airplay_player) + else: + airplay_player.stream = RaopStream(self, airplay_player) + + # Create optional FFMpeg instance per player if needed + filter_params = get_player_filter_params( + self.mass, + airplay_player.player_id, + self.pcm_format, + airplay_player.stream.pcm_format, + ) + if filter_params or self.pcm_format != airplay_player.stream.pcm_format: + ffmpeg = FFMpeg( + audio_input="-", + input_format=self.pcm_format, + output_format=airplay_player.stream.pcm_format, + filter_params=filter_params, + ) + await ffmpeg.start() + self._player_ffmpeg[airplay_player.player_id] = ffmpeg + + # Snapshot chunks_streamed inside lock to prevent race conditions + # Keep lock held during stream.start() to ensure player doesn't miss any chunks + async with self._lock: + # Calculate skip_seconds based on how many chunks have been sent + skip_seconds = self.chunks_streamed + + # Add player to sync clients list + if airplay_player not in self.sync_clients: + self.sync_clients.append(airplay_player) + + await airplay_player.stream.start(self.start_ntp, skip_seconds) + + async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None: + """Replace the audio source of the stream.""" + # Cancel the current audio source task + assert self._audio_source_task # for type checker + self._audio_source_task.cancel() + with suppress(asyncio.CancelledError): + await self._audio_source_task + # Set new audio source and restart the stream + self._audio_source = audio_source + self._audio_source_task = asyncio.create_task(self._audio_streamer()) + # Restart the (player-specific) ffmpeg stream for all players + # This is the easiest way to ensure the new audio source is used + # as quickly as possible, without waiting for the buffers to be drained + # It also allows changing the player settings such as DSP on the fly + # for sync_client in self.sync_clients: + # if not sync_client.stream: + # continue # guard + # sync_client.stream.start_ffmpeg_stream() + + async def _audio_streamer(self) -> None: + """Stream audio to all players.""" + generator_exhausted = False + _last_metadata: str | None = None + try: + # each chunk is exactly one second of audio data based on the pcm format. + async for chunk in self._audio_source: + async with self._lock: + sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running] + if not sync_clients: + self.prov.logger.error( + "!!! AUDIO STREAMER EXITING: No running clients left! " + "Total sync_clients: %d, Details: %s", + len(self.sync_clients), + [ + (x.player_id, x.stream.running if x.stream else None) + for x in self.sync_clients + ], + ) + return + + # Write to all players with a timeout (10 seconds) + # Timeout must account for player's internal latency buffer (1-4 seconds) + # The player may legitimately not accept data while draining its buffer + write_start = time.time() + write_tasks = [ + asyncio.wait_for(self._write_chunk_to_player(x, chunk), timeout=10.0) + for x in sync_clients + if x.stream + ] + results = await asyncio.gather(*write_tasks, return_exceptions=True) + write_elapsed = time.time() - write_start + + # Check for write errors or timeouts + players_to_remove = [] + for i, result in enumerate(results): + if i >= len(sync_clients): + continue + player = sync_clients[i] + + if isinstance(result, asyncio.TimeoutError): + self.prov.logger.error( + "!!! TIMEOUT writing chunk %d to player %s - " + "REMOVING from sync group! Total write time=%.3fs", + self.chunks_streamed, + player.player_id, + write_elapsed, + ) + players_to_remove.append(player) + elif isinstance(result, Exception): + self.prov.logger.error( + "!!! Error writing chunk %d to player %s: %s - " + "REMOVING from sync group! Total write time=%.3fs", + self.chunks_streamed, + player.player_id, + result, + write_elapsed, + ) + players_to_remove.append(player) + + # Remove failed/timed-out players from sync group + for player in players_to_remove: + if player in self.sync_clients: + self.sync_clients.remove(player) + self.prov.logger.warning( + "Player %s removed from sync group due to write failure/timeout", + player.player_id, + ) + # Stop the player's stream + if player.stream: + self.mass.create_task(player.stream.stop()) + + # Update chunk counter (each chunk is exactly one second of audio) + self.chunks_streamed += 1 + + # send metadata if changed + # do this in a separate task to not disturb audio streaming + # NOTE: we should probably move this out of the audio stream task into it's own task + if ( + self.sync_clients + and (_leader := self.sync_clients[0]) + and (_leader.corrected_elapsed_time or 0) > 2 + and (metadata := _leader.current_media) is not None + ): + now = time.time() + metadata_checksum = f"{metadata.uri}.{metadata.title}.{metadata.image_url}" + progress = metadata.corrected_elapsed_time or 0 + if _last_metadata != metadata_checksum: + _last_metadata = metadata_checksum + prev_progress_report = now + self.mass.create_task(self._send_metadata(progress, metadata)) + # send the progress report every 5 seconds + elif now - prev_progress_report >= 5: + prev_progress_report = now + self.mass.create_task(self._send_metadata(progress, None)) + # Entire stream consumed: send EOF + generator_exhausted = True + async with self._lock: + await asyncio.gather( + *[ + self._write_eof_to_player(x) + for x in self.sync_clients + if x.stream and x.stream.running + ], + return_exceptions=True, + ) + except Exception as err: + logger = self.prov.logger + logger.error( + "Stream error: %s", + str(err) or err.__class__.__name__, + exc_info=err if logger.isEnabledFor(logging.DEBUG) else None, + ) + raise + finally: + if not generator_exhausted: + await close_async_generator(self._audio_source) + + async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None: + """ + Write audio chunk to a specific player. + + each chunk is exactly one second of audio data based on the pcm format. + For late joiners, compensates for chunks sent between join time and actual chunk delivery. + Blocks (async) until the data has been written. + """ + write_start = time.time() + chunk_number = self.chunks_streamed + 1 + player_id = airplay_player.player_id + + # Calculate chunk offset based on actual time vs start time + self._player_start_chunk.pop(player_id, None) + + # if the player has an associated FFMpeg instance, use that first + if ffmpeg := self._player_ffmpeg.get(player_id): + await ffmpeg.write(chunk) + chunk_to_send = await ffmpeg.read(len(chunk)) + else: + chunk_to_send = chunk + + assert airplay_player.stream + stream_write_start = time.time() + await airplay_player.stream.write_chunk(chunk_to_send) + stream_write_elapsed = time.time() - stream_write_start + + total_elapsed = time.time() - write_start + + # Log only truly abnormal writes (>5s indicates a real stall) + # Can take up to ~4s if player's latency buffer is being drained + if total_elapsed > 5.0: + self.prov.logger.error( + "!!! STALLED WRITE: Player %s chunk %d took %.3fs total (stream write: %.3fs)", + player_id, + chunk_number, + total_elapsed, + stream_write_elapsed, + ) + + async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None: + """Write EOF to a specific player.""" + # cleanup any associated FFMpeg instance first + if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None): + await ffmpeg.write_eof() + await ffmpeg.close() + assert airplay_player.stream + await airplay_player.stream.write_eof() + + async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None: + """Send metadata to all players.""" + async with self._lock: + await asyncio.gather( + *[ + x.stream.send_metadata(progress, metadata) + for x in self.sync_clients + if x.stream and x.stream.running + ], + return_exceptions=True, + )