from typing import TYPE_CHECKING
from music_assistant_models.config_entries import ProviderConfig
-from music_assistant_models.enums import ProviderFeature
+from music_assistant_models.enums import ConfigEntryType, ProviderFeature
from music_assistant_models.provider import ProviderManifest
from music_assistant.mass import MusicAssistant
+from music_assistant.providers.airplay.constants import CONF_ENABLE_LATE_JOIN
from .provider import AirPlayProvider
values: the (intermediate) raw values for config entries sent with the action.
"""
# ruff: noqa: ARG001
- return ()
+ return (
+ ConfigEntry(
+ key=CONF_ENABLE_LATE_JOIN,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=False,
+ label="Enable late joining",
+ description=(
+ "Allow the player to join an existing AirPlay stream instead of "
+ "starting a new one. \n NOTE: may not work in all conditions. "
+ "If you experience issues or players are not fully in sync, disable this option."
+ ),
+ category="airplay",
+ ),
+ )
async def setup(
from __future__ import annotations
+from enum import Enum
from typing import Final
from music_assistant_models.enums import ContentType
DOMAIN = "airplay"
+
+class StreamingProtocol(Enum):
+ """AirPlay streaming protocol versions."""
+
+ RAOP = 1 # AirPlay 1 (RAOP)
+ AIRPLAY2 = 2 # AirPlay 2
+
+
CACHE_CATEGORY_PREV_VOLUME: Final[int] = 1
CONF_ENCRYPTION: Final[str] = "encryption"
CONF_READ_AHEAD_BUFFER: Final[str] = "read_ahead_buffer"
CONF_IGNORE_VOLUME: Final[str] = "ignore_volume"
CONF_CREDENTIALS: Final[str] = "credentials"
+CONF_AIRPLAY_PROTOCOL: Final[str] = "airplay_protocol"
+
+AIRPLAY_DISCOVERY_TYPE: Final[str] = "_airplay._tcp.local."
+RAOP_DISCOVERY_TYPE: Final[str] = "_raop._tcp.local."
+DACP_DISCOVERY_TYPE: Final[str] = "_dacp._tcp.local."
+
+AIRPLAY2_MIN_LOG_LEVEL: Final[int] = 3 # Min loglevel to ensure stderr output contains what we need
+CONF_AP_CREDENTIALS: Final[str] = "ap_credentials"
+CONF_MRP_CREDENTIALS: Final[str] = "mrp_credentials"
+CONF_ACTION_START_PAIRING: Final[str] = "start_ap_pairing"
+CONF_ACTION_FINISH_PAIRING: Final[str] = "finish_ap_pairing"
+CONF_ACTION_START_MRP_PAIRING: Final[str] = "start_mrp_pairing"
+CONF_ACTION_FINISH_MRP_PAIRING: Final[str] = "finish_mrp_pairing"
+CONF_PAIRING_PIN: Final[str] = "pairing_pin"
+CONF_MRP_PAIRING_PIN: Final[str] = "mrp_pairing_pin"
+CONF_ENABLE_LATE_JOIN: Final[str] = "enable_late_join"
BACKOFF_TIME_LOWER_LIMIT: Final[int] = 15 # seconds
BACKOFF_TIME_UPPER_LIMIT: Final[int] = 300 # Five minutes
("Sonos", "Arc Ultra"),
# Samsung has been repeatedly being reported as having issues with AirPlay 1/raop
("Samsung", "*"),
+ ("Ubiquiti Inc.", "*"),
+ ("Juke Audio", "*"),
)
import os
import platform
+import time
from typing import TYPE_CHECKING
from zeroconf import IPVersion
from music_assistant.helpers.process import check_output
-from music_assistant.providers.airplay.constants import BROKEN_RAOP_MODELS
+from music_assistant.providers.airplay.constants import BROKEN_RAOP_MODELS, StreamingProtocol
if TYPE_CHECKING:
from zeroconf.asyncio import AsyncServiceInfo
+# NTP epoch delta: difference between Unix epoch (1970) and NTP epoch (1900)
+NTP_EPOCH_DELTA = 0x83AA7E80 # 2208988800 seconds
+
def convert_airplay_volume(value: float) -> int:
"""Remap AirPlay Volume to 0..100 scale."""
return int(portion + normal_min)
-def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]:
+def get_model_info(info: AsyncServiceInfo) -> tuple[str, str]: # noqa: PLR0911
"""Return Manufacturer and Model name from mdns info."""
manufacturer = info.decoded_properties.get("manufacturer")
model = info.decoded_properties.get("model")
return ("Apple", "Apple TV 4K Gen2")
if model == "AppleTV14,1":
return ("Apple", "Apple TV 4K Gen3")
+ if model == "UPL-AMP":
+ return ("Ubiquiti Inc.", "UPL-AMP")
if "AirPort" in model:
return ("Apple", "AirPort Express")
if "AudioAccessory" in model:
if "AppleTV" in model:
model = "Apple TV"
manufacturer = "Apple"
+ # Detect Mac devices (Mac mini, MacBook, iMac, etc.)
+ # Model identifiers like: Mac16,11, MacBookPro18,3, iMac21,1
+ if model.startswith(("Mac", "iMac")):
+ # Parse Mac model to friendly name
+ if model.startswith("MacBookPro"):
+ return ("Apple", f"MacBook Pro ({model})")
+ if model.startswith("MacBookAir"):
+ return ("Apple", f"MacBook Air ({model})")
+ if model.startswith("MacBook"):
+ return ("Apple", f"MacBook ({model})")
+ if model.startswith("iMac"):
+ return ("Apple", f"iMac ({model})")
+ if model.startswith("Macmini"):
+ return ("Apple", f"Mac mini ({model})")
+ if model.startswith("MacPro"):
+ return ("Apple", f"Mac Pro ({model})")
+ if model.startswith("MacStudio"):
+ return ("Apple", f"Mac Studio ({model})")
+ # Generic Mac device (e.g. Mac16,11 for Mac mini M4)
+ return ("Apple", f"Mac ({model})")
return (manufacturer or "AirPlay", model)
return False
-async def get_cliraop_binary() -> str:
- """Find the correct raop/airplay binary belonging to the platform."""
+async def get_cli_binary(protocol: StreamingProtocol) -> str:
+ """Find the correct raop/airplay binary belonging to the platform.
+
+ Args:
+ protocol: The streaming protocol (RAOP or AIRPLAY2)
+
+ Returns:
+ Path to the CLI binary
- async def check_binary(cliraop_path: str) -> str | None:
+ Raises:
+ RuntimeError: If the binary cannot be found
+ """
+
+ async def check_binary(cli_path: str) -> str | None:
try:
- returncode, output = await check_output(
- cliraop_path,
- "-check",
- )
- if returncode == 0 and output.strip().decode() == "cliraop check":
- return cliraop_path
+ if protocol == StreamingProtocol.RAOP:
+ args = [
+ cli_path,
+ "-check",
+ ]
+ passing_output = "cliraop check"
+ else:
+ config_file = os.path.join(os.path.dirname(__file__), "bin", "cliap2.conf")
+ args = [
+ cli_path,
+ "--testrun",
+ "--config",
+ config_file,
+ ]
+
+ returncode, output = await check_output(*args)
+ if (
+ protocol == StreamingProtocol.RAOP
+ and returncode == 0
+ and output.strip().decode() == passing_output
+ ) or (protocol == StreamingProtocol.AIRPLAY2 and returncode == 0):
+ return cli_path
except OSError:
pass
return None
system = platform.system().lower().replace("darwin", "macos")
architecture = platform.machine().lower()
+ if protocol == StreamingProtocol.RAOP:
+ package = "cliraop"
+ elif protocol == StreamingProtocol.AIRPLAY2:
+ package = "cliap2"
+ else:
+ raise RuntimeError(f"Unsupported streaming protocol requested: {protocol}")
+
if bridge_binary := await check_binary(
- os.path.join(base_path, f"cliraop-{system}-{architecture}")
+ os.path.join(base_path, f"{package}-{system}-{architecture}")
):
return bridge_binary
- msg = f"Unable to locate RAOP Play binary for {system}/{architecture}"
+ msg = (
+ f"Unable to locate {protocol.name} CLI stream binary {package} for {system}/{architecture}"
+ )
raise RuntimeError(msg)
+
+
+def get_ntp_timestamp() -> int:
+ """
+ Get current NTP timestamp (64-bit).
+
+ Returns:
+ int: 64-bit NTP timestamp (upper 32 bits = seconds, lower 32 bits = fraction)
+ """
+ # Get current Unix timestamp with microsecond precision
+ current_time = time.time()
+
+ # Split into seconds and microseconds
+ seconds = int(current_time)
+ microseconds = int((current_time - seconds) * 1_000_000)
+
+ # Convert to NTP epoch (add offset from 1970 to 1900)
+ ntp_seconds = seconds + NTP_EPOCH_DELTA
+
+ # Convert microseconds to NTP fraction (2^32 parts per second)
+ # fraction = (microseconds * 2^32) / 1_000_000
+ ntp_fraction = int((microseconds << 32) / 1_000_000)
+
+ # Combine into 64-bit value
+ return (ntp_seconds << 32) | ntp_fraction
+
+
+def ntp_to_seconds_fraction(ntp_timestamp: int) -> tuple[int, int]:
+ """
+ Split NTP timestamp into seconds and fraction components.
+
+ Args:
+ ntp_timestamp: 64-bit NTP timestamp
+
+ Returns:
+ tuple: (seconds, fraction)
+ """
+ seconds = ntp_timestamp >> 32
+ fraction = ntp_timestamp & 0xFFFFFFFF
+ return seconds, fraction
+
+
+def ntp_to_unix_time(ntp_timestamp: int) -> float:
+ """
+ Convert NTP timestamp to Unix timestamp (float).
+
+ Args:
+ ntp_timestamp: 64-bit NTP timestamp
+
+ Returns:
+ float: Unix timestamp (seconds since 1970-01-01)
+ """
+ seconds = ntp_timestamp >> 32
+ fraction = ntp_timestamp & 0xFFFFFFFF
+
+ # Convert back to Unix epoch
+ unix_seconds = seconds - NTP_EPOCH_DELTA
+
+ # Convert fraction to microseconds
+ microseconds = (fraction * 1_000_000) >> 32
+
+ return unix_seconds + (microseconds / 1_000_000)
+
+
+def unix_time_to_ntp(unix_timestamp: float) -> int:
+ """
+ Convert Unix timestamp (float) to NTP timestamp.
+
+ Args:
+ unix_timestamp: Unix timestamp (seconds since 1970-01-01)
+
+ Returns:
+ int: 64-bit NTP timestamp
+ """
+ seconds = int(unix_timestamp)
+ microseconds = int((unix_timestamp - seconds) * 1_000_000)
+
+ # Convert to NTP epoch
+ ntp_seconds = seconds + NTP_EPOCH_DELTA
+
+ # Convert microseconds to NTP fraction
+ ntp_fraction = int((microseconds << 32) / 1_000_000)
+
+ return (ntp_seconds << 32) | ntp_fraction
+
+
+def add_seconds_to_ntp(ntp_timestamp: int, seconds: float) -> int:
+ """
+ Add seconds to an NTP timestamp.
+
+ Args:
+ ntp_timestamp: 64-bit NTP timestamp
+ seconds: Number of seconds to add (can be fractional)
+
+ Returns:
+ int: New NTP timestamp with seconds added
+ """
+ # Extract whole seconds and fraction
+ whole_seconds = int(seconds)
+ fraction = seconds - whole_seconds
+
+ # Convert to NTP format (upper 32 bits = seconds, lower 32 bits = fraction)
+ ntp_seconds = whole_seconds << 32
+ ntp_fraction = int(fraction * (1 << 32))
+
+ return ntp_timestamp + ntp_seconds + ntp_fraction
"multi_instance": false,
"builtin": false,
"icon": "cast-variant",
- "mdns_discovery": ["_raop._tcp.local."]
+ "mdns_discovery": ["_airplay._tcp.local.", "_raop._tcp.local."]
}
-"""AirPlay Player implementation."""
+"""AirPlay Player implementations."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING, cast
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueType
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
from music_assistant_models.enums import (
ConfigEntryType,
ContentType,
PlayerType,
)
from music_assistant_models.media_items import AudioFormat
+from propcache import under_cached_property as cached_property
from music_assistant.constants import (
CONF_ENTRY_DEPRECATED_EQ_BASS,
from music_assistant.providers.universal_group.constants import UGP_PREFIX
from .constants import (
+ AIRPLAY_DISCOVERY_TYPE,
AIRPLAY_FLOW_PCM_FORMAT,
AIRPLAY_PCM_FORMAT,
CACHE_CATEGORY_PREV_VOLUME,
+ CONF_ACTION_FINISH_PAIRING,
+ CONF_ACTION_START_PAIRING,
+ CONF_AIRPLAY_PROTOCOL,
CONF_ALAC_ENCODE,
+ CONF_AP_CREDENTIALS,
CONF_ENCRYPTION,
CONF_IGNORE_VOLUME,
+ CONF_PAIRING_PIN,
CONF_PASSWORD,
CONF_READ_AHEAD_BUFFER,
FALLBACK_VOLUME,
+ RAOP_DISCOVERY_TYPE,
+ StreamingProtocol,
)
from .helpers import get_primary_ip_address_from_zeroconf, is_broken_raop_model
-from .raop import RaopStreamSession
+from .stream_session import AirPlayStreamSession
if TYPE_CHECKING:
from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.providers.universal_group import UniversalGroupPlayer
+ from .protocols.airplay2 import AirPlay2Stream
+ from .protocols.raop import RaopStream
from .provider import AirPlayProvider
- from .raop import RaopStream
BROKEN_RAOP_WARN = ConfigEntry(
self,
provider: AirPlayProvider,
player_id: str,
- discovery_info: AsyncServiceInfo,
+ discovery_info: AsyncServiceInfo | None,
address: str,
display_name: str,
manufacturer: str,
super().__init__(provider, player_id)
self.discovery_info = discovery_info
self.address = address
- self.raop_stream: RaopStream | None = None
+ self.stream: RaopStream | AirPlay2Stream | None = None
self.last_command_sent = 0.0
self._lock = asyncio.Lock()
-
# Set (static) player attributes
self._attr_type = PlayerType.PLAYER
self._attr_name = display_name
self._attr_can_group_with = {provider.lookup_key}
self._attr_enabled_by_default = not is_broken_raop_model(manufacturer, model)
+ @cached_property
+ def protocol(self) -> StreamingProtocol:
+ """Get the streaming protocol to use for this player."""
+ protocol_value = self.config.get_value(CONF_AIRPLAY_PROTOCOL)
+ # Convert integer value to StreamingProtocol enum
+ if protocol_value == 2:
+ return StreamingProtocol.AIRPLAY2
+ return StreamingProtocol.RAOP
+
async def get_config_entries(
self,
action: str | None = None,
values: dict[str, ConfigValueType] | None = None,
) -> list[ConfigEntry]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
+ base_entries = await super().get_config_entries()
+
+ # Handle pairing actions
+ if action and self._requires_pairing():
+ await self._handle_pairing_action(action=action, values=values)
+
+ # Add pairing config entries for Apple TV and macOS devices
+ if self._requires_pairing():
+ base_entries = [*self._get_pairing_config_entries(), *base_entries]
+
base_entries = await super().get_config_entries(action=action, values=values)
base_entries += [
CONF_ENTRY_FLOW_MODE_ENFORCED,
CONF_ENTRY_DEPRECATED_EQ_MID,
CONF_ENTRY_DEPRECATED_EQ_TREBLE,
CONF_ENTRY_OUTPUT_CODEC_HIDDEN,
+ ConfigEntry(
+ key=CONF_AIRPLAY_PROTOCOL,
+ type=ConfigEntryType.INTEGER,
+ default_value=StreamingProtocol.RAOP.value,
+ required=False,
+ label="AirPlay version to use for streaming",
+ description="AirPlay version 1 protocol uses RAOP.\n"
+ "AirPlay version 2 is an extension of RAOP.\n"
+ "Some newer devices do not fully support RAOP and "
+ "will only work with AirPlay version 2.",
+ category="airplay",
+ options=[
+ ConfigValueOption("AirPlay 1 (RAOP)", StreamingProtocol.RAOP.value),
+ ConfigValueOption("AirPlay 2", StreamingProtocol.AIRPLAY2.value),
+ ],
+ hidden=True,
+ ),
ConfigEntry(
key=CONF_ENCRYPTION,
type=ConfigEntryType.BOOLEAN,
label="Audio buffer (ms)",
description="Amount of buffer (in milliseconds), "
"the player should keep to absorb network throughput jitter. "
- "If you experience audio dropouts, try increasing this value.",
+ "Lower values reduce latency but may cause dropouts. "
+ "Recommended: 1000ms for stable playback.",
category="airplay",
- range=(500, 3000),
+ range=(500, 2000),
),
# airplay has fixed sample rate/bit depth so make this config entry static and hidden
create_sample_rates_config_entry(
),
]
+ # Regular AirPlay config entries
+ base_entries.extend(
+ [
+ ConfigEntry(
+ key=CONF_ENCRYPTION,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=True,
+ label="Enable encryption",
+ description="Enable encrypted communication with the player, "
+ "should by default be enabled for most devices.",
+ category="airplay",
+ ),
+ ConfigEntry(
+ key=CONF_ALAC_ENCODE,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=True,
+ label="Enable compression",
+ description="Save some network bandwidth by sending the audio as "
+ "(lossless) ALAC at the cost of a bit CPU.",
+ category="airplay",
+ ),
+ CONF_ENTRY_SYNC_ADJUST,
+ ConfigEntry(
+ key=CONF_PASSWORD,
+ type=ConfigEntryType.SECURE_STRING,
+ default_value=None,
+ required=False,
+ label="Device password",
+ description="Some devices require a password to connect/play.",
+ category="airplay",
+ ),
+ ConfigEntry(
+ key=CONF_READ_AHEAD_BUFFER,
+ type=ConfigEntryType.INTEGER,
+ default_value=1000,
+ required=False,
+ label="Audio buffer (ms)",
+ description="Amount of buffer (in milliseconds), "
+ "the player should keep to absorb network throughput jitter. "
+ "If you experience audio dropouts, try increasing this value.",
+ category="airplay",
+ range=(500, 3000),
+ ),
+ # airplay has fixed sample rate/bit depth so make this config entry
+ # static and hidden
+ create_sample_rates_config_entry(
+ supported_sample_rates=[44100], supported_bit_depths=[16], hidden=True
+ ),
+ ConfigEntry(
+ key=CONF_IGNORE_VOLUME,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=False,
+ label="Ignore volume reports sent by the device itself",
+ description=(
+ "The AirPlay protocol allows devices to report their own volume "
+ "level. \n"
+ "For some devices this is not reliable and can cause unexpected "
+ "volume changes. \n"
+ "Enable this option to ignore these reports."
+ ),
+ category="airplay",
+ ),
+ ]
+ )
+
if is_broken_raop_model(self.device_info.manufacturer, self.device_info.model):
base_entries.insert(-1, BROKEN_RAOP_WARN)
return base_entries
+ def _requires_pairing(self) -> bool:
+ """Check if this device requires pairing (Apple TV or macOS)."""
+ if self.device_info.manufacturer.lower() != "apple":
+ return False
+
+ model = self.device_info.model
+ # Apple TV devices
+ if "appletv" in model.lower() or "apple tv" in model.lower():
+ return True
+ # Mac devices (including iMac, MacBook, Mac mini, Mac Pro, Mac Studio)
+ return model.startswith(("Mac", "iMac"))
+
+ def _get_pairing_config_entries(self) -> list[ConfigEntry]:
+ """Return pairing config entries for Apple TV and macOS devices.
+
+ Uses cliraop for AirPlay/RAOP pairing.
+ """
+ entries: list[ConfigEntry] = []
+
+ # Check if we have credentials stored
+ has_credentials = bool(self.config.get_value(CONF_AP_CREDENTIALS))
+
+ if not has_credentials:
+ # Show pairing instructions and start button
+ entries.append(
+ ConfigEntry(
+ key="pairing_instructions",
+ type=ConfigEntryType.LABEL,
+ label="AirPlay Pairing Required",
+ description=(
+ "This device requires pairing before it can be used. "
+ "Click the button below to start the pairing process."
+ ),
+ )
+ )
+ entries.append(
+ ConfigEntry(
+ key=CONF_ACTION_START_PAIRING,
+ type=ConfigEntryType.ACTION,
+ label="Start Pairing",
+ description="Start the AirPlay pairing process",
+ action=CONF_ACTION_START_PAIRING,
+ )
+ )
+ else:
+ # Show paired status
+ entries.append(
+ ConfigEntry(
+ key="pairing_status",
+ type=ConfigEntryType.LABEL,
+ label="AirPlay Pairing Status",
+ description="Device is paired and ready to use.",
+ )
+ )
+
+ # If pairing was started, show PIN entry
+ if self.config.get_value("_pairing_in_progress"):
+ entries.append(
+ ConfigEntry(
+ key=CONF_PAIRING_PIN,
+ type=ConfigEntryType.STRING,
+ label="Enter PIN",
+ description="Enter the 4-digit PIN shown on the device",
+ required=True,
+ )
+ )
+ entries.append(
+ ConfigEntry(
+ key=CONF_ACTION_FINISH_PAIRING,
+ type=ConfigEntryType.ACTION,
+ label="Finish Pairing",
+ description="Complete the pairing process with the PIN",
+ action=CONF_ACTION_FINISH_PAIRING,
+ )
+ )
+
+ # Store credentials (hidden from UI)
+ entries.append(
+ ConfigEntry(
+ key=CONF_AP_CREDENTIALS,
+ type=ConfigEntryType.SECURE_STRING,
+ label="AirPlay Credentials",
+ default_value=None,
+ required=False,
+ hidden=True,
+ )
+ )
+
+ return entries
+
+ async def _handle_pairing_action(
+ self, action: str, values: dict[str, ConfigValueType] | None
+ ) -> None:
+ """Handle pairing actions using cliraop.
+
+ TODO: Implement actual cliraop-based pairing.
+ """
+ if action == CONF_ACTION_START_PAIRING:
+ # TODO: Start pairing using cliraop
+ # For now, just set a flag to show the PIN entry
+ self.mass.config.set_raw_player_config_value(
+ self.player_id, "_pairing_in_progress", True
+ )
+ self.logger.info("Started AirPlay pairing for %s", self.display_name)
+
+ elif action == CONF_ACTION_FINISH_PAIRING:
+ # TODO: Finish pairing using cliraop with the provided PIN
+ if not values:
+ return
+
+ pin = values.get(CONF_PAIRING_PIN)
+ if not pin:
+ self.logger.warning("No PIN provided for pairing")
+ return
+
+ # TODO: Use cliraop to complete pairing with the PIN
+ # For now, just clear the pairing in progress flag
+ self.mass.config.set_raw_player_config_value(
+ self.player_id, "_pairing_in_progress", False
+ )
+
+ # TODO: Store the actual credentials obtained from cliraop
+ # self.mass.config.set_raw_player_config_value(
+ # self.player_id, CONF_AP_CREDENTIALS, credentials_from_cliraop
+ # )
+
+ self.logger.info(
+ "Finished AirPlay pairing for %s (TODO: implement actual pairing)",
+ self.display_name,
+ )
+
async def stop(self) -> None:
"""Send STOP command to player."""
- if self.raop_stream and self.raop_stream.session:
+ if self.stream and self.stream.session:
# forward stop to the entire stream session
- await self.raop_stream.session.stop()
+ await self.stream.session.stop()
self._attr_active_source = None
self._attr_current_media = None
self.update_state()
async def play(self) -> None:
"""Send PLAY (unpause) command to player."""
async with self._lock:
- if self.raop_stream and self.raop_stream.running:
- await self.raop_stream.send_cli_command("ACTION=PLAY")
+ if self.stream and self.stream.running:
+ await self.stream.send_cli_command("ACTION=PLAY")
async def pause(self) -> None:
"""Send PAUSE command to player."""
return
async with self._lock:
- if not self.raop_stream or not self.raop_stream.running:
+ if not self.stream or not self.stream.running:
return
- await self.raop_stream.send_cli_command("ACTION=PAUSE")
+ await self.stream.send_cli_command("ACTION=PAUSE")
async def play_media(self, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
if media.media_type == MediaType.ANNOUNCEMENT:
# special case: stream announcement
assert media.custom_data
- input_format = AIRPLAY_PCM_FORMAT
+ pcm_format = AIRPLAY_PCM_FORMAT
audio_source = self.mass.streams.get_announcement_stream(
media.custom_data["announcement_url"],
output_format=AIRPLAY_PCM_FORMAT,
)
elif media.media_type == MediaType.PLUGIN_SOURCE:
# special case: plugin source stream
- input_format = AIRPLAY_PCM_FORMAT
+ pcm_format = AIRPLAY_PCM_FORMAT
assert media.custom_data
audio_source = self.mass.streams.get_plugin_source_stream(
plugin_source_id=media.custom_data["source_id"],
ugp_player = cast("UniversalGroupPlayer", self.mass.players.get(media.source_id))
ugp_stream = ugp_player.stream
assert ugp_stream is not None # for type checker
- input_format = ugp_stream.base_pcm_format
+ pcm_format = ugp_stream.base_pcm_format
audio_source = ugp_stream.subscribe_raw()
elif media.source_id and media.queue_item_id:
# regular queue (flow) stream request
- input_format = AIRPLAY_FLOW_PCM_FORMAT
+ pcm_format = AIRPLAY_FLOW_PCM_FORMAT
queue = self.mass.player_queues.get(media.source_id)
assert queue
start_queue_item = self.mass.player_queues.get_item(
audio_source = self.mass.streams.get_queue_flow_stream(
queue=queue,
start_queue_item=start_queue_item,
- pcm_format=input_format,
+ pcm_format=pcm_format,
)
else:
# assume url or some other direct path
# NOTE: this will fail if its an uri not playable by ffmpeg
- input_format = AIRPLAY_PCM_FORMAT
+ pcm_format = AIRPLAY_PCM_FORMAT
audio_source = get_ffmpeg_stream(
audio_input=media.uri,
input_format=AudioFormat(content_type=ContentType.try_parse(media.uri)),
)
# if an existing stream session is running, we could replace it with the new stream
- if self.raop_stream and self.raop_stream.running:
+ if self.stream and self.stream.running:
# check if we need to replace the stream
- if self.raop_stream.prevent_playback:
+ if self.stream.prevent_playback:
# player is in prevent playback mode, we need to stop the stream
await self.stop()
else:
- await self.raop_stream.session.replace_stream(audio_source)
+ await self.stream.session.replace_stream(audio_source)
return
- # setup RaopStreamSession for player (and its sync childs if any)
+ # setup StreamSession for player (and its sync childs if any)
sync_clients = self._get_sync_clients()
provider = cast("AirPlayProvider", self.provider)
- raop_stream_session = RaopStreamSession(provider, sync_clients, input_format, audio_source)
- await raop_stream_session.start()
+ stream_session = AirPlayStreamSession(provider, sync_clients, pcm_format, audio_source)
+ await stream_session.start()
async def volume_set(self, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
- if self.raop_stream and self.raop_stream.running:
- await self.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
+ if self.stream and self.stream.running:
+ await self.stream.send_cli_command(f"VOLUME={volume_level}\n")
self._attr_volume_level = volume_level
self.update_state()
# store last state in cache
# nothing to do
return
- raop_session = self.raop_stream.session if self.raop_stream else None
+ stream_session = self.stream.session if self.stream else None
# handle removals first
if player_ids_to_remove:
if self.player_id in player_ids_to_remove:
# dissolve the entire sync group
- if self.raop_stream and self.raop_stream.running:
+ if self.stream and self.stream.running:
# stop the stream session if it is running
- await self.raop_stream.session.stop()
+ await self.stream.session.stop()
self._attr_group_members = []
self.update_state()
return
for child_player in self._get_sync_clients():
if child_player.player_id in player_ids_to_remove:
- if raop_session:
- await raop_session.remove_client(child_player)
+ if stream_session:
+ await stream_session.remove_client(child_player)
self._attr_group_members.remove(child_player.player_id)
# handle additions
"AirPlayPlayer | None", self.mass.players.get(player_id)
):
if (
- child_player_to_add.raop_stream
- and child_player_to_add.raop_stream.running
- and child_player_to_add.raop_stream.session != raop_session
+ child_player_to_add.stream
+ and child_player_to_add.stream.running
+ and child_player_to_add.stream.session != stream_session
):
- await child_player_to_add.raop_stream.session.remove_client(child_player_to_add)
+ await child_player_to_add.stream.session.remove_client(child_player_to_add)
- # add new child to the existing raop session (if any)
+ # add new child to the existing stream (RAOP or AirPlay2) session (if any)
self._attr_group_members.append(player_id)
- if raop_session:
- await raop_session.add_client(child_player_to_add)
+ if stream_session:
+ await stream_session.add_client(child_player_to_add)
# always update the state after modifying group members
self.update_state()
def update_volume_from_device(self, volume: int) -> None:
"""Update volume from device feedback."""
ignore_volume_report = (
- self.mass.config.get_raw_player_config_value(self.player_id, CONF_IGNORE_VOLUME, False)
+ self.config.get_value(CONF_IGNORE_VOLUME)
or self.device_info.manufacturer.lower() == "apple"
)
def set_discovery_info(self, discovery_info: AsyncServiceInfo, display_name: str) -> None:
"""Set/update the discovery info for the player."""
self._attr_name = display_name
- self.discovery_info = discovery_info
+ if discovery_info.type == AIRPLAY_DISCOVERY_TYPE:
+ self.airplay_discovery_info = discovery_info
+ elif discovery_info.type == RAOP_DISCOVERY_TYPE:
+ self.raop_discovery_info = discovery_info
+ else: # guard
+ return
cur_address = self.address
new_address = get_primary_ip_address_from_zeroconf(discovery_info)
if new_address is None:
self._attr_device_info.ip_address = new_address
self.update_state()
- def set_state_from_raop(
+ def set_state_from_stream(
self, state: PlaybackState | None = None, elapsed_time: float | None = None
) -> None:
- """Set the playback state from RAOP."""
+ """Set the playback state from stream (RAOP or AirPlay2)."""
if state is not None:
self._attr_playback_state = state
if elapsed_time is not None:
async def on_unload(self) -> None:
"""Handle logic when the player is unloaded from the Player controller."""
await super().on_unload()
- if self.raop_stream:
+ if self.stream:
# stop the stream session if it is running
- if self.raop_stream.running:
- self.mass.create_task(self.raop_stream.session.stop())
- self.raop_stream = None
+ if self.stream.running:
+ self.mass.create_task(self.stream.session.stop())
+ self.stream = None
def _get_sync_clients(self) -> list[AirPlayPlayer]:
"""Get all sync clients for a player."""
--- /dev/null
+"""AirPlay (streaming) Protocols."""
--- /dev/null
+"""Base protocol class for AirPlay streaming implementations."""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import time
+from abc import ABC, abstractmethod
+from contextlib import suppress
+from random import randint
+from typing import TYPE_CHECKING
+
+from music_assistant_models.enums import ContentType, PlaybackState
+from music_assistant_models.media_items import AudioFormat
+
+from music_assistant.constants import VERBOSE_LOG_LEVEL
+from music_assistant.helpers.namedpipe import AsyncNamedPipeWriter
+
+if TYPE_CHECKING:
+ from music_assistant_models.player import PlayerMedia
+
+ from music_assistant.helpers.process import AsyncProcess
+ from music_assistant.providers.airplay.player import AirPlayPlayer
+ from music_assistant.providers.airplay.stream_session import AirPlayStreamSession
+
+
+class AirPlayProtocol(ABC):
+ """Base class for AirPlay streaming protocols (RAOP and AirPlay2).
+
+ This class contains common logic shared between protocol implementations,
+ with abstract methods for protocol-specific behavior.
+ """
+
+ # the pcm audio format used for streaming to this protocol
+ pcm_format = AudioFormat(
+ content_type=ContentType.PCM_S16LE, sample_rate=44100, bit_depth=16, channels=2
+ )
+
+ def __init__(
+ self,
+ session: AirPlayStreamSession,
+ player: AirPlayPlayer,
+ ) -> None:
+ """Initialize base AirPlay protocol.
+
+ Args:
+ session: The stream session managing this protocol instance
+ player: The player to stream to
+ """
+ self.session = session
+ self.prov = session.prov
+ self.mass = session.prov.mass
+ self.player = player
+ # Generate unique ID to prevent race conditions with named pipes
+ self.active_remote_id: str = str(randint(1000, 8000))
+ self.prevent_playback: bool = False
+ self._cli_proc: AsyncProcess | None = None
+ # State tracking
+ self._started = asyncio.Event()
+ self._stopped = False
+ self._total_bytes_sent = 0
+ self._stream_bytes_sent = 0
+ self.audio_named_pipe = (
+ f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-audio" # noqa: S108
+ )
+ self.commands_named_pipe = (
+ f"/tmp/{player.protocol.value}-{self.player.player_id}-{self.active_remote_id}-cmd" # noqa: S108
+ )
+ # Async named pipe writers (kept open for session duration)
+ self._audio_pipe: AsyncNamedPipeWriter | None = None
+ self._commands_pipe: AsyncNamedPipeWriter | None = None
+
+ @property
+ def running(self) -> bool:
+ """Return boolean if this stream is running."""
+ return (
+ not self._stopped
+ and self._started.is_set()
+ and self._cli_proc is not None
+ and not self._cli_proc.closed
+ )
+
+ @abstractmethod
+ async def get_ntp(self) -> int:
+ """Get current NTP timestamp from the CLI binary."""
+
+ @abstractmethod
+ async def start(self, start_ntp: int, skip: int = 0) -> None:
+ """Initialize streaming process for the player.
+
+ Args:
+ start_ntp: NTP timestamp to start streaming
+ skip: Number of seconds to skip (for late joiners)
+ """
+
+ async def _open_pipes(self) -> None:
+ """Open both named pipes in non-blocking mode for async I/O."""
+ # Open audio pipe with buffer size optimization
+ self._audio_pipe = AsyncNamedPipeWriter(self.audio_named_pipe, logger=self.player.logger)
+ await self._audio_pipe.open(increase_buffer=True)
+
+ # Open command pipe (no need to increase buffer for small commands)
+ self._commands_pipe = AsyncNamedPipeWriter(
+ self.commands_named_pipe, logger=self.player.logger
+ )
+ await self._commands_pipe.open(increase_buffer=False)
+
+ self.player.logger.debug("Named pipes opened in non-blocking mode for streaming session")
+
+ async def stop(self) -> None:
+ """Stop playback and cleanup."""
+ # Send stop command before setting _stopped flag
+ await self.send_cli_command("ACTION=STOP")
+
+ self._stopped = True
+
+ # Close named pipes (sends EOF to C side, triggering graceful shutdown)
+ if self._audio_pipe is not None:
+ await self._audio_pipe.close()
+ self._audio_pipe = None
+
+ if self._commands_pipe is not None:
+ await self._commands_pipe.close()
+ self._commands_pipe = None
+
+ # Close the CLI process (wait for it to terminate)
+ if self._cli_proc and not self._cli_proc.closed:
+ await self._cli_proc.close(True)
+
+ self.player.set_state_from_stream(state=PlaybackState.IDLE, elapsed_time=0)
+
+ # Remove named pipes from filesystem
+ with suppress(Exception):
+ await asyncio.to_thread(os.remove, self.audio_named_pipe)
+ with suppress(Exception):
+ await asyncio.to_thread(os.remove, self.commands_named_pipe)
+
+ async def write_chunk(self, chunk: bytes) -> None:
+ """
+ Write a (pcm) audio chunk to the stream.
+
+ Writes one second worth of audio data based on the pcm format.
+ Uses non-blocking I/O with asyncio event loop (no thread pool consumption).
+ """
+ if self._audio_pipe is None or not self._audio_pipe.is_open:
+ return
+
+ pipe_write_start = time.time()
+
+ try:
+ await self._audio_pipe.write(chunk)
+ except TimeoutError as e:
+ # Re-raise with player context
+ raise TimeoutError(f"Player {self.player.player_id}: {e}") from e
+
+ pipe_write_elapsed = time.time() - pipe_write_start
+
+ # Log only truly abnormal pipe writes (>5s indicates a real stall)
+ # Normal writes take ~1s due to pipe rate-limiting to playback speed
+ # Can take up to ~4s if player's latency buffer is full
+ if pipe_write_elapsed > 5.0:
+ self.player.logger.error(
+ "!!! STALLED PIPE WRITE: Player %s took %.3fs to write %d bytes to pipe",
+ self.player.player_id,
+ pipe_write_elapsed,
+ len(chunk),
+ )
+
+ async def write_eof(self) -> None:
+ """Write EOF to signal end of stream."""
+ # default implementation simply closes the named pipe
+ # can be overridden with protocol specific implementation if needed
+ if self._audio_pipe is not None:
+ await self._audio_pipe.close()
+ self._audio_pipe = None
+
+ async def send_cli_command(self, command: str) -> None:
+ """Send an interactive command to the running CLI binary using non-blocking I/O."""
+ if self._stopped or not self._cli_proc or self._cli_proc.closed:
+ return
+ if self._commands_pipe is None or not self._commands_pipe.is_open:
+ return
+
+ await self._started.wait()
+
+ if not command.endswith("\n"):
+ command += "\n"
+
+ self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
+ self.player.last_command_sent = time.time()
+
+ # Write command to pipe
+ data = command.encode("utf-8")
+
+ with suppress(BrokenPipeError):
+ try:
+ # Use shorter timeout for commands (1 second per wait iteration)
+ await self._commands_pipe.write(data, timeout_per_wait=1.0)
+ except TimeoutError:
+ self.player.logger.warning("Command pipe write timeout for %s", command.strip())
+
+ async def send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
+ """Send metadata to player."""
+ if self._stopped:
+ return
+ if metadata:
+ duration = min(metadata.duration or 0, 3600)
+ title = metadata.title or ""
+ artist = metadata.artist or ""
+ album = metadata.album or ""
+ cmd = f"TITLE={title}\nARTIST={artist}\nALBUM={album}\n"
+ cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
+ await self.send_cli_command(cmd)
+ # get image
+ if metadata.image_url:
+ await self.send_cli_command(f"ARTWORK={metadata.image_url}\n")
+ if progress is not None:
+ await self.send_cli_command(f"PROGRESS={progress}\n")
--- /dev/null
+"""Logic for AirPlay 2 audio streaming to AirPlay devices."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import os
+import platform
+
+from music_assistant_models.enums import PlaybackState
+from music_assistant_models.errors import PlayerCommandFailed
+
+from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.helpers.process import AsyncProcess
+from music_assistant.providers.airplay.constants import (
+ AIRPLAY2_MIN_LOG_LEVEL,
+ CONF_READ_AHEAD_BUFFER,
+)
+from music_assistant.providers.airplay.helpers import get_cli_binary, get_ntp_timestamp
+
+from ._protocol import AirPlayProtocol
+
+
+class AirPlay2Stream(AirPlayProtocol):
+ """
+ AirPlay 2 Audio Streamer.
+
+ Python is not suitable for realtime audio streaming so we do the actual streaming
+ of audio using a small executable written in C based on owntones to do
+ the actual timestamped playback. It reads pcm audio from a named pipe
+ and we can send some interactive commands using another named pipe.
+ """
+
+ _stderr_reader_task: asyncio.Task[None] | None = None
+
+ async def get_ntp(self) -> int:
+ """Get current NTP timestamp."""
+ # TODO!
+ return get_ntp_timestamp()
+
+ @property
+ def _cli_loglevel(self) -> int:
+ """
+ Return a cliap2 aligned loglevel.
+
+ Ensures that minimum level required for required cliap2 stderr output is respected.
+ """
+ force_verbose: bool = False # just for now
+ mass_level: int = 0
+ match self.prov.logger.level:
+ case logging.CRITICAL:
+ mass_level = 0
+ case logging.ERROR:
+ mass_level = 1
+ case logging.WARNING:
+ mass_level = 2
+ case logging.INFO:
+ mass_level = 3
+ case logging.DEBUG:
+ mass_level = 4
+ if self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+ mass_level = 5
+ if force_verbose:
+ mass_level = 5 # always use max log level for now to capture all stderr output
+ return max(mass_level, AIRPLAY2_MIN_LOG_LEVEL)
+
+ async def start(self, start_ntp: int, skip: int = 0) -> None:
+ """Initialize CLI process for a player."""
+ cli_binary = await get_cli_binary(self.player.protocol)
+ assert self.player.discovery_info is not None
+
+ player_id = self.player.player_id
+ sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
+ assert isinstance(sync_adjust, int)
+ read_ahead = await self.mass.config.get_player_config_value(
+ player_id, CONF_READ_AHEAD_BUFFER
+ )
+
+ txt_kv: str = ""
+ for key, value in self.player.discovery_info.decoded_properties.items():
+ txt_kv += f'"{key}={value}" '
+
+ # Note: skip parameter is accepted for API compatibility with base class
+ # but is not currently used by the cliap2 binary (AirPlay2 handles late joiners differently)
+
+ # cliap2 is the binary that handles the actual streaming to the player
+ # this binary leverages from the AirPlay2 support in owntones
+ # https://github.com/music-assistant/cliairplay
+ cli_args = [
+ cli_binary,
+ "--config",
+ os.path.join(os.path.dirname(__file__), "bin", "cliap2.conf"),
+ "--name",
+ self.player.display_name,
+ "--hostname",
+ str(self.player.discovery_info.server),
+ "--address",
+ str(self.player.address),
+ "--port",
+ str(self.player.discovery_info.port),
+ "--txt",
+ txt_kv,
+ "--ntpstart",
+ str(start_ntp),
+ "--latency",
+ str(read_ahead),
+ "--volume",
+ str(self.player.volume_level),
+ "--loglevel",
+ str(self._cli_loglevel),
+ "--pipe",
+ self.audio_named_pipe,
+ ]
+ self.player.logger.debug(
+ "Starting cliap2 process for player %s with args: %s",
+ player_id,
+ cli_args,
+ )
+ self._cli_proc = AsyncProcess(cli_args, stdin=True, stderr=True, name="cliap2")
+ if platform.system() == "Darwin":
+ os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
+ await self._cli_proc.start()
+ # read up to first num_lines lines of stderr to get the initial status
+ num_lines: int = 50
+ if self.prov.logger.level > logging.INFO:
+ num_lines *= 10
+ for _ in range(num_lines):
+ line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
+ self.player.logger.debug(line)
+ if f"airplay: Adding AirPlay device '{self.player.display_name}'" in line:
+ self.player.logger.info("AirPlay device connected. Starting playback.")
+ self._started.set()
+ # Open pipes now that cliraop is ready
+ await self._open_pipes()
+ break
+ if f"The AirPlay 2 device '{self.player.display_name}' failed" in line:
+ raise PlayerCommandFailed("Cannot connect to AirPlay device")
+ # start reading the stderr of the cliap2 process from another task
+ self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
+
+ async def _stderr_reader(self) -> None:
+ """Monitor stderr for the running CLIap2 process."""
+ player = self.player
+ queue = self.mass.players.get_active_queue(player)
+ logger = player.logger
+ lost_packets = 0
+ if not self._cli_proc:
+ return
+ async for line in self._cli_proc.iter_stderr():
+ # TODO @bradkeifer make cliap2 work this way
+ if "elapsed milliseconds:" in line:
+ # this is received more or less every second while playing
+ # millis = int(line.split("elapsed milliseconds: ")[1])
+ # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
+ # self.player.elapsed_time_last_updated = time.time()
+ # NOTE: Metadata is now handled at the session level
+ pass
+ if "set pause" in line or "Pause at" in line:
+ player.set_state_from_stream(state=PlaybackState.PAUSED)
+ if "Restarted at" in line or "restarting w/ pause" in line:
+ player.set_state_from_stream(state=PlaybackState.PLAYING)
+ if "restarting w/o pause" in line:
+ # streaming has started
+ player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0)
+ if "lost packet out of backlog" in line:
+ lost_packets += 1
+ if lost_packets == 100 and queue:
+ logger.error("High packet loss detected, restarting playback...")
+ self.mass.create_task(self.mass.player_queues.resume(queue.queue_id, False))
+ else:
+ logger.warning("Packet loss detected!")
+ if "end of stream reached" in line:
+ logger.debug("End of stream reached")
+ break
+
+ # log cli stderr output in alignment with mass logging level
+ if "[FATAL]" in line:
+ logger.critical(line)
+ elif "[ LOG]" in line:
+ logger.error(line)
+ elif "[ INFO]" in line:
+ logger.info(line)
+ elif "[ WARN]" in line:
+ logger.warning(line)
+ elif "[DEBUG]" in line:
+ logger.debug(line)
+ elif "[ SPAM]" in line:
+ logger.log(VERBOSE_LOG_LEVEL, line)
+ else: # for now, log unknown lines as error
+ logger.error(line)
+
+ # ensure we're cleaned up afterwards (this also logs the returncode)
+ await self.stop()
--- /dev/null
+"""Logic for RAOP audio streaming to AirPlay devices."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+
+from music_assistant_models.enums import PlaybackState
+from music_assistant_models.errors import PlayerCommandFailed
+
+from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.helpers.process import AsyncProcess, check_output
+from music_assistant.providers.airplay.constants import (
+ CONF_ALAC_ENCODE,
+ CONF_ENCRYPTION,
+ CONF_PASSWORD,
+ CONF_READ_AHEAD_BUFFER,
+)
+from music_assistant.providers.airplay.helpers import get_cli_binary
+
+from ._protocol import AirPlayProtocol
+
+
+class RaopStream(AirPlayProtocol):
+ """
+ RAOP (AirPlay 1) Audio Streamer.
+
+ Python is not suitable for realtime audio streaming so we do the actual streaming
+ of (RAOP) audio using a small executable written in C based on libraop to do
+ the actual timestamped playback, which reads pcm audio from stdin
+ and we can send some interactive commands using a named pipe.
+ """
+
+ _stderr_reader_task: asyncio.Task[None] | None = None
+
+ @property
+ def running(self) -> bool:
+ """Return boolean if this stream is running."""
+ return (
+ not self._stopped
+ and self._started.is_set()
+ and self._cli_proc is not None
+ and not self._cli_proc.closed
+ )
+
+ async def get_ntp(self) -> int:
+ """Get current NTP timestamp from the CLI binary."""
+ cli_binary = await get_cli_binary(self.player.protocol)
+ # TODO: we can potentially also just generate this ourselves?
+ self.prov.logger.debug("Getting NTP timestamp from %s CLI binary", self.player.protocol)
+ _, stdout = await check_output(cli_binary, "-ntp")
+ self.prov.logger.debug(f"Output from ntp check: {stdout.decode().strip()}")
+ return int(stdout.strip())
+
+ async def start(self, start_ntp: int, skip: int = 0) -> None:
+ """Initialize CLIRaop process for a player."""
+ assert self.player.discovery_info is not None # for type checker
+ cli_binary = await get_cli_binary(self.player.protocol)
+
+ extra_args: list[str] = []
+ player_id = self.player.player_id
+ extra_args += ["-if", self.mass.streams.bind_ip]
+ if self.player.config.get_value(CONF_ENCRYPTION, True):
+ extra_args += ["-encrypt"]
+ if self.player.config.get_value(CONF_ALAC_ENCODE, True):
+ extra_args += ["-alac"]
+ for prop in ("et", "md", "am", "pk", "pw"):
+ if prop_value := self.player.discovery_info.decoded_properties.get(prop):
+ extra_args += [f"-{prop}", prop_value]
+ if skip > 0:
+ extra_args += ["-skip", str(skip)]
+ sync_adjust = self.player.config.get_value(CONF_SYNC_ADJUST, 0)
+ assert isinstance(sync_adjust, int)
+ if device_password := self.mass.config.get_raw_player_config_value(
+ player_id, CONF_PASSWORD, None
+ ):
+ extra_args += ["-password", str(device_password)]
+ # Add AirPlay credentials from pyatv pairing if available (for Apple devices)
+ # if raop_credentials := self.player.config.get_value(CONF_AP_CREDENTIALS):
+ # # pyatv AirPlay credentials are in format "identifier:secret_key:other:data"
+ # # cliraop expects just the secret_key (2nd part, 64-char hex string) for -secret
+ # parts = str(raop_credentials).split(":")
+ # if len(parts) >= 2:
+ # # Take the second part (index 1) as the secret key
+ # secret_key = parts[1]
+ # self.prov.logger.debug(
+ # "Using AirPlay credentials for %s: id=%s, secret_len=%d, parts=%d",
+ # self.player.player_id,
+ # parts[0],
+ # len(secret_key),
+ # len(parts),
+ # )
+ # extra_args += ["-secret", secret_key]
+ # else:
+ # # Fallback: assume it's already just the key
+ # self.prov.logger.debug(
+ # "Using AirPlay credentials for %s: single value, length=%d",
+ # self.player.player_id,
+ # len(str(raop_credentials)),
+ # )
+ # extra_args += ["-secret", str(raop_credentials)]
+ if self.prov.logger.isEnabledFor(logging.DEBUG):
+ extra_args += ["-debug", "5"]
+ elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+ extra_args += ["-debug", "10"]
+ read_ahead = await self.mass.config.get_player_config_value(
+ player_id, CONF_READ_AHEAD_BUFFER
+ )
+ self.player.logger.info("Starting cliraop with latency buffer: %dms", read_ahead)
+
+ # cliraop is the binary that handles the actual raop streaming to the player
+ # this is a slightly modified version of philippe44's libraop
+ # https://github.com/music-assistant/libraop
+ # we use this intermediate binary to do the actual streaming because attempts to do
+ # so using pure python (e.g. pyatv) were not successful due to the realtime nature
+ cliraop_args = [
+ cli_binary,
+ "-ntpstart",
+ str(start_ntp),
+ "-port",
+ str(self.player.discovery_info.port),
+ "-latency",
+ str(read_ahead),
+ "-volume",
+ str(self.player.volume_level),
+ *extra_args,
+ "-dacp",
+ self.prov.dacp_id,
+ "-activeremote",
+ self.active_remote_id,
+ "-cmdpipe",
+ self.commands_named_pipe,
+ "-udn",
+ self.player.discovery_info.name,
+ self.player.address,
+ self.audio_named_pipe,
+ ]
+ self.player.logger.debug(
+ "Starting cliraop process for player %s with args: %s",
+ self.player.player_id,
+ cliraop_args,
+ )
+ self._cli_proc = AsyncProcess(cliraop_args, stdin=False, stderr=True, name="cliraop")
+ await self._cli_proc.start()
+ # read up to first 50 lines of stderr to get the initial status
+ for _ in range(50):
+ line = (await self._cli_proc.read_stderr()).decode("utf-8", errors="ignore")
+ self.player.logger.debug(line)
+ if "connected to " in line:
+ self.player.logger.info("AirPlay device connected. Starting playback.")
+ self._started.set()
+ # Open pipes now that cliraop is ready
+ await self._open_pipes()
+ break
+ if "Cannot connect to AirPlay device" in line:
+ raise PlayerCommandFailed("Cannot connect to AirPlay device")
+ # repeat sending the volume level to the player because some players seem
+ # to ignore it the first time
+ # https://github.com/music-assistant/support/issues/3330
+ await self.send_cli_command(f"VOLUME={self.player.volume_level}\n")
+ # start reading the stderr of the cliraop process from another task
+ self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
+
+ async def _stderr_reader(self) -> None:
+ """Monitor stderr for the running CLIRaop process."""
+ player = self.player
+ logger = player.logger
+ lost_packets = 0
+ if not self._cli_proc:
+ return
+ async for line in self._cli_proc.iter_stderr():
+ if "elapsed milliseconds:" in line:
+ # this is received more or less every second while playing
+ # millis = int(line.split("elapsed milliseconds: ")[1])
+ # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
+ # self.player.elapsed_time_last_updated = time.time()
+ logger.log(VERBOSE_LOG_LEVEL, line)
+ continue
+ if "set pause" in line or "Pause at" in line:
+ player.set_state_from_stream(state=PlaybackState.PAUSED)
+ if "Restarted at" in line or "restarting w/ pause" in line:
+ player.set_state_from_stream(state=PlaybackState.PLAYING)
+ if "restarting w/o pause" in line:
+ # streaming has started
+ player.set_state_from_stream(state=PlaybackState.PLAYING, elapsed_time=0)
+ if "lost packet out of backlog" in line:
+ lost_packets += 1
+ if lost_packets == 100:
+ logger.error("High packet loss detected, restarting playback...")
+ self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
+ else:
+ logger.warning("Packet loss detected!")
+ if "end of stream reached" in line:
+ logger.debug("End of stream reached")
+ break
+ logger.debug(line)
+
+ # ensure we're cleaned up afterwards (this also logs the returncode)
+ logger.debug("CLIRaop stderr reader ended")
+ await self.stop()
from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.helpers.datetime import utc
-from music_assistant.helpers.util import get_ip_pton, select_free_port
+from music_assistant.helpers.util import (
+ get_ip_pton,
+ get_primary_ip_address_from_zeroconf,
+ select_free_port,
+)
from music_assistant.models.player_provider import PlayerProvider
-from .constants import CACHE_CATEGORY_PREV_VOLUME, CONF_IGNORE_VOLUME, FALLBACK_VOLUME
-from .helpers import (
- convert_airplay_volume,
- get_cliraop_binary,
- get_model_info,
- get_primary_ip_address_from_zeroconf,
+from .constants import (
+ CACHE_CATEGORY_PREV_VOLUME,
+ CONF_IGNORE_VOLUME,
+ DACP_DISCOVERY_TYPE,
+ FALLBACK_VOLUME,
)
+from .helpers import convert_airplay_volume, get_model_info
from .player import AirPlayPlayer
# TODO: AirPlay provider
class AirPlayProvider(PlayerProvider):
"""Player provider for AirPlay based players."""
- cliraop_bin: str | None
_dacp_server: asyncio.Server
_dacp_info: AsyncServiceInfo
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- # we locate the cliraop binary here, so we can fail early if it is not available
- self.cliraop_bin: str | None = await get_cliraop_binary()
# register DACP zeroconf service
dacp_port = await select_free_port(39831, 49831)
self.dacp_id = dacp_id = f"{randrange(2**64):X}"
self._dacp_server = await asyncio.start_server(
self._handle_dacp_request, "0.0.0.0", dacp_port
)
- zeroconf_type = "_dacp._tcp.local."
- server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}"
+ server_id = f"iTunes_Ctrl_{dacp_id}.{DACP_DISCOVERY_TYPE}"
self._dacp_info = AsyncServiceInfo(
- zeroconf_type,
+ DACP_DISCOVERY_TYPE,
name=server_id,
addresses=[await get_ip_pton(str(self.mass.streams.publish_ip))],
port=dacp_port,
# update the latest discovery info for existing player
player.set_discovery_info(info, display_name)
return
- # handle new player
await self._setup_player(player_id, display_name, info)
async def unload(self, is_removed: bool = False) -> None:
self, player_id: str, display_name: str, discovery_info: AsyncServiceInfo
) -> None:
"""Handle setup of a new player that is discovered using mdns."""
- # prefer airplay mdns info as it has more details
- # fallback to raop info if airplay info is not available
- airplay_info = AsyncServiceInfo(
- "_airplay._tcp.local.", discovery_info.name.split("@")[-1].replace("_raop", "_airplay")
- )
+ if discovery_info.type == "_raop._tcp.local.":
+ # RAOP service discovered
+ self.logger.debug("Discovered RAOP service for %s", display_name)
+ # always prefer airplay mdns info as it has more details
+ # fallback to raop info if airplay info is not available,
+ # (old device only announcing raop)
+ airplay_info = AsyncServiceInfo(
+ "_airplay._tcp.local.",
+ discovery_info.name.split("@")[-1].replace("_raop", "_airplay"),
+ )
+ else:
+ # AirPlay service discovered
+ self.logger.debug("Discovered AirPlay service for %s", display_name)
+ airplay_info = discovery_info
if await airplay_info.async_request(self.mass.aiozc.zeroconf, 3000):
manufacturer, model = get_model_info(airplay_info)
else:
self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
return
- if "apple tv" in model.lower():
- # For now, we ignore the Apple TV until we implement the authentication.
- # maybe we can simply use pyatv only for this part?
- # the cliraop application has already been prepared to accept the secret.
- self.logger.info(
- "Ignoring %s in discovery because it is not yet supported.", display_name
- )
- return
-
address = get_primary_ip_address_from_zeroconf(discovery_info)
if not address:
return # should not happen, but guard just in case
+ if not discovery_info:
+ return # should not happen, but guard just in case
# if we reach this point, all preflights are ok and we can create the player
self.logger.debug("Discovered AirPlay device %s on %s", display_name, address)
- # append airplay to the default display name for generic (non-apple) devices
- # this makes it easier for users to distinguish between airplay and non-airplay devices
- if manufacturer.lower() != "apple" and "airplay" not in display_name.lower():
- display_name += " (AirPlay)"
-
# Get volume from cache
if not (
volume := await self.mass.cache.get(
):
volume = FALLBACK_VOLUME
+ # Append airplay to the default name for non-apple devices
+ # to make it easier for users to distinguish
+ is_apple = manufacturer.lower() == "apple"
+ if not is_apple and "airplay" not in display_name.lower():
+ display_name += " (AirPlay)"
+
+ self.logger.debug(
+ "Setting up player %s: manufacturer=%s, model=%s",
+ display_name,
+ manufacturer,
+ model,
+ )
+
+ # Create single AirPlayPlayer for all devices
+ # Pairing config entries will be shown conditionally based on device type
player = AirPlayPlayer(
provider=self,
player_id=player_id,
model=model,
initial_volume=volume,
)
+
+ # Final check before registration to handle race conditions
+ # (multiple MDNS events processed in parallel for same device)
+ if self.mass.players.get(player_id):
+ self.logger.debug(
+ "Player %s already registered during setup, skipping registration", player_id
+ )
+ return
+
await self.mass.players.register(player)
async def _handle_dacp_request( # noqa: PLR0915
active_remote = headers.get("Active-Remote")
_, path, _ = headers_split[0].split(" ")
# lookup airplay player by active remote id
- player = next(
+ player: AirPlayPlayer | None = next(
(
x
for x in self.get_players()
- if x.raop_stream and x.raop_stream.active_remote_id == active_remote
+ if x.stream and x.stream.active_remote_id == active_remote
),
None,
)
self.logger.debug(
"DACP request for %s (%s): %s -- %s",
- player.discovery_info.name if player else "UNKNOWN PLAYER",
+ player.name if player else "UNKNOWN PLAYER",
active_remote,
path,
body,
# we've sent or the device requesting a new volume itself.
# In case of a small rounding difference, we ignore this,
# to prevent an endless pingpong of volume changes
- raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
- volume = convert_airplay_volume(raop_volume)
+ airplay_volume = float(path.split("dmcp.device-volume=", 1)[-1])
+ volume = convert_airplay_volume(airplay_volume)
player.update_volume_from_device(volume)
elif "dmcp.volume=" in path:
# volume change request from device (e.g. volume buttons)
player.update_volume_from_device(volume)
elif "device-prevent-playback=1" in path:
# device switched to another source (or is powered off)
- if raop_stream := player.raop_stream:
- raop_stream.prevent_playback = True
- self.mass.create_task(player.raop_stream.session.remove_client(player))
+ if stream := player.stream:
+ stream.prevent_playback = True
+ self.mass.create_task(player.stream.session.remove_client(player))
elif "device-prevent-playback=0" in path:
# device reports that its ready for playback again
- if raop_stream := player.raop_stream:
- raop_stream.prevent_playback = False
+ if stream := player.stream:
+ stream.prevent_playback = False
# send response
date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
+++ /dev/null
-"""Logic for RAOP (AirPlay 1) audio streaming to AirPlay devices."""
-
-from __future__ import annotations
-
-import asyncio
-import logging
-import time
-from collections.abc import AsyncGenerator
-from contextlib import suppress
-from random import randint
-from typing import TYPE_CHECKING
-
-from music_assistant_models.enums import PlaybackState
-from music_assistant_models.errors import PlayerCommandFailed
-
-from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
-from music_assistant.helpers.audio import get_chunksize, get_player_filter_params
-from music_assistant.helpers.ffmpeg import FFMpeg
-from music_assistant.helpers.process import AsyncProcess, check_output
-from music_assistant.helpers.util import TaskManager, close_async_generator
-
-from .constants import (
- AIRPLAY_PCM_FORMAT,
- CONF_ALAC_ENCODE,
- CONF_ENCRYPTION,
- CONF_PASSWORD,
- CONF_READ_AHEAD_BUFFER,
-)
-
-if TYPE_CHECKING:
- from music_assistant_models.media_items import AudioFormat
-
- from .player import AirPlayPlayer
- from .provider import AirPlayProvider
-
-
-class RaopStreamSession:
- """Object that holds the details of a (RAOP) stream session to one or more players."""
-
- def __init__(
- self,
- airplay_provider: AirPlayProvider,
- sync_clients: list[AirPlayPlayer],
- input_format: AudioFormat,
- audio_source: AsyncGenerator[bytes, None],
- ) -> None:
- """Initialize RaopStreamSession."""
- assert sync_clients
- self.prov = airplay_provider
- self.mass = airplay_provider.mass
- self.input_format = input_format
- self.sync_clients = sync_clients
- self._audio_source = audio_source
- self._audio_source_task: asyncio.Task[None] | None = None
- self._lock = asyncio.Lock()
-
- async def start(self) -> None:
- """Initialize RaopStreamSession."""
- # initialize raop stream for all players
-
- # get current ntp and start RaopStream per player
- assert self.prov.cliraop_bin
- _, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
- start_ntp = int(stdout.strip())
- wait_start = 1750 + (250 * len(self.sync_clients))
-
- async def _start_client(raop_player: AirPlayPlayer) -> None:
- # stop existing stream if running
- if raop_player.raop_stream and raop_player.raop_stream.running:
- await raop_player.raop_stream.stop()
-
- raop_player.raop_stream = RaopStream(self, raop_player)
- await raop_player.raop_stream.start(start_ntp, wait_start)
-
- async with TaskManager(self.mass) as tm:
- for _raop_player in self.sync_clients:
- tm.create_task(_start_client(_raop_player))
- self._audio_source_task = asyncio.create_task(self._audio_streamer())
-
- async def stop(self) -> None:
- """Stop playback and cleanup."""
- if self._audio_source_task and not self._audio_source_task.done():
- self._audio_source_task.cancel()
- with suppress(asyncio.CancelledError):
- await self._audio_source_task
- await asyncio.gather(
- *[self.remove_client(x) for x in self.sync_clients],
- return_exceptions=True,
- )
-
- async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
- """Remove a sync client from the session."""
- if airplay_player not in self.sync_clients:
- return
- assert airplay_player.raop_stream
- assert airplay_player.raop_stream.session == self
- async with self._lock:
- self.sync_clients.remove(airplay_player)
- await airplay_player.raop_stream.stop()
- airplay_player.raop_stream = None
- # if this was the last client, stop the session
- if not self.sync_clients:
- await self.stop()
- return
-
- async def add_client(self, airplay_player: AirPlayPlayer) -> None:
- """Add a sync client to the session."""
- # TODO: Add the ability to add a new client to an existing session
- # e.g. by counting the number of frames sent etc.
-
- # temp solution: just restart the whole playback session when new client(s) join
- sync_leader = self.sync_clients[0]
- if not sync_leader.raop_stream or not sync_leader.raop_stream.running:
- return
-
- await self.stop() # we need to stop the current session to add a new client
- # this could potentially be called by multiple players at the exact same time
- # so we debounce the resync a bit here with a timer
- if sync_leader.current_media:
- self.mass.call_later(
- 0.5,
- self.mass.players.cmd_resume(sync_leader.player_id),
- task_id=f"resync_session_{sync_leader.player_id}",
- )
-
- async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
- """Replace the audio source of the stream."""
- # cancel the current audio source task
- assert self._audio_source_task # for type checker
- self._audio_source_task.cancel()
- with suppress(asyncio.CancelledError, RuntimeError):
- await self._audio_source_task
- # set new audio source and restart the stream
- self._audio_source = audio_source
- self._audio_source_task = asyncio.create_task(self._audio_streamer())
- # restart the (player-specific) ffmpeg stream for all players
- # this is the easiest way to ensure the new audio source is used
- # as quickly as possible, without waiting for the buffers to be drained
- # it also allows to change the player settings such as DSP on the fly
- for sync_client in self.sync_clients:
- if not sync_client.raop_stream:
- continue # guard
- sync_client.raop_stream.start_ffmpeg_stream()
-
- async def _audio_streamer(self) -> None:
- """Stream audio to all players."""
- generator_exhausted = False
- try:
- async for chunk in self._audio_source:
- async with self._lock:
- sync_clients = [
- x for x in self.sync_clients if x.raop_stream and x.raop_stream.running
- ]
- if not sync_clients:
- return
- await asyncio.gather(
- *[x.raop_stream.write_chunk(chunk) for x in sync_clients if x.raop_stream],
- return_exceptions=True,
- )
- # entire stream consumed: send EOF
- generator_exhausted = True
- async with self._lock:
- await asyncio.gather(
- *[
- x.raop_stream.write_eof()
- for x in self.sync_clients
- if x.raop_stream and x.raop_stream.running
- ],
- return_exceptions=True,
- )
- except Exception as err:
- logger = self.prov.logger
- logger.error(
- "Stream error: %s",
- str(err) or err.__class__.__name__,
- exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
- )
- raise
- finally:
- if not generator_exhausted:
- await close_async_generator(self._audio_source)
-
-
-class RaopStream:
- """
- RAOP (AirPlay 1) Audio Streamer.
-
- Python is not suitable for realtime audio streaming so we do the actual streaming
- of (RAOP) audio using a small executable written in C based on libraop to do
- the actual timestamped playback, which reads pcm audio from stdin
- and we can send some interactive commands using a named pipe.
- """
-
- def __init__(
- self,
- session: RaopStreamSession,
- player: AirPlayPlayer,
- ) -> None:
- """Initialize RaopStream."""
- self.session = session
- self.prov = session.prov
- self.mass = session.prov.mass
- self.player = player
-
- # always generate a new active remote id to prevent race conditions
- # with the named pipe used to send audio
- self.active_remote_id: str = str(randint(1000, 8000))
- self.prevent_playback: bool = False
- self._stderr_reader_task: asyncio.Task[None] | None = None
- self._cliraop_proc: AsyncProcess | None = None
- self._ffmpeg_proc: AsyncProcess | None = None
- self._ffmpeg_reader_task: asyncio.Task[None] | None = None
- self._started = asyncio.Event()
- self._stopped = False
- self._total_bytes_sent = 0
- self._stream_bytes_sent = 0
-
- @property
- def running(self) -> bool:
- """Return boolean if this stream is running."""
- return (
- not self._stopped
- and self._started.is_set()
- and self._cliraop_proc is not None
- and not self._cliraop_proc.closed
- )
-
- async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
- """Initialize CLIRaop process for a player."""
- assert self.prov.cliraop_bin
- extra_args: list[str] = []
- player_id = self.player.player_id
- extra_args += ["-if", self.mass.streams.bind_ip]
- if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
- extra_args += ["-encrypt"]
- if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
- extra_args += ["-alac"]
- for prop in ("et", "md", "am", "pk", "pw"):
- if prop_value := self.player.discovery_info.decoded_properties.get(prop):
- extra_args += [f"-{prop}", prop_value]
- sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
- assert isinstance(sync_adjust, int)
- if device_password := self.mass.config.get_raw_player_config_value(
- player_id, CONF_PASSWORD, None
- ):
- extra_args += ["-password", str(device_password)]
- if self.prov.logger.isEnabledFor(logging.DEBUG):
- extra_args += ["-debug", "5"]
- elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
- extra_args += ["-debug", "10"]
- read_ahead = await self.mass.config.get_player_config_value(
- player_id, CONF_READ_AHEAD_BUFFER
- )
- # ffmpeg handles the player specific stream + filters and pipes
- # audio to the cliraop process
- self.start_ffmpeg_stream()
-
- # cliraop is the binary that handles the actual raop streaming to the player
- # this is a slightly modified version of philippe44's libraop
- # https://github.com/music-assistant/libraop
- # we use this intermediate binary to do the actual streaming because attempts to do
- # so using pure python (e.g. pyatv) were not successful due to the realtime nature
- # TODO: Either enhance libraop with airplay 2 support or find a better alternative
- cliraop_args = [
- self.prov.cliraop_bin,
- "-ntpstart",
- str(start_ntp),
- "-port",
- str(self.player.discovery_info.port),
- "-wait",
- str(wait_start - sync_adjust),
- "-latency",
- str(read_ahead),
- "-volume",
- str(self.player.volume_level),
- *extra_args,
- "-dacp",
- self.prov.dacp_id,
- "-activeremote",
- self.active_remote_id,
- "-udn",
- self.player.discovery_info.name,
- self.player.address,
- "-",
- ]
- self._cliraop_proc = AsyncProcess(cliraop_args, stdin=True, stderr=True, name="cliraop")
- await self._cliraop_proc.start()
- # read first 20 lines of stderr to get the initial status
- for _ in range(20):
- line = (await self._cliraop_proc.read_stderr()).decode("utf-8", errors="ignore")
- self.player.logger.debug(line)
- if "connected to " in line:
- self._started.set()
- break
- if "Cannot connect to AirPlay device" in line:
- if self._ffmpeg_reader_task:
- self._ffmpeg_reader_task.cancel()
- raise PlayerCommandFailed("Cannot connect to AirPlay device")
- # repeat sending the volume level to the player because some players seem
- # to ignore it the first time
- # https://github.com/music-assistant/support/issues/3330
- await self.send_cli_command(f"VOLUME={self.player.volume_level}\n")
- # start reading the stderr of the cliraop process from another task
- self._stderr_reader_task = self.mass.create_task(self._stderr_reader())
-
- async def stop(self) -> None:
- """Stop playback and cleanup."""
- await self.send_cli_command("ACTION=STOP")
- self._stopped = True
- if self._stderr_reader_task and not self._stderr_reader_task.done():
- self._stderr_reader_task.cancel()
- if self._ffmpeg_reader_task and not self._ffmpeg_reader_task.done():
- self._ffmpeg_reader_task.cancel()
- if self._cliraop_proc and not self._cliraop_proc.closed:
- await self._cliraop_proc.close(True)
- if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
- await self._ffmpeg_proc.close(True)
- self.player.set_state_from_raop(state=PlaybackState.IDLE, elapsed_time=0)
-
- async def write_chunk(self, chunk: bytes) -> None:
- """Write a (pcm) audio chunk."""
- if self._stopped:
- raise RuntimeError("Stream is already stopped")
- await self._started.wait()
- assert self._ffmpeg_proc
- await self._ffmpeg_proc.write(chunk)
-
- async def write_eof(self) -> None:
- """Write EOF."""
- if self._stopped:
- raise RuntimeError("Stream is already stopped")
- await self._started.wait()
- assert self._ffmpeg_proc
- await self._ffmpeg_proc.write_eof()
-
- async def send_cli_command(self, command: str) -> None:
- """Send an interactive command to the running CLIRaop binary."""
- if self._stopped or not self._cliraop_proc or self._cliraop_proc.closed:
- return
- await self._started.wait()
-
- if not command.endswith("\n"):
- command += "\n"
-
- def send_data() -> None:
- with suppress(BrokenPipeError), open(named_pipe, "w") as f:
- f.write(command)
-
- named_pipe = f"/tmp/raop-{self.active_remote_id}" # noqa: S108
- self.player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
- self.player.last_command_sent = time.time()
- await asyncio.to_thread(send_data)
-
- def start_ffmpeg_stream(self) -> None:
- """Start (or replace) the player-specific ffmpeg stream to feed cliraop."""
- # cancel existing ffmpeg reader task
- if self._ffmpeg_reader_task and not self._ffmpeg_reader_task.done():
- self._ffmpeg_reader_task.cancel()
- if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
- self.mass.create_task(self._ffmpeg_proc.close(True))
- # start new ffmpeg reader task
- self._ffmpeg_reader_task = self.mass.create_task(self._ffmpeg_reader())
-
- async def _ffmpeg_reader(self) -> None:
- """Read audio from the audio source and pipe it to the CLIRaop process."""
- self._ffmpeg_proc = FFMpeg(
- audio_input="-",
- input_format=self.session.input_format,
- output_format=AIRPLAY_PCM_FORMAT,
- filter_params=get_player_filter_params(
- self.mass,
- self.player.player_id,
- self.session.input_format,
- AIRPLAY_PCM_FORMAT,
- ),
- )
- self._stream_bytes_sent = 0
- await self._ffmpeg_proc.start()
- chunksize = get_chunksize(AIRPLAY_PCM_FORMAT)
- # wait for cliraop to be ready
- await asyncio.wait_for(self._started.wait(), 20)
- async for chunk in self._ffmpeg_proc.iter_chunked(chunksize):
- if self._stopped:
- break
- if not self._cliraop_proc or self._cliraop_proc.closed:
- break
- await self._cliraop_proc.write(chunk)
- self._stream_bytes_sent += len(chunk)
- self._total_bytes_sent += len(chunk)
- del chunk
- # we base elapsed time on the amount of bytes sent
- # so we can account for reusing the same session for multiple streams
- self.player.set_state_from_raop(
- elapsed_time=self._stream_bytes_sent / chunksize,
- )
- # if we reach this point, the process exited, most likely because the stream ended
- if self._cliraop_proc and not self._cliraop_proc.closed:
- await self._cliraop_proc.write_eof()
-
- async def _stderr_reader(self) -> None:
- """Monitor stderr for the running CLIRaop process."""
- player = self.player
- logger = player.logger
- lost_packets = 0
- prev_metadata_checksum: str = ""
- prev_progress_report: float = 0
- if not self._cliraop_proc:
- return
- async for line in self._cliraop_proc.iter_stderr():
- if "elapsed milliseconds:" in line:
- # this is received more or less every second while playing
- # millis = int(line.split("elapsed milliseconds: ")[1])
- # self.player.elapsed_time = (millis / 1000) - self.elapsed_time_correction
- # self.player.elapsed_time_last_updated = time.time()
- # send metadata to player(s) if needed
- # NOTE: this must all be done in separate tasks to not disturb audio
- now = time.time()
- if (player.elapsed_time or 0) > 2 and player.current_media:
- metadata_checksum = f"{player.current_media.uri}.{player.current_media.title}.{player.current_media.image_url}" # noqa: E501
- if prev_metadata_checksum != metadata_checksum:
- prev_metadata_checksum = metadata_checksum
- prev_progress_report = now
- self.mass.create_task(self._send_metadata())
- # send the progress report every 5 seconds
- elif now - prev_progress_report >= 5:
- prev_progress_report = now
- self.mass.create_task(self._send_progress())
- if "set pause" in line or "Pause at" in line:
- player.set_state_from_raop(state=PlaybackState.PAUSED)
- if "Restarted at" in line or "restarting w/ pause" in line:
- player.set_state_from_raop(state=PlaybackState.PLAYING)
- if "restarting w/o pause" in line:
- # streaming has started
- player.set_state_from_raop(state=PlaybackState.PLAYING, elapsed_time=0)
- if "lost packet out of backlog" in line:
- lost_packets += 1
- if lost_packets == 100:
- logger.error("High packet loss detected, restarting playback...")
- self.mass.create_task(self.mass.players.cmd_resume(self.player.player_id))
- else:
- logger.warning("Packet loss detected!")
- if "end of stream reached" in line:
- logger.debug("End of stream reached")
- break
- logger.log(VERBOSE_LOG_LEVEL, line)
-
- # ensure we're cleaned up afterwards (this also logs the returncode)
- await self.stop()
-
- async def _send_metadata(self) -> None:
- """Send metadata to player (and connected sync childs)."""
- if not self.player or not self.player.current_media or self._stopped:
- return
- duration = min(self.player.current_media.duration or 0, 3600)
- title = self.player.current_media.title or ""
- artist = self.player.current_media.artist or ""
- album = self.player.current_media.album or ""
- cmd = f"TITLE={title}\nARTIST={artist}\nALBUM={album}\n"
- cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
-
- await self.send_cli_command(cmd)
-
- # get image
- if not self.player.current_media.image_url or self._stopped:
- return
- await self.send_cli_command(f"ARTWORK={self.player.current_media.image_url}\n")
-
- async def _send_progress(self) -> None:
- """Send progress report to player (and connected sync childs)."""
- if not self.player.current_media or self._stopped:
- return
- progress = int(self.player.corrected_elapsed_time or 0)
- await self.send_cli_command(f"PROGRESS={progress}\n")
--- /dev/null
+"""Unified AirPlay/RAOP stream session logic for AirPlay devices."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import time
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+from typing import TYPE_CHECKING
+
+from music_assistant.helpers.audio import get_player_filter_params
+from music_assistant.helpers.ffmpeg import FFMpeg
+from music_assistant.helpers.util import TaskManager, close_async_generator
+from music_assistant.providers.airplay.helpers import unix_time_to_ntp
+
+from .constants import CONF_ENABLE_LATE_JOIN, StreamingProtocol
+from .protocols.airplay2 import AirPlay2Stream
+from .protocols.raop import RaopStream
+
+if TYPE_CHECKING:
+ from music_assistant_models.media_items import AudioFormat
+ from music_assistant_models.player import PlayerMedia
+
+ from .player import AirPlayPlayer
+ from .provider import AirPlayProvider
+
+
+class AirPlayStreamSession:
+ """Stream session (RAOP or AirPlay2) to one or more players."""
+
+ def __init__(
+ self,
+ airplay_provider: AirPlayProvider,
+ sync_clients: list[AirPlayPlayer],
+ pcm_format: AudioFormat,
+ audio_source: AsyncGenerator[bytes, None],
+ ) -> None:
+ """Initialize AirPlayStreamSession.
+
+ Args:
+ airplay_provider: The AirPlay provider instance
+ sync_clients: List of AirPlay players to stream to
+ pcm_format: PCM format of the input stream
+ audio_source: Async generator yielding audio chunks
+ """
+ assert sync_clients
+ self.prov = airplay_provider
+ self.mass = airplay_provider.mass
+ self.pcm_format = pcm_format
+ self.sync_clients = sync_clients
+ self._audio_source = audio_source
+ self._audio_source_task: asyncio.Task[None] | None = None
+ self._player_ffmpeg: dict[str, FFMpeg] = {}
+ self._player_start_chunk: dict[str, int] = {} # Chunk number when player joined
+ self._lock = asyncio.Lock()
+ self.start_ntp: int = 0
+ self.start_time: float = 0.0
+ self.chunks_streamed: int = 0 # Total chunks sent to session (each chunk = 1 second)
+
+ async def start(self) -> None:
+ """Initialize stream session for all players."""
+ # Get current NTP timestamp and calculate wait time
+ cur_time = time.time()
+ wait_start = 1750 + (250 * len(self.sync_clients)) # in milliseconds
+ wait_start_seconds = wait_start / 1000
+ self.wait_start = wait_start_seconds # in seconds
+ self.start_time = cur_time + wait_start_seconds
+ self.start_ntp = unix_time_to_ntp(self.start_time)
+
+ self.prov.logger.info(
+ "Starting stream session with %d clients",
+ len(self.sync_clients),
+ )
+
+ async def _start_client(airplay_player: AirPlayPlayer) -> None:
+ """Start stream for a single client."""
+ # Stop existing stream if running
+ if airplay_player.stream and airplay_player.stream.running:
+ await airplay_player.stream.stop()
+ if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+ await ffmpeg.close()
+ del ffmpeg
+
+ self._player_start_chunk[airplay_player.player_id] = 1
+
+ # Create appropriate stream type based on protocol
+ if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
+ airplay_player.stream = AirPlay2Stream(self, airplay_player)
+ else:
+ airplay_player.stream = RaopStream(self, airplay_player)
+
+ # create optional FFMpeg instance per player if needed
+ # this is used to do any optional DSP processing/filtering
+ filter_params = get_player_filter_params(
+ self.mass,
+ airplay_player.player_id,
+ self.pcm_format,
+ airplay_player.stream.pcm_format,
+ )
+ if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
+ ffmpeg = FFMpeg(
+ audio_input="-",
+ input_format=self.pcm_format,
+ output_format=airplay_player.stream.pcm_format,
+ filter_params=filter_params,
+ )
+ await ffmpeg.start()
+ self._player_ffmpeg[airplay_player.player_id] = ffmpeg
+
+ await airplay_player.stream.start(self.start_ntp)
+
+ # Tracking will be initialized on first write
+
+ async with TaskManager(self.mass) as tm:
+ for _airplay_player in self.sync_clients:
+ tm.create_task(_start_client(_airplay_player))
+ # Start audio source streamer task
+ # this will read from the audio source and distribute to all players
+ self._audio_source_task = asyncio.create_task(self._audio_streamer())
+
+ async def stop(self) -> None:
+ """Stop playback and cleanup."""
+ if self._audio_source_task and not self._audio_source_task.done():
+ self._audio_source_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._audio_source_task
+ await asyncio.gather(
+ *[self.remove_client(x) for x in self.sync_clients],
+ return_exceptions=True,
+ )
+
+ async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
+ """Remove a sync client from the session."""
+ if airplay_player not in self.sync_clients:
+ return
+ assert airplay_player.stream
+ assert airplay_player.stream.session == self
+ async with self._lock:
+ self.sync_clients.remove(airplay_player)
+ if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+ await ffmpeg.close()
+ del ffmpeg
+ # Clean up player tracking
+ self._player_start_chunk.pop(airplay_player.player_id, None)
+ await airplay_player.stream.stop()
+ airplay_player.stream = None
+ # If this was the last client, stop the session
+ if not self.sync_clients:
+ await self.stop()
+ return
+
+ async def add_client(self, airplay_player: AirPlayPlayer) -> None:
+ """Add a sync client to the session as a late joiner.
+
+ The late joiner will:
+ 1. Start playing at a compensated NTP timestamp (start_ntp + offset)
+ 2. Receive silence calculated dynamically based on how much audio has been sent
+ 3. Then receive real audio chunks in sync with other players
+ """
+ sync_leader = self.sync_clients[0]
+ if not sync_leader.stream or not sync_leader.stream.running:
+ return
+
+ allow_late_join = self.prov.config.get_value(CONF_ENABLE_LATE_JOIN, False)
+ if not allow_late_join:
+ # Late joining is not allowed - restart the session for all players
+ await self.stop() # we need to stop the current session to add a new client
+ # this could potentially be called by multiple players at the exact same time
+ # so we debounce the resync a bit here with a timer
+ if sync_leader.current_media:
+ self.mass.call_later(
+ 0.5,
+ self.mass.players.cmd_resume(sync_leader.player_id),
+ task_id=f"resync_session_{sync_leader.player_id}",
+ )
+
+ # Stop existing stream if the player is already streaming
+ if airplay_player.stream and airplay_player.stream.running:
+ await airplay_player.stream.stop()
+
+ # Clean up any existing FFmpeg instance for this player
+ if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+ await ffmpeg.close()
+ del ffmpeg
+
+ # Create appropriate stream type based on protocol
+ if airplay_player.protocol == StreamingProtocol.AIRPLAY2:
+ airplay_player.stream = AirPlay2Stream(self, airplay_player)
+ else:
+ airplay_player.stream = RaopStream(self, airplay_player)
+
+ # Create optional FFMpeg instance per player if needed
+ filter_params = get_player_filter_params(
+ self.mass,
+ airplay_player.player_id,
+ self.pcm_format,
+ airplay_player.stream.pcm_format,
+ )
+ if filter_params or self.pcm_format != airplay_player.stream.pcm_format:
+ ffmpeg = FFMpeg(
+ audio_input="-",
+ input_format=self.pcm_format,
+ output_format=airplay_player.stream.pcm_format,
+ filter_params=filter_params,
+ )
+ await ffmpeg.start()
+ self._player_ffmpeg[airplay_player.player_id] = ffmpeg
+
+ # Snapshot chunks_streamed inside lock to prevent race conditions
+ # Keep lock held during stream.start() to ensure player doesn't miss any chunks
+ async with self._lock:
+ # Calculate skip_seconds based on how many chunks have been sent
+ skip_seconds = self.chunks_streamed
+
+ # Add player to sync clients list
+ if airplay_player not in self.sync_clients:
+ self.sync_clients.append(airplay_player)
+
+ await airplay_player.stream.start(self.start_ntp, skip_seconds)
+
+ async def replace_stream(self, audio_source: AsyncGenerator[bytes, None]) -> None:
+ """Replace the audio source of the stream."""
+ # Cancel the current audio source task
+ assert self._audio_source_task # for type checker
+ self._audio_source_task.cancel()
+ with suppress(asyncio.CancelledError):
+ await self._audio_source_task
+ # Set new audio source and restart the stream
+ self._audio_source = audio_source
+ self._audio_source_task = asyncio.create_task(self._audio_streamer())
+ # Restart the (player-specific) ffmpeg stream for all players
+ # This is the easiest way to ensure the new audio source is used
+ # as quickly as possible, without waiting for the buffers to be drained
+ # It also allows changing the player settings such as DSP on the fly
+ # for sync_client in self.sync_clients:
+ # if not sync_client.stream:
+ # continue # guard
+ # sync_client.stream.start_ffmpeg_stream()
+
+ async def _audio_streamer(self) -> None:
+ """Stream audio to all players."""
+ generator_exhausted = False
+ _last_metadata: str | None = None
+ try:
+ # each chunk is exactly one second of audio data based on the pcm format.
+ async for chunk in self._audio_source:
+ async with self._lock:
+ sync_clients = [x for x in self.sync_clients if x.stream and x.stream.running]
+ if not sync_clients:
+ self.prov.logger.error(
+ "!!! AUDIO STREAMER EXITING: No running clients left! "
+ "Total sync_clients: %d, Details: %s",
+ len(self.sync_clients),
+ [
+ (x.player_id, x.stream.running if x.stream else None)
+ for x in self.sync_clients
+ ],
+ )
+ return
+
+ # Write to all players with a timeout (10 seconds)
+ # Timeout must account for player's internal latency buffer (1-4 seconds)
+ # The player may legitimately not accept data while draining its buffer
+ write_start = time.time()
+ write_tasks = [
+ asyncio.wait_for(self._write_chunk_to_player(x, chunk), timeout=10.0)
+ for x in sync_clients
+ if x.stream
+ ]
+ results = await asyncio.gather(*write_tasks, return_exceptions=True)
+ write_elapsed = time.time() - write_start
+
+ # Check for write errors or timeouts
+ players_to_remove = []
+ for i, result in enumerate(results):
+ if i >= len(sync_clients):
+ continue
+ player = sync_clients[i]
+
+ if isinstance(result, asyncio.TimeoutError):
+ self.prov.logger.error(
+ "!!! TIMEOUT writing chunk %d to player %s - "
+ "REMOVING from sync group! Total write time=%.3fs",
+ self.chunks_streamed,
+ player.player_id,
+ write_elapsed,
+ )
+ players_to_remove.append(player)
+ elif isinstance(result, Exception):
+ self.prov.logger.error(
+ "!!! Error writing chunk %d to player %s: %s - "
+ "REMOVING from sync group! Total write time=%.3fs",
+ self.chunks_streamed,
+ player.player_id,
+ result,
+ write_elapsed,
+ )
+ players_to_remove.append(player)
+
+ # Remove failed/timed-out players from sync group
+ for player in players_to_remove:
+ if player in self.sync_clients:
+ self.sync_clients.remove(player)
+ self.prov.logger.warning(
+ "Player %s removed from sync group due to write failure/timeout",
+ player.player_id,
+ )
+ # Stop the player's stream
+ if player.stream:
+ self.mass.create_task(player.stream.stop())
+
+ # Update chunk counter (each chunk is exactly one second of audio)
+ self.chunks_streamed += 1
+
+ # send metadata if changed
+ # do this in a separate task to not disturb audio streaming
+ # NOTE: we should probably move this out of the audio stream task into it's own task
+ if (
+ self.sync_clients
+ and (_leader := self.sync_clients[0])
+ and (_leader.corrected_elapsed_time or 0) > 2
+ and (metadata := _leader.current_media) is not None
+ ):
+ now = time.time()
+ metadata_checksum = f"{metadata.uri}.{metadata.title}.{metadata.image_url}"
+ progress = metadata.corrected_elapsed_time or 0
+ if _last_metadata != metadata_checksum:
+ _last_metadata = metadata_checksum
+ prev_progress_report = now
+ self.mass.create_task(self._send_metadata(progress, metadata))
+ # send the progress report every 5 seconds
+ elif now - prev_progress_report >= 5:
+ prev_progress_report = now
+ self.mass.create_task(self._send_metadata(progress, None))
+ # Entire stream consumed: send EOF
+ generator_exhausted = True
+ async with self._lock:
+ await asyncio.gather(
+ *[
+ self._write_eof_to_player(x)
+ for x in self.sync_clients
+ if x.stream and x.stream.running
+ ],
+ return_exceptions=True,
+ )
+ except Exception as err:
+ logger = self.prov.logger
+ logger.error(
+ "Stream error: %s",
+ str(err) or err.__class__.__name__,
+ exc_info=err if logger.isEnabledFor(logging.DEBUG) else None,
+ )
+ raise
+ finally:
+ if not generator_exhausted:
+ await close_async_generator(self._audio_source)
+
+ async def _write_chunk_to_player(self, airplay_player: AirPlayPlayer, chunk: bytes) -> None:
+ """
+ Write audio chunk to a specific player.
+
+ each chunk is exactly one second of audio data based on the pcm format.
+ For late joiners, compensates for chunks sent between join time and actual chunk delivery.
+ Blocks (async) until the data has been written.
+ """
+ write_start = time.time()
+ chunk_number = self.chunks_streamed + 1
+ player_id = airplay_player.player_id
+
+ # Calculate chunk offset based on actual time vs start time
+ self._player_start_chunk.pop(player_id, None)
+
+ # if the player has an associated FFMpeg instance, use that first
+ if ffmpeg := self._player_ffmpeg.get(player_id):
+ await ffmpeg.write(chunk)
+ chunk_to_send = await ffmpeg.read(len(chunk))
+ else:
+ chunk_to_send = chunk
+
+ assert airplay_player.stream
+ stream_write_start = time.time()
+ await airplay_player.stream.write_chunk(chunk_to_send)
+ stream_write_elapsed = time.time() - stream_write_start
+
+ total_elapsed = time.time() - write_start
+
+ # Log only truly abnormal writes (>5s indicates a real stall)
+ # Can take up to ~4s if player's latency buffer is being drained
+ if total_elapsed > 5.0:
+ self.prov.logger.error(
+ "!!! STALLED WRITE: Player %s chunk %d took %.3fs total (stream write: %.3fs)",
+ player_id,
+ chunk_number,
+ total_elapsed,
+ stream_write_elapsed,
+ )
+
+ async def _write_eof_to_player(self, airplay_player: AirPlayPlayer) -> None:
+ """Write EOF to a specific player."""
+ # cleanup any associated FFMpeg instance first
+ if ffmpeg := self._player_ffmpeg.pop(airplay_player.player_id, None):
+ await ffmpeg.write_eof()
+ await ffmpeg.close()
+ assert airplay_player.stream
+ await airplay_player.stream.write_eof()
+
+ async def _send_metadata(self, progress: int | None, metadata: PlayerMedia | None) -> None:
+ """Send metadata to all players."""
+ async with self._lock:
+ await asyncio.gather(
+ *[
+ x.stream.send_metadata(progress, metadata)
+ for x in self.sync_clients
+ if x.stream and x.stream.running
+ ],
+ return_exceptions=True,
+ )