import socket
import time
from collections.abc import AsyncGenerator
+from contextlib import suppress
+from dataclasses import dataclass
from random import randint, randrange
-from typing import TYPE_CHECKING, cast
-
-from pyatv import connect, interface, scan
-from pyatv.conf import AppleTV as ATVConf
-from pyatv.const import DeviceModel, DeviceState, PowerState, Protocol
-from pyatv.convert import model_str
-from pyatv.interface import AppleTV as AppleTVInterface
-from pyatv.interface import DeviceListener
-from pyatv.protocols.raop import RaopStream
+from typing import TYPE_CHECKING
+
+from zeroconf import ServiceStateChange
from zeroconf.asyncio import AsyncServiceInfo
from music_assistant.common.helpers.datetime import utc
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.player_queue import PlayerQueue
-from music_assistant.server.helpers.process import AsyncProcess, check_output
+from music_assistant.server.helpers.process import check_output
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
DOMAIN = "airplay"
CONF_LATENCY = "latency"
+DEFAULT_LATENCY = 2000
CONF_ENCRYPTION = "encryption"
CONF_ALAC_ENCODE = "alac_encode"
CONF_VOLUME_START = "volume_start"
ConfigEntry(
key=CONF_LATENCY,
type=ConfigEntryType.INTEGER,
- range=(200, 3000),
- default_value=1200,
+ range=(500, 4000),
+ default_value=DEFAULT_LATENCY,
label="Latency",
description="Sets the number of milliseconds of audio buffer in the player. "
- "This is important to absorb network throughput jitter. "
- "Note that the resume after pause will be skipping that amount of time "
- "and volume changes will be delayed by the same amount, when using digital volume.",
+ "This is important to absorb network throughput jitter. \n"
+ "Increase this value if you notice network dropouts at the cost of a slower "
+ "response to commands.",
advanced=True,
),
ConfigEntry(
"(lossless) ALAC at the cost of a bit CPU.",
advanced=True,
),
- ConfigEntry(
- key=CONF_VOLUME_START,
- type=ConfigEntryType.BOOLEAN,
- default_value=False,
- label="Send volume at playback start",
- description="Some players require to send/confirm the volume when playback starts. \n"
- "Enable this setting if the playback volume does not match the MA interface.",
- advanced=True,
- ),
ConfigEntry(
key=CONF_SYNC_ADJUST,
type=ConfigEntryType.INTEGER,
BACKOFF_TIME_UPPER_LIMIT = 300 # Five minutes
CONF_CREDENTIALS = "credentials"
+CACHE_KEY_PREV_VOLUME = "airplay_prev_volume"
+FALLBACK_VOLUME = 20
async def setup(
return int(portion + normal_min)
-class AirPlayPlayer(DeviceListener):
- """Holds the connection to the apyatv instance and the cliraop."""
+def get_model_from_am(am_property: str | None) -> tuple[str, str]:
+ """Return Manufacturer and Model name from mdns AM property."""
+ manufacturer = "Unknown"
+ model = "Generic Airplay device"
+ if not am_property:
+ return (manufacturer, model)
+ if isinstance(am_property, bytes):
+ am_property = am_property.decode("utf-8")
+ if am_property == "AudioAccessory5,1":
+ model = "HomePod"
+ manufacturer = "Apple"
+ elif "AppleTV" in am_property:
+ model = "Apple TV"
+ manufacturer = "Apple"
+ else:
+ model = am_property
+ return (manufacturer, model)
+
+
+class AirplayStreamJob:
+ """Object that holds the details of a stream job."""
+
+ def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None:
+ """Initialize AirplayStreamJob."""
+ self.prov = prov
+ self.mass = prov.mass
+ self.airplay_player = airplay_player
+ # always generate a new active remote id to prevent race conditions
+ # with the named pipe used to send commands
+ self.active_remote_id: str = str(randint(1000, 8000))
+ self.start_ntp: int | None = None # use as checksum
+ self._log_reader_task: asyncio.Task | None = None
+ self._cliraop_proc: asyncio.subprocess.Process | None = None
- def __init__(
- self, mass: MusicAssistant, player_id: str, discovery_info: interface.BaseConfig
- ) -> None:
- """Initialize power manager."""
- self.player_id = player_id
- self.discovery_info = discovery_info
- self.mass = mass
- self.atv: AppleTVInterface | None = None
- self.connected = False
- self._connection_attempts = 0
- self._connection_was_lost = False
- self._playing: interface.Playing | None = None
- self.logger = self.mass.players.logger.getChild("airplay").getChild(self.player_id)
- self.cliraop_proc: AsyncProcess | None = None
- self.active_remote_id = str(randint(1000, 8000))
- self.optimistic_state: PlayerState = PlayerState.IDLE
-
- def connection_lost(self, _):
- """Device was unexpectedly disconnected.
-
- This is a callback function from pyatv.interface.DeviceListener.
- """
- self.logger.warning('Connection lost to Apple TV "%s"', self.discovery_info.name)
- self._connection_was_lost = True
- self._handle_disconnect()
+ @property
+ def running(self) -> bool:
+ """Return bool if we're running."""
+ return self._cliraop_proc and self._cliraop_proc.returncode is None
- def connection_closed(self):
- """Device connection was (intentionally) closed.
+ async def init_cliraop(self, start_ntp: int) -> None:
+ """Initialize CLIRaop process for a player."""
+ self.start_ntp = start_ntp
+ extra_args = []
+ player_id = self.airplay_player.player_id
+ mass_player = self.mass.players.get(player_id)
+ latency = self.mass.config.get_raw_player_config_value(
+ player_id, CONF_LATENCY, DEFAULT_LATENCY
+ )
+ extra_args += ["-l", str(latency)]
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
+ extra_args += ["-e"]
+ if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
+ extra_args += ["-a"]
+ sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
+ if device_password := self.mass.config.get_raw_player_config_value(
+ player_id, CONF_PASSWORD, None
+ ):
+ # NOTE: This may not work as we might need to do
+ # some fancy hashing with the plain password first?!
+ extra_args += ["-P", device_password]
+ if self.airplay_player.logger.level == logging.DEBUG:
+ extra_args += ["-d", "5"]
- This is a callback function from pyatv.interface.DeviceListener.
- """
- self.connected = False
- self._handle_disconnect()
-
- def _handle_disconnect(self):
- """Handle that the device disconnected and restart connect loop."""
- self.connected = False
- if self.atv:
- self.atv.close()
- self.atv = None
-
- async def connect(self):
- """Connect to device."""
- if self.connected:
- return
- try:
- await self._connect(self.discovery_info)
- except Exception:
- # retry with scanning for the device
- if conf := await self._scan():
- await self._connect(conf)
- raise
-
- async def disconnect(self):
- """Disconnect from device."""
- self.logger.debug("Disconnecting from device")
- self.is_on = False
- self.connected = False
- try:
- if self.atv:
- self.atv.close()
- self.atv = None
- except Exception: # pylint: disable=broad-except
- self.logger.exception("An error occurred while disconnecting")
+ args = [
+ self.prov.cliraop_bin,
+ "-n",
+ str(start_ntp),
+ "-p",
+ str(self.airplay_player.discovery_info.port),
+ "-w",
+ str(2500 - sync_adjust),
+ "-v",
+ str(mass_player.volume_level),
+ *extra_args,
+ "-dacp",
+ self.prov.dacp_id,
+ "-ar",
+ self.active_remote_id,
+ "-md",
+ self.airplay_player.discovery_info.decoded_properties["md"],
+ "-et",
+ self.airplay_player.discovery_info.decoded_properties["et"],
+ str(self.airplay_player.discovery_info.parsed_addresses()[0]),
+ "-",
+ ]
+ if platform.system() == "Darwin":
+ os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
+ self._cliraop_proc = await asyncio.create_subprocess_exec(
+ *args,
+ stdin=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.PIPE,
+ close_fds=True,
+ )
+ self._log_reader_task = asyncio.create_task(self._log_watcher())
async def stop(self):
- """Stop playback and cleanup any running CLIRaop Process."""
- if self.cliraop_proc and not self.cliraop_proc.closed:
- # prefer interactive command to our streamer
- await self.send_cli_command("ACTION=STOP")
- await self.cliraop_proc.wait()
- self.optimistic_state = PlayerState.IDLE
- self.update_attributes()
- elif atv := self.atv:
- await atv.remote_control.stop()
+ """Stop playback and cleanup."""
+ if not self.running:
+ return
+ # prefer interactive command to our streamer
+ await self.send_cli_command("ACTION=STOP")
+ # use communicate to clear stdin/stdout and wait for exit
+ await self._cliraop_proc.wait()
+ # stop background task
+ if self._log_reader_task and not self._log_reader_task.done():
+ self._log_reader_task.cancel()
async def send_cli_command(self, command: str) -> None:
"""Send an interactive command to the running CLIRaop binary."""
- if not self.cliraop_proc or self.cliraop_proc.closed:
+ if not self.running:
return
named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108
with open(named_pipe, "w") as f:
f.write(command)
- self.logger.debug("sending command %s", command)
+ self.airplay_player.logger.debug("sending command %s", command)
await self.mass.create_task(send_data)
- async def _scan(self) -> ATVConf | None:
- """Try to find device by scanning for it."""
- address: str = self.discovery_info.address
-
- self.logger.debug("Discovering device %s", self.discovery_info.name)
- atvs = await scan(
- self.mass.loop,
- identifier=self.discovery_info.identifier,
- aiozc=self.mass.aiozc,
- hosts=[address],
+ async def _log_watcher(self) -> None:
+ """Monitor stderr for the running CLIRaop process."""
+ airplay_player = self.airplay_player
+ mass_player = self.mass.players.get(airplay_player.player_id)
+ logger = airplay_player.logger
+ airplay_player.logger.debug("Starting log watcher task...")
+ async for line in self._cliraop_proc.stderr:
+ line = line.decode().strip() # noqa: PLW2901
+ if not line:
+ continue
+ logger.debug(line)
+ if "set pause" in line:
+ mass_player.state = PlayerState.PAUSED
+ self.mass.players.update(airplay_player.player_id)
+ elif "Restarted at" in line:
+ mass_player.state = PlayerState.PLAYING
+ self.mass.players.update(airplay_player.player_id)
+ elif "after start), played" in line:
+ millis = int(line.split("played ")[1].split(" ")[0])
+ mass_player.elapsed_time = millis / 1000
+ mass_player.elapsed_time_last_updated = time.time()
+ elif "restarting w/o pause" in line:
+ # streaming has started
+ mass_player.state = PlayerState.PLAYING
+ mass_player.elapsed_time = 0
+ mass_player.elapsed_time_last_updated = time.time()
+ self.mass.players.update(airplay_player.player_id)
+
+ # if we reach this point, the process exited
+ airplay_player.logger.debug("Log watcher task finished...")
+ mass_player.state = PlayerState.IDLE
+ self.mass.players.update(airplay_player.player_id)
+ logger.debug(
+ "CLIRaop process stopped with errorcode %s",
+ self._cliraop_proc.returncode,
)
- if atvs:
- return cast(ATVConf, atvs[0])
- self.logger.debug(
- "Failed to find device %s with address %s",
- self.discovery_info.name,
- address,
- )
- return None
+ async def write_chunk(self, data: bytes) -> None:
+ """Write a chunk of (pcm) data to the stdin of CLIRaop."""
+ if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+ return
+ self._cliraop_proc.stdin.write(data)
+ if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+ return
+ with suppress(BrokenPipeError):
+ await self._cliraop_proc.stdin.drain()
- async def _connect(self, conf: ATVConf) -> None:
- """Connect to device."""
- credentials: dict[int, str | None] = self.mass.config.get_raw_player_config_value(
- self.player_id, CONF_CREDENTIALS, {}
- )
- name: str = self.discovery_info.name
- missing_protocols = []
- for protocol_int, creds in credentials.items():
- protocol = Protocol(int(protocol_int))
- if conf.get_service(protocol) is not None:
- conf.set_credentials(protocol, creds) # type: ignore[arg-type]
- else:
- missing_protocols.append(protocol.name)
-
- if missing_protocols:
- missing_protocols_str = ", ".join(missing_protocols)
- self.logger.info(
- "Protocol(s) %s not yet found for %s, trying later",
- missing_protocols_str,
- name,
- )
+ async def write_eof(self, data: bytes) -> None:
+ """Write a chunk of (pcm) data to the stdin of CLIRaop."""
+ if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+ return
+ self._cliraop_proc.stdin.write_eof()
+ if not self.running or not self._cliraop_proc.stdin.can_write_eof():
return
+ with suppress(BrokenPipeError):
+ await self._cliraop_proc.stdin.drain()
- self.logger.debug("Connecting to device %s", name)
- session = self.mass.http_session
- self.atv = await connect(conf, self.mass.loop, session=session)
- self.connected = True
- self.atv.power.listener = self
- self.atv.listener = self
- self.atv.audio.listener = self
- if self.atv.features.in_state(
- interface.FeatureState.Available, interface.FeatureName.PushUpdates
- ):
- self.atv.push_updater.listener = self
- self.atv.push_updater.start()
- self.address_updated(str(conf.address))
+@dataclass
+class AirPlayPlayer:
+ """Holds the details of the (discovered) Airplay (RAOP) player."""
- self._setup_device()
- self.update_attributes()
-
- self._connection_attempts = 0
- if self._connection_was_lost:
- self.logger.info(
- 'Connection was (re)established to device "%s"',
- name,
- )
- self._connection_was_lost = False
-
- def _setup_device(self):
- if not (mass_player := self.mass.players.get(self.player_id)):
- mass_player = Player(
- player_id=self.player_id,
- provider=DOMAIN,
- type=PlayerType.PLAYER,
- name=self.discovery_info.name,
- available=True,
- powered=False,
- device_info=DeviceInfo(
- model=self.discovery_info.device_info.raw_model,
- manufacturer="Apple",
- address=str(self.discovery_info.address),
- ),
- supported_features=(
- PlayerFeature.PAUSE,
- PlayerFeature.SYNC,
- PlayerFeature.VOLUME_SET,
- PlayerFeature.POWER,
- ),
- max_sample_rate=44100,
- supports_24bit=False,
- )
- if self.atv:
- dev_info = self.atv.device_info
- mass_player.device_info = DeviceInfo(
- model=(
- dev_info.raw_model
- if dev_info.model == DeviceModel.Unknown and dev_info.raw_model
- else model_str(dev_info.model)
- ),
- manufacturer="Apple",
- address=str(self.discovery_info.address),
- )
- self.mass.players.register_or_update(mass_player)
-
- def playstatus_update(self, _, playstatus: interface.Playing) -> None:
- """Inform about changes to what is currently playing."""
- self.logger.debug("Playstatus received: %s", playstatus)
- self._playing = playstatus
- self.update_attributes()
-
- def playstatus_error(self, updater, exception: Exception) -> None:
- """Inform about an error when updating play status."""
- self.logger.debug("Playstatus error received", exc_info=exception)
- self._playing = None
- self.update_attributes()
-
- def powerstate_update(self, old_state: PowerState, new_state: PowerState) -> None:
- """Update power state when it changes."""
- self.update_attributes()
-
- def volume_update(self, old_level: float, new_level: float) -> None:
- """Update volume when it changes."""
- self.update_attributes()
-
- def update_attributes(self) -> None:
- """Update the player attributes."""
- mass_player = self.mass.players.get(self.player_id)
- mass_player.volume_level = int(self.atv.audio.volume)
- mass_player.powered = self.connected or self.cliraop_proc and not self.cliraop_proc.closed
- if self.cliraop_proc and not self.cliraop_proc.closed:
- mass_player.state = self.optimistic_state
- # NOTE: alapsed time is pushed from cliraop
- elif self.atv is None or not self.connected:
- mass_player.powered = False
- mass_player.state = PlayerState.IDLE
- elif self._playing:
- state = self._playing.device_state
- if state in (DeviceState.Idle, DeviceState.Loading):
- mass_player.state = PlayerState.IDLE
- elif state == DeviceState.Playing:
- mass_player.state = PlayerState.PLAYING
- elif state in (DeviceState.Paused, DeviceState.Seeking, DeviceState.Stopped):
- mass_player.state = PlayerState.PAUSED
- else:
- mass_player.state = PlayerState.IDLE
- mass_player.elapsed_time = self._playing.position or 0
- mass_player.elapsed_time_last_updated = time.time()
- mass_player.current_item_id = self._playing.content_identifier
- else:
- mass_player.state = PlayerState.IDLE
- self.mass.players.update(self.player_id)
-
- def address_updated(self, address):
- """Update cached address in config entry."""
- self.logger.debug("Changing address to %s", address)
- self._setup_device()
+ player_id: str
+ discovery_info: AsyncServiceInfo
+ logger: logging.Logger
+ active_stream: AirplayStreamJob | None = None
class AirplayProvider(PlayerProvider):
"""Player provider for Airplay based players."""
- _atv_players: dict[str, AirPlayPlayer]
+ cliraop_bin: str | None = None
+ _players: dict[str, AirPlayPlayer]
_discovery_running: bool = False
- _cliraop_bin: str | None = None
_stream_tasks: dict[str, asyncio.Task]
_dacp_server: asyncio.Server = None
_dacp_info: AsyncServiceInfo = None
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- self._atv_players = {}
+ self._players = {}
self._stream_tasks = {}
- self._cliraop_bin = await self.get_cliraop_binary()
+ self.cliraop_bin = await self._getcliraop_binary()
dacp_port = await select_free_port(39831, 49831)
- # the pyatv logger is way to noisy, silence it a bit
- logging.getLogger("pyatv").setLevel(self.logger.level + 10)
self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}"
self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
self._dacp_server = await asyncio.start_server(
)
await self.mass.aiozc.async_register_service(self._dacp_info)
- async def loaded_in_mass(self) -> None:
- """Call after the provider has been loaded."""
- await self._run_discovery()
+ async def on_mdns_service_state_change(
+ self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
+ ) -> None:
+ """Handle MDNS service state callback."""
+ raw_id, display_name = name.split(".")[0].split("@", 1)
+ player_id = f"ap{raw_id.lower()}"
+ # handle removed player
+ if state_change == ServiceStateChange.Removed:
+ self.logger.debug("Airplay device %s removed", name)
+ if mass_player := self.mass.players.get(player_id):
+ # the player has become unavailable
+ self.logger.info("Player removed %s", display_name)
+ mass_player.available = False
+ self.mass.players.update(player_id)
+ return
+ # handle update for existing device
+ if airplay_player := self._players.get(player_id):
+ if mass_player := self.mass.players.get(player_id):
+ cur_address = info.parsed_addresses()[0]
+ prev_address = airplay_player.discovery_info.parsed_addresses()[0]
+ if cur_address != prev_address:
+ airplay_player.logger.info(
+ "Address updated from %s to %s", prev_address, cur_address
+ )
+ mass_player.device_info = DeviceInfo(
+ model=mass_player.device_info.model,
+ manufacturer=mass_player.device_info.manufacturer,
+ address=str(cur_address),
+ )
+ if not mass_player.available:
+ mass_player.available = True
+ # always update the latest discovery info
+ airplay_player.discovery_info = info
+ self.mass.players.update(player_id)
+ return
+ # handle new player
+ await self._setup_player(player_id, display_name, info)
async def unload(self) -> None:
"""Handle close/cleanup of the provider."""
- # power off all players (will disconnct and close cliraop)
- for player_id in self._atv_players:
+ # power off all players (will disconnect and close cliraop)
+ for player_id in self._players:
await self.cmd_power(player_id, False)
# shutdown DACP server
if self._dacp_server:
if self._dacp_info:
await self.mass.aiozc.async_unregister_service(self._dacp_info)
- async def _handle_dacp_request( # noqa: PLR0915
- self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
- ) -> None:
- """Handle new connection on the socket."""
- try:
- raw_request = b""
- while recv := await reader.read(1024):
- raw_request += recv
- if len(recv) < 1024:
- break
-
- request = raw_request.decode("UTF-8")
- headers_raw, body = request.split("\r\n\r\n", 1)
- headers_raw = headers_raw.split("\r\n")
- headers = {}
- for line in headers_raw[1:]:
- x, y = line.split(":", 1)
- headers[x.strip()] = y.strip()
- active_remote = headers.get("Active-Remote")
- _, path, _ = headers_raw[0].split(" ")
- atv_player = next(
- (x for x in self._atv_players.values() if x.active_remote_id == active_remote), None
- )
- self.logger.debug(
- "DACP request for %s (%s): %s -- %s",
- atv_player.discovery_info.name if atv_player else "UNKNOWN PLAYER",
- active_remote,
- path,
- body,
- )
- if not atv_player:
- return
-
- player_id = atv_player.player_id
- if path == "/ctrl-int/1/nextitem":
- self.mass.create_task(self.mass.player_queues.next(player_id))
- elif path == "/ctrl-int/1/previtem":
- self.mass.create_task(self.mass.player_queues.previous(player_id))
- elif path == "/ctrl-int/1/play":
- self.mass.create_task(self.mass.player_queues.play(player_id))
- elif path == "/ctrl-int/1/playpause":
- self.mass.create_task(self.mass.player_queues.play_pause(player_id))
- elif path == "/ctrl-int/1/stop":
- self.mass.create_task(self.cmd_stop(player_id))
- elif path == "/ctrl-int/1/volumeup":
- self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
- elif path == "/ctrl-int/1/volumedown":
- self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
- elif path == "/ctrl-int/1/shuffle_songs":
- queue = self.mass.player_queues.get(player_id)
- self.mass.create_task(
- self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled)
- )
- elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
- self.mass.create_task(self.mass.player_queues.pause(player_id))
- elif "dmcp.device-volume=" in path:
- raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
- volume = convert_airplay_volume(raop_volume)
- if abs(volume - int(atv_player.atv.audio.volume)) > 2:
- self.mass.create_task(self.cmd_volume_set(player_id, volume))
- elif "dmcp.volume=" in path:
- volume = int(path.split("dmcp.volume=", 1)[-1])
- if abs(volume - int(atv_player.atv.audio.volume)) > 2:
- self.mass.create_task(self.cmd_volume_set(player_id, volume))
- else:
- self.logger.debug(
- "Unknown DACP request for %s: %s",
- atv_player.discovery_info.name,
- path,
- )
-
- # send response
- date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
- response = (
- f"HTTP/1.0 204 No Content\r\nDate: {date_str} "
- "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: "
- "application/x-dmap-tagged\r\nContent-Length: 0\r\n"
- "Connection: close\r\n\r\n"
- )
- writer.write(response.encode())
- await writer.drain()
- finally:
- writer.close()
-
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
entries = await super().get_player_config_entries(player_id)
- player_id: player_id of the player to handle the command.
"""
- if stream_task := self._stream_tasks.pop(player_id, None):
- if not stream_task.done():
- stream_task.cancel()
+
+ async def stop_player(airplay_player: AirPlayPlayer) -> None:
+ if airplay_player.active_stream:
+ await airplay_player.active_stream.stop()
+ mass_player = self.mass.players.get(airplay_player.player_id)
+ mass_player.state = PlayerState.IDLE
+ self.mass.players.update(airplay_player.player_id)
# forward command to player and any connected sync members
async with asyncio.TaskGroup() as tg:
- for atv_player in self._get_sync_clients(player_id):
- tg.create_task(atv_player.stop())
+ for airplay_player in self._get_sync_clients(player_id):
+ tg.create_task(stop_player(airplay_player))
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.
"""
# forward command to player and any connected sync members
async with asyncio.TaskGroup() as tg:
- for atv_player in self._get_sync_clients(player_id):
- if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+ for airplay_player in self._get_sync_clients(player_id):
+ if airplay_player.active_stream and airplay_player.active_stream.running:
# prefer interactive command to our streamer
- tg.create_task(atv_player.send_cli_command("ACTION=PLAY"))
- atv_player.optimistic_state = PlayerState.PLAYING
- atv_player.update_attributes()
- elif atv := atv_player.atv:
- tg.create_task(atv.remote_control.play())
+ tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PLAY"))
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.
"""
# forward command to player and any connected sync members
async with asyncio.TaskGroup() as tg:
- for atv_player in self._get_sync_clients(player_id):
- if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+ for airplay_player in self._get_sync_clients(player_id):
+ if airplay_player.active_stream and airplay_player.active_stream.running:
# prefer interactive command to our streamer
- tg.create_task(atv_player.send_cli_command("ACTION=PAUSE"))
- atv_player.optimistic_state = PlayerState.PAUSED
- atv_player.update_attributes()
- elif atv := atv_player.atv:
- tg.create_task(atv.remote_control.pause())
+ tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PAUSE"))
async def play_media(
self,
- seek_position: Optional seek to this position.
- fade_in: Optionally fade in the item at playback start.
"""
- # stop existing streams first
+ # always stop existing stream first
await self.cmd_stop(player_id)
- # power on player if needed
- await self.cmd_power(player_id, True)
# start streaming the queue (pcm) audio in a background task
queue = self.mass.player_queues.get_active_queue(player_id)
self._stream_tasks[player_id] = asyncio.create_task(
This is a special feature from the Universal Group provider.
"""
- # stop existing streams first
+ # always stop existing stream first
await self.cmd_stop(player_id)
- # power on player if needed
- await self.cmd_power(player_id, True)
if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
# TODO: resample on the fly here ?
raise RuntimeError("Unsupported PCM format")
if player.synced_to:
# should not happen, but just in case
raise RuntimeError("Player is synced")
- player.elapsed_time = 0
- player.elapsed_time_last_updated = time.time()
- player.state = PlayerState.PLAYING
- self.mass.players.update(player_id)
- # NOTE: Although the pyatv library is perfectly capable of playback
- # to not only raop targets but also airplay 1 + 2, its not suitable
- # for synced playback to multiple clients at once.
- # Also the performance is horrible. Python is not suitable for realtime
- # audio streaming.
- # So, I've decided to a combined route here. I've created a small binary
+
+ # Python is not suitable for realtime audio streaming.
+ # So, I've decided to go the fancy route here. I've created a small binary
# written in C based on libraop to do the actual timestamped playback.
# the raw pcm audio is fed to the stdin of this cliraop binary and we can
# send some commands over a named pipe.
# get current ntp before we start
- _, stdout = await check_output(f"{self._cliraop_bin} -ntp")
- ntp = int(stdout.strip())
+ _, stdout = await check_output(f"{self.cliraop_bin} -ntp")
+ start_ntp = int(stdout.strip())
# setup Raop process for player and its sync childs
- async with asyncio.TaskGroup() as tg:
- for atv_player in self._get_sync_clients(player_id):
- if not atv_player.atv:
- # should not be possible, but just in case...
- await atv_player.connect()
- tg.create_task(self._init_cliraop(atv_player, ntp))
+ for airplay_player in self._get_sync_clients(player_id):
+ # make sure that existing stream is stopped
+ if airplay_player.active_stream:
+ await airplay_player.active_stream.stop()
+ airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
+ await airplay_player.active_stream.init_cliraop(start_ntp)
prev_metadata_checksum: str = ""
- try:
- async for pcm_chunk in audio_iterator:
- # send metadata to player(s) if needed
- # NOTE: this must all be done in separate tasks to not disturb audio
- if queue and queue.current_item and queue.current_item.streamdetails:
- metadata_checksum = (
- queue.current_item.streamdetails.stream_title
- or queue.current_item.queue_item_id
+ async for pcm_chunk in audio_iterator:
+ # send audio chunk to player(s)
+ available_clients = 0
+ async with asyncio.TaskGroup() as tg:
+ for airplay_player in self._get_sync_clients(player_id):
+ if (
+ not airplay_player.active_stream
+ or not airplay_player.active_stream.running
+ or airplay_player.active_stream.start_ntp != start_ntp
+ ):
+ # catch when this stream is no longer active on the player
+ continue
+ available_clients += 1
+ tg.create_task(airplay_player.active_stream.write_chunk(pcm_chunk))
+ # always send the progress
+ tg.create_task(
+ airplay_player.active_stream.send_cli_command(
+ f"PROGRESS={int(queue.elapsed_time)}\n"
+ )
)
- if prev_metadata_checksum != metadata_checksum:
- prev_metadata_checksum = metadata_checksum
- self.mass.create_task(self._send_metadata(player_id))
-
- async with asyncio.TaskGroup() as tg:
- # send progress metadata
- if queue.elapsed_time:
- for atv_player in self._get_sync_clients(player_id):
- tg.create_task(
- atv_player.send_cli_command(f"PROGRESS={int(queue.elapsed_time)}\n")
- )
- # send audio chunk to player(s)
- available_clients = 0
- for atv_player in self._get_sync_clients(player_id):
- if not atv_player.cliraop_proc or atv_player.cliraop_proc.closed:
- # this may not happen, but just in case
- continue
- available_clients += 1
- tg.create_task(atv_player.cliraop_proc.write(pcm_chunk))
- if not available_clients:
- return
- finally:
- self.logger.debug("Streaming ended for player %s", player.display_name)
- for atv_player in self._get_sync_clients(player_id):
- if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
- atv_player.cliraop_proc.write_eof()
-
- async def cmd_power(self, player_id: str, powered: bool) -> None:
- """Send POWER command to given player.
+ if not available_clients:
+ # this streamjob is no longer active
+ return
- - player_id: player_id of the player to handle the command.
- - powered: bool if player should be powered on or off.
- """
- atv_player = self._atv_players[player_id]
- mass_player = self.mass.players.get(player_id)
- if powered:
- await atv_player.connect()
- elif not powered:
- await self.cmd_stop(player_id)
- await atv_player.disconnect()
- mass_player.powered = powered
- self.mass.players.update(player_id)
+ # send metadata to player(s) if needed
+ # NOTE: this must all be done in separate tasks to not disturb audio
+ if queue and queue.current_item and queue.current_item.streamdetails:
+ metadata_checksum = (
+ queue.current_item.streamdetails.stream_title
+ or queue.current_item.queue_item_id
+ )
+ if prev_metadata_checksum != metadata_checksum:
+ prev_metadata_checksum = metadata_checksum
+ self.mass.create_task(self._send_metadata(player_id, queue))
+
+ # end of stream reached - write eof
+ for airplay_player in self._get_sync_clients(player_id):
+ if (
+ not airplay_player.active_stream
+ or not airplay_player.active_stream.running
+ or airplay_player.active_stream.start_ntp != start_ntp
+ ):
+ # this may not happen, but guard just in case
+ continue
+ await airplay_player.active_stream.write_eof()
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
- player_id: player_id of the player to handle the command.
- volume_level: volume level (0..100) to set on the player.
"""
- atv_player = self._atv_players[player_id]
- if atv_player.cliraop_proc:
- # prefer interactive command to our streamer
- await atv_player.send_cli_command(f"VOLUME={volume_level}\n")
- if atv := atv_player.atv:
- await atv.audio.set_volume(volume_level)
+ airplay_player = self._players[player_id]
+ if airplay_player.active_stream:
+ await airplay_player.active_stream.send_cli_command(f"VOLUME={volume_level}\n")
+ mass_player = self.mass.players.get(player_id)
+ mass_player.volume_level = volume_level
+ self.mass.players.update(player_id)
+ # store last state in cache
+ await self.mass.cache.set(f"{CACHE_KEY_PREV_VOLUME}.{player_id}", volume_level)
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
group_leader.group_childs.add(player_id)
self.mass.players.update(target_player)
if group_leader.powered:
- await self.cmd_power(player_id, True)
+ await self.mass.players.cmd_power(player_id, True)
active_queue = self.mass.player_queues.get_active_queue(group_leader.player_id)
if active_queue.state == PlayerState.PLAYING:
self.mass.create_task(self.mass.player_queues.resume(active_queue.queue_id))
await self.cmd_stop(player_id)
self.mass.players.update(player_id)
- async def _run_discovery(self) -> None:
- """Discover Airplay players on the network."""
- if self._discovery_running:
- return
- try:
- self._discovery_running = True
- self.logger.debug("Airplay discovery started...")
- discovered_devices = await scan(self.mass.loop, protocol=Protocol.RAOP, timeout=30)
-
- if not discovered_devices:
- self.logger.debug("No devices found")
- return
-
- for dev in discovered_devices:
- self.mass.create_task(self._player_discovered(dev))
-
- finally:
- self._discovery_running = False
-
- def reschedule():
- self.mass.create_task(self._run_discovery())
-
- # reschedule self once finished
- self.mass.loop.call_later(300, reschedule)
-
- async def _player_discovered(self, discovery_info: interface.BaseConfig) -> None:
- """Handle discovered Airplay player on mdns."""
- player_id = f"ap{discovery_info.identifier.lower().replace(':', '')}"
- if player_id in self._atv_players:
- atv_player = self._atv_players[player_id]
- if discovery_info.address != atv_player.discovery_info.address:
- atv_player.address_updated(discovery_info.address)
- return
- if "_raop._tcp.local" not in discovery_info.properties:
- # skip players without raop
- return
- self.logger.debug(
- "Discovered Airplay device %s on %s", discovery_info.name, discovery_info.address
- )
- self._atv_players[player_id] = atv_player = AirPlayPlayer(
- self.mass, player_id, discovery_info
- )
- atv_player._setup_device()
- for player in self.players:
- player.can_sync_with = tuple(x for x in self._atv_players if x != player.player_id)
- self.mass.players.update(player.player_id)
-
- def _is_feature_available(
- self, atv_player: interface.AppleTV, feature: interface.FeatureName
- ) -> bool:
- """Return if a feature is available."""
- mass_player = self.mass.players.get(atv_player.device_info.output_device_id)
- if atv_player and mass_player.state == PlayerState.PLAYING:
- return atv_player.features.in_state(interface.FeatureState.Available, feature)
- return False
-
- async def get_cliraop_binary(self):
+ async def _getcliraop_binary(self):
"""Find the correct raop/airplay binary belonging to the platform."""
# ruff: noqa: SIM102
- if self._cliraop_bin is not None:
- return self._cliraop_bin
+ if self.cliraop_bin is not None:
+ return self.cliraop_bin
async def check_binary(cliraop_path: str) -> str | None:
try:
stdout, _ = await cliraop.communicate()
stdout = stdout.strip().decode()
if cliraop.returncode == 0 and stdout == "cliraop check":
- self._cliraop_bin = cliraop_path
+ self.cliraop_bin = cliraop_path
return cliraop_path
except OSError:
return None
group_child_ids = {player_id}
group_child_ids.update(mass_player.group_childs)
for child_id in group_child_ids:
- if client := self._atv_players.get(child_id):
+ if client := self._players.get(child_id):
sync_clients.append(client)
return sync_clients
- async def _init_cliraop(self, atv_player: AirPlayPlayer, ntp: int) -> None: # noqa: PLR0915
- """Initiatlize CLIRaop process for a player."""
- stream: RaopStream | None = next(
- (x for x in atv_player.atv.stream.instances if isinstance(x, RaopStream)), None
+ async def _setup_player(
+ self, player_id: str, display_name: str, info: AsyncServiceInfo
+ ) -> None:
+ """Handle setup of a new player that is discovered using mdns."""
+ address = info.parsed_addresses()[0]
+ # some guards if our info is valid/complete
+ if address == "127.0.0.1":
+ return
+ if "md" not in info.decoded_properties:
+ return
+ if "et" not in info.decoded_properties:
+ return
+ self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
+ self._players[player_id] = AirPlayPlayer(
+ player_id, discovery_info=info, logger=self.logger.getChild(player_id)
)
- if stream is None:
- raise RuntimeError("RAOP Not available")
-
- async def log_watcher(cliraop_proc: AsyncProcess) -> None:
- """Monitor stderr for a running CLIRaop process."""
- mass_player = self.mass.players.get(atv_player.player_id)
- logger = self.logger.getChild(atv_player.player_id)
- async for line in cliraop_proc._proc.stderr:
- line = line.decode().strip() # noqa: PLW2901
- if not line:
- continue
- if "set pause" in line:
- atv_player.optimistic_state = PlayerState.PAUSED
- atv_player.update_attributes()
- logger.info(line)
- elif "Restarted at" in line:
- atv_player.optimistic_state = PlayerState.PLAYING
- atv_player.update_attributes()
- logger.info(line)
- elif "after start), played" in line:
- millis = int(line.split("played ")[1].split(" ")[0])
- mass_player.elapsed_time = millis / 1000
- mass_player.elapsed_time_last_updated = time.time()
- else:
- logger.debug(line)
- # if we reach this point, the process exited
- if cliraop_proc._proc.returncode is not None:
- cliraop_proc.closed = True
- atv_player.optimistic_state = PlayerState.IDLE
- atv_player.update_attributes()
- logger.debug(
- "CLIRaop process stopped with errorcode %s",
- cliraop_proc._proc.returncode,
+ manufacturer, model = get_model_from_am(info.decoded_properties.get("am"))
+ if "apple tv" in model.lower():
+ # For now, we ignore the Apple TV until we implement the authentication.
+ # maybe we can simply use pyatv only for this part?
+ # the cliraop application has already been prepared to accept the secret.
+ self.logger.debug(
+ "Ignoring %s in discovery due to authentication requirement.", display_name
)
-
- extra_args = []
- latency = self.mass.config.get_raw_player_config_value(
- atv_player.player_id, CONF_LATENCY, 1200
- )
- extra_args += ["-l", str(latency)]
- if self.mass.config.get_raw_player_config_value(
- atv_player.player_id, CONF_ENCRYPTION, False
- ):
- extra_args += ["-u"]
- if self.mass.config.get_raw_player_config_value(
- atv_player.player_id, CONF_ALAC_ENCODE, True
- ):
- extra_args += ["-a"]
- if self.mass.config.get_raw_player_config_value(
- atv_player.player_id, CONF_VOLUME_START, False
- ):
- extra_args += ["-v", str(int(atv_player.atv.audio.volume))]
- sync_adjust = self.mass.config.get_raw_player_config_value(
- atv_player.player_id, CONF_SYNC_ADJUST, 0
+ return
+ if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
+ self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
+ return
+ if not (volume := await self.mass.cache.get(f"{CACHE_KEY_PREV_VOLUME}.{player_id}")):
+ volume = FALLBACK_VOLUME
+ mass_player = Player(
+ player_id=player_id,
+ provider=self.instance_id,
+ type=PlayerType.PLAYER,
+ name=display_name,
+ available=True,
+ powered=False,
+ device_info=DeviceInfo(
+ model=model,
+ manufacturer=manufacturer,
+ address=address,
+ ),
+ supported_features=(
+ PlayerFeature.PAUSE,
+ PlayerFeature.SYNC,
+ PlayerFeature.VOLUME_SET,
+ ),
+ max_sample_rate=44100,
+ supports_24bit=False,
+ can_sync_with=tuple(x for x in self._players if x != player_id),
+ volume_level=volume,
)
- if device_password := self.mass.config.get_raw_player_config_value(
- atv_player.player_id, CONF_PASSWORD, None
- ):
- extra_args += ["-P", device_password]
- if self.logger.level == logging.DEBUG:
- extra_args += ["-d", "5"]
+ self.mass.players.register_or_update(mass_player)
+ # update can_sync_with field of all other players
+ for player in self.players:
+ if player.player_id == player_id:
+ continue
+ player.can_sync_with = tuple(x for x in self._players if x != player.player_id)
+ self.mass.players.update(player.player_id)
- atv_player.optimistic_state = PlayerState.PLAYING
- # always generate a new active remote id to prevent race conditions
- # with the named pipe used to send commands
- atv_player.active_remote_id = str(randint(1000, 8000))
- args = [
- self._cliraop_bin,
- "-n",
- str(ntp),
- "-p",
- str(stream.core.service.port),
- "-w",
- str(2000 + sync_adjust),
- *extra_args,
- "-dacp",
- self.dacp_id,
- "-ar",
- atv_player.active_remote_id,
- "-md",
- atv_player.discovery_info.properties["_raop._tcp.local"]["md"],
- "-et",
- atv_player.discovery_info.properties["_raop._tcp.local"]["et"],
- str(atv_player.discovery_info.address),
- "-",
- ]
- if platform.system() == "Darwin":
- os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
- atv_player.cliraop_proc = AsyncProcess(
- args, enable_stdin=True, enable_stdout=False, enable_stderr=True
- )
- await atv_player.cliraop_proc.start()
- atv_player.cliraop_proc.attach_task(log_watcher(atv_player.cliraop_proc))
+ async def _handle_dacp_request( # noqa: PLR0915
+ self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+ ) -> None:
+ """Handle new connection on the socket."""
+ try:
+ raw_request = b""
+ while recv := await reader.read(1024):
+ raw_request += recv
+ if len(recv) < 1024:
+ break
+
+ request = raw_request.decode("UTF-8")
+ headers_raw, body = request.split("\r\n\r\n", 1)
+ headers_raw = headers_raw.split("\r\n")
+ headers = {}
+ for line in headers_raw[1:]:
+ x, y = line.split(":", 1)
+ headers[x.strip()] = y.strip()
+ active_remote = headers.get("Active-Remote")
+ _, path, _ = headers_raw[0].split(" ")
+ airplay_player = next(
+ (
+ x
+ for x in self._players.values()
+ if x.active_stream and x.active_stream.active_remote_id == active_remote
+ ),
+ None,
+ )
+ self.logger.debug(
+ "DACP request for %s (%s): %s -- %s",
+ airplay_player.discovery_info.name if airplay_player else "UNKNOWN PLAYER",
+ active_remote,
+ path,
+ body,
+ )
+ if not airplay_player:
+ return
+
+ player_id = airplay_player.player_id
+ mass_player = self.mass.players.get(player_id)
+ if path == "/ctrl-int/1/nextitem":
+ self.mass.create_task(self.mass.player_queues.next(player_id))
+ elif path == "/ctrl-int/1/previtem":
+ self.mass.create_task(self.mass.player_queues.previous(player_id))
+ elif path == "/ctrl-int/1/play":
+ self.mass.create_task(self.mass.player_queues.play(player_id))
+ elif path == "/ctrl-int/1/playpause":
+ self.mass.create_task(self.mass.player_queues.play_pause(player_id))
+ elif path == "/ctrl-int/1/stop":
+ self.mass.create_task(self.cmd_stop(player_id))
+ elif path == "/ctrl-int/1/volumeup":
+ self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
+ elif path == "/ctrl-int/1/volumedown":
+ self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
+ elif path == "/ctrl-int/1/shuffle_songs":
+ queue = self.mass.player_queues.get(player_id)
+ self.mass.create_task(
+ self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled)
+ )
+ elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
+ self.mass.create_task(self.mass.player_queues.pause(player_id))
+ elif "dmcp.device-volume=" in path:
+ raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
+ volume = convert_airplay_volume(raop_volume)
+ if abs(volume - mass_player.volume_level) > 2:
+ self.mass.create_task(self.cmd_volume_set(player_id, volume))
+ elif "dmcp.volume=" in path:
+ volume = int(path.split("dmcp.volume=", 1)[-1])
+ if abs(volume - mass_player.volume_level) > 2:
+ self.mass.create_task(self.cmd_volume_set(player_id, volume))
+ else:
+ self.logger.debug(
+ "Unknown DACP request for %s: %s",
+ airplay_player.discovery_info.name,
+ path,
+ )
+
+ # send response
+ date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
+ response = (
+ f"HTTP/1.0 204 No Content\r\nDate: {date_str} "
+ "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: "
+ "application/x-dmap-tagged\r\nContent-Length: 0\r\n"
+ "Connection: close\r\n\r\n"
+ )
+ writer.write(response.encode())
+ await writer.drain()
+ finally:
+ writer.close()
- async def _send_metadata(self, player_id: str) -> None:
+ async def _send_metadata(self, player_id: str, queue: PlayerQueue) -> None:
"""Send metadata to player (and connected sync childs)."""
- queue = self.mass.player_queues.get_active_queue(player_id)
if not queue or not queue.current_item:
return
duration = min(queue.current_item.duration or 0, 3600)
cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
cmd += f"DURATION={duration}\nACTION=SENDMETA\n"
- for atv_player in self._get_sync_clients(player_id):
- await atv_player.send_cli_command(cmd)
+ for airplay_player in self._get_sync_clients(player_id):
+ if not airplay_player.active_stream:
+ continue
+ await airplay_player.active_stream.send_cli_command(cmd)
# get image
if not queue.current_item.image:
return
+ # the image format needs to be 500x500 jpeg for maximum compatibility with players
image_url = self.mass.metadata.get_image_url(
- queue.current_item.image, size=512, prefer_proxy=True
+ queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
)
- for atv_player in self._get_sync_clients(player_id):
- await atv_player.send_cli_command(f"ARTWORK={image_url}\n")
+ for airplay_player in self._get_sync_clients(player_id):
+ if not airplay_player.active_stream:
+ continue
+ await airplay_player.active_stream.send_cli_command(f"ARTWORK={image_url}\n")