-"""Airplay Player provider.
-
-This is more like a "virtual" player provider, running on top of slimproto.
-It uses the amazing work of Philippe44 who created a bridge from airplay to slimproto.
-https://github.com/philippe44/LMS-Raop
-"""
-
+"""Airplay Player provider for Music Assistant."""
from __future__ import annotations
import asyncio
import os
import platform
-import xml.etree.ElementTree as ET # noqa: N817
-from contextlib import suppress
-from typing import TYPE_CHECKING
+import socket
+import time
+from random import randint, randrange
+from typing import TYPE_CHECKING, cast
import aiofiles
+from pyatv import connect, exceptions, interface, scan
+from pyatv.conf import AppleTV as ATVConf
+from pyatv.const import DeviceModel, DeviceState, PowerState, Protocol
+from pyatv.convert import model_str
+from pyatv.interface import AppleTV as AppleTVInterface
+from pyatv.interface import DeviceListener
+from pyatv.protocols.raop import RaopStream
+from zeroconf import ServiceInfo
-from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.helpers.datetime import utc
+from music_assistant.common.helpers.util import get_ip_pton
+from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ ConfigEntry,
+ ConfigValueType,
+)
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ ContentType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+ ProviderFeature,
+)
+from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS
+from music_assistant.server.helpers.process import AsyncProcess, check_output
+from music_assistant.server.helpers.util import create_tempfile
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
- from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
+ from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
- from music_assistant.server.providers.slimproto import SlimprotoProvider
-
+CONF_LATENCY = "latency"
+CONF_ENCRYPTION = "encryption"
+CONF_ALAC_ENCODE = "alac_encode"
+CONF_VOLUME_START = "volume_start"
+CONF_SYNC_ADJUST = "sync_adjust"
PLAYER_CONFIG_ENTRIES = (
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
ConfigEntry(
- key="airplay_header",
- type=ConfigEntryType.DIVIDER,
- label="Airplay specific settings",
- description="Configure Airplay specific settings. "
- "Note that changing any airplay specific setting, will reconnect all players.",
- advanced=True,
- ),
- ConfigEntry(
- key="read_ahead",
+ key=CONF_LATENCY,
type=ConfigEntryType.INTEGER,
range=(200, 3000),
- default_value=1000,
- label="Read ahead buffer",
+ default_value=1200,
+ label="Latency",
description="Sets the number of milliseconds of audio buffer in the player. "
"This is important to absorb network throughput jitter. "
"Note that the resume after pause will be skipping that amount of time "
advanced=True,
),
ConfigEntry(
- key="encryption",
+ key=CONF_ENCRYPTION,
type=ConfigEntryType.BOOLEAN,
default_value=False,
label="Enable encryption",
advanced=True,
),
ConfigEntry(
- key="alac_encode",
+ key=CONF_ALAC_ENCODE,
type=ConfigEntryType.BOOLEAN,
default_value=True,
label="Enable compression",
advanced=True,
),
ConfigEntry(
- key="remove_timeout",
+ key=CONF_VOLUME_START,
+ type=ConfigEntryType.BOOLEAN,
+ default_value=False,
+ label="Send volume at playback start",
+ description="Some players require to send/confirm the volume when playback starts. \n"
+ "Enable this setting if the playback volume does not match the MA interface.",
+ advanced=True,
+ ),
+ ConfigEntry(
+ key=CONF_SYNC_ADJUST,
type=ConfigEntryType.INTEGER,
+ range=(-500, 500),
default_value=0,
- range=(-1, 3600),
- label="Remove timeout",
- description="Player discovery is managed using mDNS protocol, "
- "which means that a player sends regular keep-alive messages and a bye when "
- "disconnecting. Some faulty mDNS stack implementations (e.g. Riva) do not always"
- "send keep-alive messages, so the Airplay bridge is disconnecting them regularly. \n\n"
- "As a workaround, a timer can be set so that the bridge does not immediately remove "
- "the player from LMS when missing a keep-alive, waiting for it to reconnect. \n\n\n"
- "A value of -1 will disable this feature and never remove the player. \n\n"
- "A value of 0 (the default) disabled the player when keep-alive is missed or "
- "when a bye message is received. \n\n"
- "Any other value means to disable the player after missing keep-alive for "
- "this number of seconds.",
+ label="Audio synchronization delay correction",
+ description="If this player is playing audio synced with other players "
+ "and you always hear the audio too early or late on this player, "
+ "you can shift the audio a bit.",
advanced=True,
),
)
+BACKOFF_TIME_LOWER_LIMIT = 15 # seconds
+BACKOFF_TIME_UPPER_LIMIT = 300 # Five minutes
-NEED_BRIDGE_RESTART = {"values/read_ahead", "values/encryption", "values/alac_encode", "enabled"}
+CONF_CREDENTIALS = "credentials"
async def setup(
return () # we do not have any config entries (yet)
+def convert_airplay_volume(value: float) -> int:
+ """Remap Airplay Volume to 0..100 scale."""
+ airplay_min = -30
+ airplay_max = 0
+ normal_min = 0
+ normal_max = 100
+ portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min)
+ return int(portion + normal_min)
+
+
+class AirPlayPlayer(DeviceListener):
+ """Holds the connection to the apyatv instance and the cliraop."""
+
+ def __init__(
+ self, mass: MusicAssistant, player_id: str, discovery_info: interface.BaseConfig
+ ) -> None:
+ """Initialize power manager."""
+ self.player_id = player_id
+ self.discovery_info = discovery_info
+ self.mass = mass
+ self.atv: AppleTVInterface | None = None
+ self.is_on = False
+ self.connected = False
+ self._connection_attempts = 0
+ self._connection_was_lost = False
+ self._task = None
+ self._playing: interface.Playing | None = None
+ self.logger = self.mass.players.logger.getChild("airplay").getChild(self.player_id)
+ self.cliraop_proc: AsyncProcess | None = None
+ self.active_remote_id = str(randint(1000, 8000))
+ self.optimistic_state: PlayerState = PlayerState.IDLE
+
+ def connection_lost(self, _):
+ """Device was unexpectedly disconnected.
+
+ This is a callback function from pyatv.interface.DeviceListener.
+ """
+ self.logger.warning('Connection lost to Apple TV "%s"', self.discovery_info.name)
+ self._connection_was_lost = True
+ self._handle_disconnect()
+
+ def connection_closed(self):
+ """Device connection was (intentionally) closed.
+
+ This is a callback function from pyatv.interface.DeviceListener.
+ """
+ self.connected = False
+ self._handle_disconnect()
+
+ def _handle_disconnect(self):
+ """Handle that the device disconnected and restart connect loop."""
+ self.connected = False
+ if self.atv:
+ self.atv.close()
+ self.atv = None
+ self._start_connect_loop()
+
+ async def connect(self):
+ """Connect to device."""
+ self.is_on = True
+ if self.connected:
+ return
+ self._start_connect_loop()
+ await asyncio.sleep(2)
+
+ async def disconnect(self):
+ """Disconnect from device."""
+ self.logger.debug("Disconnecting from device")
+ self.is_on = False
+ self.connected = False
+ try:
+ if self.atv:
+ self.atv.close()
+ self.atv = None
+ if self._task:
+ self._task.cancel()
+ self._task = None
+ except Exception: # pylint: disable=broad-except
+ self.logger.exception("An error occurred while disconnecting")
+
+ async def stop(self):
+ """Stop playback and cleanup any running CLIRaop Process."""
+ if self.cliraop_proc and not self.cliraop_proc.closed:
+ # prefer interactive command to our streamer
+ await self.send_cli_command("ACTION=STOP")
+ await self.cliraop_proc.wait()
+ self.optimistic_state = PlayerState.IDLE
+ self.update_attributes()
+ elif atv := self.atv:
+ await atv.remote_control.stop()
+
+ async def send_cli_command(self, command: str) -> None:
+ """Send an interactive command to the running CLIRaop binary."""
+ if not self.cliraop_proc or self.cliraop_proc.closed:
+ return
+
+ named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108
+ if not command.endswith("\n"):
+ command += "\n"
+
+ def send_data():
+ with open(named_pipe, "w") as f:
+ f.write(command)
+ f.flush()
+
+ self.logger.debug("sending command %s", command)
+ await self.mass.create_task(send_data)
+
+ def _start_connect_loop(self):
+ """Start background connect loop to device."""
+ if not self._task and self.atv is None and self.is_on:
+ self._task = asyncio.create_task(self._connect_loop())
+ else:
+ self.logger.debug("Not starting connect loop (%s, %s)", self.atv is None, self.is_on)
+
+ async def connect_once(self) -> None:
+ """Try to connect once."""
+ try:
+ if conf := await self._scan():
+ await self._connect(conf)
+ except exceptions.AuthenticationError:
+ await self.disconnect()
+ self.logger.exception(
+ "Authentication failed for %s, try reconfiguring device",
+ self.discovery_info.name,
+ )
+ return
+ except asyncio.CancelledError:
+ pass
+ except Exception: # pylint: disable=broad-except
+ self.logger.exception("Failed to connect")
+ self.atv = None
+
+ async def _connect_loop(self):
+ """Connect loop background task function."""
+ self.logger.debug("Starting connect loop")
+
+ # Try to find device and connect as long as the user has said that
+ # we are allowed to connect and we are not already connected.
+ while self.is_on and self.atv is None:
+ await self.connect_once()
+ if self.atv is not None:
+ break
+ self._connection_attempts += 1
+ backoff = min(
+ max(
+ BACKOFF_TIME_LOWER_LIMIT,
+ randrange(2**self._connection_attempts),
+ ),
+ BACKOFF_TIME_UPPER_LIMIT,
+ )
+
+ self.logger.debug("Reconnecting in %d seconds", backoff)
+ await asyncio.sleep(backoff)
+
+ self.logger.debug("Connect loop ended")
+ self._task = None
+
+ async def _scan(self) -> ATVConf | None:
+ """Try to find device by scanning for it."""
+ address: str = self.discovery_info.address
+
+ self.logger.debug("Discovering device %s", self.discovery_info.name)
+ atvs = await scan(
+ self.mass.loop,
+ identifier=self.discovery_info.identifier,
+ hosts=[address],
+ )
+ if atvs:
+ return cast(ATVConf, atvs[0])
+
+ self.logger.debug(
+ "Failed to find device %s with address %s",
+ self.discovery_info.name,
+ address,
+ )
+ # We no longer multicast scan for the device since as soon as async_step_zeroconf runs,
+ # it will update the address and reload the config entry when the device is found.
+ return None
+
+ async def _connect(self, conf: ATVConf) -> None:
+ """Connect to device."""
+ credentials: dict[int, str | None] = self.mass.config.get_raw_player_config_value(
+ self.player_id, CONF_CREDENTIALS, {}
+ )
+ name: str = self.discovery_info.name
+ missing_protocols = []
+ for protocol_int, creds in credentials.items():
+ protocol = Protocol(int(protocol_int))
+ if conf.get_service(protocol) is not None:
+ conf.set_credentials(protocol, creds) # type: ignore[arg-type]
+ else:
+ missing_protocols.append(protocol.name)
+
+ if missing_protocols:
+ missing_protocols_str = ", ".join(missing_protocols)
+ self.logger.info(
+ "Protocol(s) %s not yet found for %s, trying later",
+ missing_protocols_str,
+ name,
+ )
+ return
+
+ self.logger.debug("Connecting to device %s", name)
+ session = self.mass.http_session
+
+ self.atv = await connect(conf, self.mass.loop, session=session)
+ self.connected = True
+ self.atv.power.listener = self
+ self.atv.listener = self
+ self.atv.audio.listener = self
+ if self.atv.features.in_state(
+ interface.FeatureState.Available, interface.FeatureName.PushUpdates
+ ):
+ self.atv.push_updater.listener = self
+ self.atv.push_updater.start()
+
+ self.address_updated(str(conf.address))
+
+ self._setup_device()
+ self.update_attributes()
+
+ self._connection_attempts = 0
+ if self._connection_was_lost:
+ self.logger.info(
+ 'Connection was re-established to device "%s"',
+ name,
+ )
+ self._connection_was_lost = False
+
+ def _setup_device(self):
+ if not (mass_player := self.mass.players.get(self.player_id)):
+ mass_player = Player(
+ player_id=self.player_id,
+ provider="airplay",
+ type=PlayerType.PLAYER,
+ name=self.discovery_info.name,
+ available=True,
+ powered=False,
+ device_info=DeviceInfo(
+ model=self.discovery_info.device_info.raw_model,
+ manufacturer="Apple",
+ address=str(self.discovery_info.address),
+ ),
+ supported_features=(
+ PlayerFeature.PAUSE,
+ PlayerFeature.SYNC,
+ PlayerFeature.VOLUME_SET,
+ PlayerFeature.POWER,
+ ),
+ max_sample_rate=44100,
+ supports_24bit=False,
+ )
+ if self.atv:
+ dev_info = self.atv.device_info
+ mass_player.device_info = DeviceInfo(
+ model=(
+ dev_info.raw_model
+ if dev_info.model == DeviceModel.Unknown and dev_info.raw_model
+ else model_str(dev_info.model)
+ ),
+ manufacturer="Apple",
+ address=str(self.discovery_info.address),
+ )
+ self.mass.players.register_or_update(mass_player)
+
+ def playstatus_update(self, _, playstatus: interface.Playing) -> None:
+ """Inform about changes to what is currently playing."""
+ self.logger.debug("Playstatus received: %s", playstatus)
+ self._playing = playstatus
+ self.update_attributes()
+
+ def playstatus_error(self, updater, exception: Exception) -> None:
+ """Inform about an error when updating play status."""
+ self.logger.debug("Playstatus error received", exc_info=exception)
+ self._playing = None
+ self.update_attributes()
+
+ def powerstate_update(self, old_state: PowerState, new_state: PowerState) -> None:
+ """Update power state when it changes."""
+ self.update_attributes()
+
+ def volume_update(self, old_level: float, new_level: float) -> None:
+ """Update volume when it changes."""
+ self.update_attributes()
+
+ def update_attributes(self) -> None:
+ """Update the player attributes."""
+ mass_player = self.mass.players.get(self.player_id)
+ mass_player.volume_level = int(self.atv.audio.volume)
+ mass_player.powered = self.is_on
+ if self.cliraop_proc and not self.cliraop_proc.closed:
+ mass_player.state = self.optimistic_state
+ # NOTE: alapsed time is pushed from cliraop
+ elif self.atv is None or not self.connected:
+ mass_player.powered = False
+ mass_player.state = PlayerState.IDLE
+ elif self._playing:
+ state = self._playing.device_state
+ if state in (DeviceState.Idle, DeviceState.Loading):
+ mass_player.state = PlayerState.IDLE
+ elif state == DeviceState.Playing:
+ mass_player.state = PlayerState.PLAYING
+ elif state in (DeviceState.Paused, DeviceState.Seeking, DeviceState.Stopped):
+ mass_player.state = PlayerState.PAUSED
+ else:
+ mass_player.state = PlayerState.IDLE
+ mass_player.elapsed_time = self._playing.position or 0
+ mass_player.elapsed_time_last_updated = time.time()
+ mass_player.current_item_id = self._playing.content_identifier
+ else:
+ mass_player.state = PlayerState.IDLE
+ self.mass.players.update(self.player_id)
+
+ @property
+ def is_connecting(self):
+ """Return true if connection is in progress."""
+ return self._task is not None
+
+ def address_updated(self, address):
+ """Update cached address in config entry."""
+ self.logger.debug("Changing address to %s", address)
+ self._setup_device()
+
+
class AirplayProvider(PlayerProvider):
- """Player provider for Airplay based players, using the slimproto bridge."""
+ """Player provider for Airplay based players."""
- _bridge_bin: str | None = None
- _bridge_proc: asyncio.subprocess.Process | None = None
- _timer_handle: asyncio.TimerHandle | None = None
- _closing: bool = False
- _config_file: str | None = None
- _log_reader_task: asyncio.Task | None = None
- _removed_players: set[str] | None = None
+ _atv_players: dict[str, AirPlayPlayer]
+ _discovery_running: bool = False
+ _cliraop_bin: str | None = None
+ _stream_tasks: dict[str, asyncio.Task]
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
- # for now do not allow creation of airplay groups
- # in preparation of new airplay provider coming up soon
- # return (ProviderFeature.SYNC_PLAYERS,)
- return ()
+ return (ProviderFeature.SYNC_PLAYERS,)
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
- self._removed_players = set()
- self._config_file = os.path.join(self.mass.storage_path, "airplay_bridge.xml")
- # locate the raopbridge binary (will raise if that fails)
- self._bridge_bin = await self._get_bridge_binary()
- # make sure that slimproto provider is loaded
- slimproto_prov: SlimprotoProvider = self.mass.get_provider("slimproto")
- assert slimproto_prov, "This provider depends on the SlimProto provider."
- # register as virtual provider on slimproto provider
- slimproto_prov.register_virtual_provider(
- "RaopBridge",
- self._handle_player_register_callback,
- self._handle_player_update_callback,
+ self._atv_players = {}
+ self._stream_tasks = {}
+ self._cliraop_bin = await self.get_cliraop_binary()
+ self.mass.create_task(self._run_discovery())
+ dacp_port = 49831
+ self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}"
+ self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
+ await asyncio.start_server(self._handle_dacp_request, "0.0.0.0", dacp_port)
+ zeroconf_type = "_dacp._tcp.local."
+ server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}"
+ info = ServiceInfo(
+ zeroconf_type,
+ name=server_id,
+ addresses=[await get_ip_pton(self.mass.streams.publish_ip)],
+ port=dacp_port,
+ properties={
+ "txtvers": "1",
+ "Ver": "63B5E5C0C201542E",
+ "DbId": "63B5E5C0C201542E",
+ "OSsi": "0x1F5",
+ },
+ server=f"{socket.gethostname()}.local",
)
- await self._check_config_xml()
- # start running the bridge
- asyncio.create_task(self._bridge_process_runner(slimproto_prov))
+ await self.mass.zeroconf.async_register_service(info)
+
+ async def _handle_dacp_request( # noqa: PLR0915
+ self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+ ) -> None:
+ """Handle new connection on the socket."""
+ try:
+ raw_request = b""
+ while recv := await reader.read(1024):
+ raw_request += recv
+ if len(recv) < 1024:
+ break
+
+ request = raw_request.decode("UTF-8")
+ headers_raw, body = request.split("\r\n\r\n", 1)
+ headers_raw = headers_raw.split("\r\n")
+ headers = {}
+ for line in headers_raw[1:]:
+ x, y = line.split(":", 1)
+ headers[x.strip()] = y.strip()
+ active_remote = headers.get("Active-Remote")
+ _, path, _ = headers_raw[0].split(" ")
+ atv_player = next(
+ (x for x in self._atv_players.values() if x.active_remote_id == active_remote), None
+ )
+ self.logger.debug(
+ "DACP request for %s (%s): %s -- %s",
+ atv_player.discovery_info.name if atv_player else "UNKNOWN PLAYER",
+ active_remote,
+ path,
+ body,
+ )
+ if not atv_player:
+ return
+
+ player_id = atv_player.player_id
+ if path == "/ctrl-int/1/nextitem":
+ self.mass.create_task(self.mass.player_queues.next(player_id))
+ elif path == "/ctrl-int/1/previtem":
+ self.mass.create_task(self.mass.player_queues.previous(player_id))
+ elif path == "/ctrl-int/1/play":
+ self.mass.create_task(self.mass.player_queues.play(player_id))
+ elif path == "/ctrl-int/1/playpause":
+ self.mass.create_task(self.mass.player_queues.play_pause(player_id))
+ elif path == "/ctrl-int/1/stop":
+ self.mass.create_task(self.cmd_stop(player_id))
+ elif path == "/ctrl-int/1/volumeup":
+ self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
+ elif path == "/ctrl-int/1/volumedown":
+ self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
+ elif path == "/ctrl-int/1/shuffle_songs":
+ queue = self.mass.player_queues.get(player_id)
+ self.mass.create_task(
+ self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled)
+ )
+ elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
+ self.mass.create_task(self.mass.player_queues.pause(player_id))
+ elif "dmcp.device-volume=" in path:
+ raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
+ volume = convert_airplay_volume(raop_volume)
+ if abs(volume - int(atv_player.atv.audio.volume)) > 2:
+ self.mass.create_task(self.cmd_volume_set(player_id, volume))
+ elif "dmcp.volume=" in path:
+ volume = int(path.split("dmcp.volume=", 1)[-1])
+ if abs(volume - int(atv_player.atv.audio.volume)) > 2:
+ self.mass.create_task(self.cmd_volume_set(player_id, volume))
+ else:
+ self.logger.warning(
+ "Unknown DACP request for %s: %s",
+ atv_player.discovery_info.name,
+ path,
+ )
- async def unload(self) -> None:
- """Handle close/cleanup of the provider."""
- self._closing = True
- if slimproto_prov := self.mass.get_provider("slimproto"):
- slimproto_prov.unregister_virtual_provider("RaopBridge")
- await self._stop_bridge()
+ # send response
+ date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
+ response = (
+ f"HTTP/1.0 204 No Content\r\nDate: {date_str} "
+ "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: "
+ "application/x-dmap-tagged\r\nContent-Length: 0\r\n"
+ "Connection: close\r\n\r\n"
+ )
+ writer.write(response.encode())
+ await writer.drain()
+ finally:
+ writer.close()
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
- slimproto_prov = self.mass.get_provider("slimproto")
- base_entries = await slimproto_prov.get_player_config_entries(player_id)
- return base_entries + PLAYER_CONFIG_ENTRIES
-
- def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
- """Call (by config manager) when the configuration of a player changes."""
- super().on_player_config_changed(config, changed_keys)
- # forward to slimproto too
- slimproto_prov = self.mass.get_provider("slimproto")
- slimproto_prov.on_player_config_changed(config, changed_keys)
-
- async def update_config() -> None:
- # stop bridge (it will be auto restarted)
- if changed_keys.intersection(NEED_BRIDGE_RESTART):
- self.restart_bridge()
-
- asyncio.create_task(update_config())
-
- def on_player_config_removed(self, player_id: str) -> None:
- """Call (by config manager) when the configuration of a player is removed."""
- super().on_player_config_removed(player_id)
- self._removed_players.add(player_id)
- self.restart_bridge()
+ entries = await super().get_player_config_entries(player_id)
+ return entries + PLAYER_CONFIG_ENTRIES
async def cmd_stop(self, player_id: str) -> None:
- """Send STOP command to given player."""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_stop(player_id)
+ """Send STOP command to given player.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ if stream_task := self._stream_tasks.pop(player_id, None):
+ if not stream_task.done():
+ stream_task.cancel()
+
+ # forward command to player and any connected sync members
+ async with asyncio.TaskGroup() as tg:
+ for atv_player in self._get_sync_clients(player_id):
+ tg.create_task(atv_player.stop())
async def cmd_play(self, player_id: str) -> None:
- """Send PLAY command to given player."""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_play(player_id)
+ """Send PLAY (unpause) command to given player.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ # forward command to player and any connected sync members
+ async with asyncio.TaskGroup() as tg:
+ for atv_player in self._get_sync_clients(player_id):
+ if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+ # prefer interactive command to our streamer
+ tg.create_task(atv_player.send_cli_command("ACTION=PLAY"))
+ atv_player.optimistic_state = PlayerState.PLAYING
+ atv_player.update_attributes()
+ elif atv := atv_player.atv:
+ tg.create_task(atv.remote_control.play())
+
+ async def cmd_pause(self, player_id: str) -> None:
+ """Send PAUSE command to given player.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ # forward command to player and any connected sync members
+ async with asyncio.TaskGroup() as tg:
+ for atv_player in self._get_sync_clients(player_id):
+ if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+ # prefer interactive command to our streamer
+ tg.create_task(atv_player.send_cli_command("ACTION=PAUSE"))
+ atv_player.optimistic_state = PlayerState.PAUSED
+ atv_player.update_attributes()
+ elif atv := atv_player.atv:
+ tg.create_task(atv.remote_control.pause())
async def play_media(
self,
- seek_position: Optional seek to this position.
- fade_in: Optionally fade in the item at playback start.
"""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.play_media(
- player_id,
- queue_item=queue_item,
- seek_position=seek_position,
- fade_in=fade_in,
- )
+ # stop existing streams first
+ await self.cmd_stop(player_id)
+ await self.cmd_power(player_id, True)
+ atv_player = self._atv_players[player_id]
+ player = self.mass.players.get(player_id)
+
+ if player.synced_to:
+ # should not happen, but just in case
+ raise RuntimeError("Player is synced")
+
+ # NOTE: Although the pyatv library is perfectly capable of playback
+ # to not only raop targets but also airplay 1 + 2, its not suitable
+ # for synced playback to multiple clients at once.
+ # Also the performance is horrible. Python is not suitable for realtime
+ # audio streaming.
+ # So, I've decided to a combined route here. I've created a small binary
+ # written in C based on libraop to do the actual timestamped playback.
+ # the raw pcm audio is fed to the stdin of this cliraop binary and we can
+ # send some commands over a named pipe.
+
+ # get current ntp before we start
+ _, stdout = await check_output(f"{self._cliraop_bin} -ntp")
+ ntp = int(stdout.strip())
+
+ # setup Raop process for player and its sync childs
+ async with asyncio.TaskGroup() as tg:
+ for atv_player in self._get_sync_clients(player_id):
+ if not atv_player.atv:
+ # just in case...
+ await atv_player.connect()
+ tg.create_task(self._init_cliraop(atv_player, ntp))
+
+ async def _streamer() -> None:
+ queue = self.mass.player_queues.get(queue_item.queue_id)
+ player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}"
+ player.elapsed_time = 0
+ player.elapsed_time_last_updated = time.time()
+ player.state = PlayerState.PLAYING
+ self.mass.players.register_or_update(player)
+ prev_metadata_checksum: str = ""
+ pcm_format = AudioFormat(
+ content_type=ContentType.PCM_S16LE,
+ sample_rate=44100,
+ bit_depth=16,
+ channels=2,
+ )
+ try:
+ async for pcm_chunk in self.mass.streams.get_flow_stream(
+ queue,
+ start_queue_item=queue_item,
+ pcm_format=pcm_format,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ ):
+ # send metadata to player(s) if needed
+ # NOTE: this must all be done in separate tasks to not disturb audio
+ if queue and queue.current_item and queue.current_item.streamdetails:
+ metadata_checksum = (
+ queue.current_item.streamdetails.stream_title
+ or queue.current_item.queue_item_id
+ )
+ if prev_metadata_checksum != metadata_checksum:
+ prev_metadata_checksum = metadata_checksum
+ self.mass.create_task(self._send_metadata(player_id))
+ # send progress metadata
+ for atv_player in self._get_sync_clients(player_id):
+ self.mass.create_task(
+ atv_player.send_cli_command(f"PROGRESS={int(queue.elapsed_time)}\n")
+ )
+
+ # send audio chunk to player(s)
+ async with asyncio.TaskGroup() as tg:
+ available_clients = 0
+ for atv_player in self._get_sync_clients(player_id):
+ if not atv_player.cliraop_proc or atv_player.cliraop_proc.closed:
+ # this may not happen, but just in case
+ continue
+ available_clients += 1
+ tg.create_task(atv_player.cliraop_proc.write(pcm_chunk))
+ if not available_clients:
+ return
+
+ finally:
+ self.logger.debug("Streamer task ended for player %s", queue.display_name)
+ for atv_player in self._get_sync_clients(player_id):
+ if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+ atv_player.cliraop_proc.write_eof()
+
+ # start streaming the queue (pcm) audio in a background task
+ self._stream_tasks[player_id] = asyncio.create_task(_streamer())
async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
"""Handle PLAY STREAM on given player.
This is a special feature from the Universal Group provider.
"""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.play_stream(player_id, stream_job)
-
- async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None:
- """Handle enqueuing of the next queue item on the player."""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.enqueue_next_queue_item(player_id, queue_item)
-
- async def cmd_pause(self, player_id: str) -> None:
- """Send PAUSE command to given player."""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_pause(player_id)
+ raise NotImplementedError
async def cmd_power(self, player_id: str, powered: bool) -> None:
- """Send POWER command to given player."""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_power(player_id, powered)
+ """Send POWER command to given player.
+
+ - player_id: player_id of the player to handle the command.
+ - powered: bool if player should be powered on or off.
+ """
+ atv_player = self._atv_players[player_id]
+ mass_player = self.mass.players.get(player_id)
+ if powered:
+ await atv_player.connect()
+ elif not powered:
+ await self.cmd_stop(player_id)
+ await atv_player.disconnect()
+ mass_player.powered = powered
+ self.mass.players.update(player_id)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
- """Send VOLUME_SET command to given player."""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_volume_set(player_id, volume_level)
+ """Send VOLUME_SET command to given player.
- async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
- """Send VOLUME MUTE command to given player."""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_volume_mute(player_id, muted)
+ - player_id: player_id of the player to handle the command.
+ - volume_level: volume level (0..100) to set on the player.
+ """
+ atv_player = self._atv_players[player_id]
+ if atv_player.cliraop_proc:
+ # prefer interactive command to our streamer
+ await atv_player.send_cli_command(f"VOLUME={volume_level}")
+ elif atv := atv_player.atv:
+ await atv.audio.set_volume(volume_level)
async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player."""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_sync(player_id, target_player)
+ """Handle SYNC command for given player.
+
+ Join/add the given player(id) to the given (master) player/sync group.
+
+ - player_id: player_id of the player to handle the command.
+ - target_player: player_id of the syncgroup master or group player.
+ """
+ player = self.mass.players.get(player_id, raise_unavailable=True)
+ group_leader = self.mass.players.get(target_player, raise_unavailable=True)
+ if group_leader.synced_to:
+ raise RuntimeError("Player is already synced")
+ player.synced_to = target_player
+ group_leader.group_childs.add(player_id)
+ self.mass.players.update(target_player)
+ if group_leader.powered:
+ await self.cmd_power(player_id, True)
+ active_queue = self.mass.player_queues.get_active_queue(group_leader.player_id)
+ if active_queue.state == PlayerState.PLAYING:
+ self.mass.create_task(self.mass.player_queues.resume(active_queue.queue_id))
async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player."""
- # simply forward to underlying slimproto player
- slimproto_prov = self.mass.get_provider("slimproto")
- await slimproto_prov.cmd_unsync(player_id)
-
- async def _handle_player_register_callback(self, player: Player) -> None:
- """Handle player register callback from slimproto source player."""
- # TODO: Can we get better device info from mDNS ?
- player.provider = self.instance_id
- player.device_info = DeviceInfo(
- model="Airplay device",
- address=player.device_info.address,
- manufacturer="Generic",
+ """Handle UNSYNC command for given player.
+
+ Remove the given player from any syncgroups it currently is synced to.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ player = self.mass.players.get(player_id, raise_unavailable=True)
+ if not player.synced_to:
+ return
+ group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
+ group_leader.group_childs.remove(player_id)
+ player.synced_to = None
+ if player.state == PlayerState.PLAYING:
+ await self.cmd_stop(player_id)
+ self.mass.players.update(player_id)
+
+ async def _run_discovery(self) -> None:
+ """Discover Airplay players on the network."""
+ if self._discovery_running:
+ return
+ try:
+ self._discovery_running = True
+ self.logger.debug("Airplay discovery started...")
+ discovered_devices = await scan(self.mass.loop, timeout=5)
+
+ if not discovered_devices:
+ self.logger.debug("No devices found")
+ return
+
+ for dev in discovered_devices:
+ self.mass.create_task(self._player_discovered(dev))
+
+ finally:
+ self._discovery_running = False
+
+ def reschedule():
+ self.mass.create_task(self._run_discovery())
+
+ # reschedule self once finished
+ self.mass.loop.call_later(300, reschedule)
+
+ async def _player_discovered(self, discovery_info: interface.BaseConfig) -> None:
+ """Handle discovered Airplay player on mdns."""
+ player_id = f"ap{discovery_info.identifier.lower().replace(':', '')}"
+ if player_id in self._atv_players:
+ atv_player = self._atv_players[player_id]
+ if discovery_info.address != atv_player.discovery_info.address:
+ atv_player.address_updated(discovery_info.address)
+ return
+ self.logger.info(f"Connecting to {discovery_info.address}")
+ self._atv_players[player_id] = atv_player = AirPlayPlayer(
+ self.mass, player_id, discovery_info
)
- player.supports_24bit = False
- # disable sonos by default
- if "sonos" in player.name.lower() or "rincon" in player.name.lower():
- player.enabled_by_default = False
-
- # extend info from the discovery xml
- async with aiofiles.open(self._config_file, "r") as _file:
- xml_data = await _file.read()
- with suppress(ET.ParseError):
- xml_root = ET.XML(xml_data)
- for device_elem in xml_root.findall("device"):
- player_id = device_elem.find("mac").text
- if player_id != player.player_id:
- continue
- # prefer name from UDN because default name is often wrong
- udn = device_elem.find("udn").text
- udn_name = udn.split("@")[1].split("._")[0]
- player.name = udn_name
- break
+ atv_player._setup_device()
+ for player in self.players:
+ player.can_sync_with = tuple(x for x in self._atv_players if x != player.player_id)
+ self.mass.players.update(player.player_id)
- def _handle_player_update_callback(self, player: Player) -> None:
- """Handle player update callback from slimproto source player."""
+ def _is_feature_available(
+ self, atv_player: interface.AppleTV, feature: interface.FeatureName
+ ) -> bool:
+ """Return if a feature is available."""
+ mass_player = self.mass.players.get(atv_player.device_info.output_device_id)
+ if atv_player and mass_player.state == PlayerState.PLAYING:
+ return atv_player.features.in_state(interface.FeatureState.Available, feature)
+ return False
- async def _get_bridge_binary(self):
- """Find the correct bridge binary belonging to the platform."""
+ async def get_cliraop_binary(self):
+ """Find the correct raop/airplay binary belonging to the platform."""
# ruff: noqa: SIM102
+ if self._cliraop_bin is not None:
+ return self._cliraop_bin
- async def check_bridge_binary(bridge_binary_path: str) -> str | None:
+ async def check_binary(cliraop_path: str) -> str | None:
try:
- bridge_binary = await asyncio.create_subprocess_exec(
- *[bridge_binary_path, "-t", "-x", self._config_file],
+ cliraop = await asyncio.create_subprocess_exec(
+ *[cliraop_path, "-check"],
stdout=asyncio.subprocess.PIPE,
+ stderr=asyncio.subprocess.STDOUT,
)
- stdout, _ = await bridge_binary.communicate()
- if (
- bridge_binary.returncode == 1
- and b"This program is free software: you can redistribute it and/or modify"
- in stdout
- ):
- self._bridge_bin = bridge_binary_path
- return bridge_binary_path
- except OSError as err:
- self.logger.exception(err)
+ stdout, _ = await cliraop.communicate()
+ stdout = stdout.strip().decode()
+ if cliraop.returncode == 0 and stdout == "cliraop check":
+ self._cliraop_bin = cliraop_path
+ return cliraop_path
+ except OSError:
return None
base_path = os.path.join(os.path.dirname(__file__), "bin")
-
- system = platform.system().lower()
+ system = platform.system().lower().replace("darwin", "macos")
architecture = platform.machine().lower()
- if bridge_binary := await check_bridge_binary(
- os.path.join(base_path, f"squeeze2raop-{system}-{architecture}-static")
+ if bridge_binary := await check_binary(
+ os.path.join(base_path, f"cliraop-{system}-{architecture}")
):
return bridge_binary
- msg = f"Unable to locate RaopBridge for {system}/{architecture}"
+ msg = f"Unable to locate RAOP Play binary for {system}/{architecture}"
raise RuntimeError(msg)
- async def _bridge_process_runner(self, slimproto_prov: SlimprotoProvider) -> None:
- """Run the bridge binary in the background."""
- self.logger.debug(
- "Starting Airplay bridge using config file %s",
- self._config_file,
+ def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]:
+ """Get all sync clients for a player."""
+ mass_player = self.mass.players.get(player_id, True)
+ sync_clients: list[AirPlayPlayer] = []
+ # we need to return the player itself too
+ group_child_ids = {player_id}
+ group_child_ids.update(mass_player.group_childs)
+ for child_id in group_child_ids:
+ if client := self._atv_players.get(child_id):
+ sync_clients.append(client)
+ return sync_clients
+
+ async def _init_cliraop(self, atv_player: AirPlayPlayer, ntp: int) -> None:
+ """Initiatlize CLIRaop process for a player."""
+ stream: RaopStream | None = next(
+ (x for x in atv_player.atv.stream.instances if isinstance(x, RaopStream)), None
)
- conf_log_level = self.config.get_value(CONF_LOG_LEVEL)
- enable_debug_log = conf_log_level == "DEBUG"
- args = [
- self._bridge_bin,
- "-s",
- f"localhost:{slimproto_prov.port}",
- "-x",
- self._config_file,
- "-I",
- "-Z",
- "-d",
- f'all={"debug" if enable_debug_log else "warn"}',
- # filter out apple tv's for now until we fix auth
- "-m",
- "apple-tv,appletv",
- ]
- start_success = False
- while True:
- try:
- self._bridge_proc = await asyncio.create_subprocess_shell(
- " ".join(args),
- stdout=asyncio.subprocess.PIPE,
- stderr=asyncio.subprocess.STDOUT,
- )
- self._log_reader_task = asyncio.create_task(self._log_reader())
- await self._bridge_proc.wait()
- except Exception as err:
- if not start_success:
- raise
- self.logger.exception("Error in Airplay bridge", exc_info=err)
- if self._closing:
- break
- await asyncio.sleep(10)
+ if stream is None:
+ raise RuntimeError("RAOP Not available")
- async def _stop_bridge(self) -> None:
- """Stop the bridge process."""
- if self._bridge_proc:
- try:
- self.logger.info("Stopping bridge process...")
- self._bridge_proc.terminate()
- await self._bridge_proc.wait()
- self.logger.info("Bridge process stopped.")
- await asyncio.sleep(5)
- except ProcessLookupError:
- pass
- if self._log_reader_task and not self._log_reader_task.done():
- self._log_reader_task.cancel()
-
- async def _check_config_xml(self, recreate: bool = False) -> None:
- """Check the bridge config XML file."""
- # ruff: noqa: PLR0915
- if recreate or not os.path.isfile(self._config_file):
- if os.path.isfile(self._config_file):
- os.remove(self._config_file)
- # discover players and create default config file
- args = [
- self._bridge_bin,
- "-i",
- self._config_file,
- ]
- proc = await asyncio.create_subprocess_shell(
- " ".join(args),
- stdout=asyncio.subprocess.DEVNULL,
- stderr=asyncio.subprocess.DEVNULL,
+ async def log_watcher(cliraop_proc: AsyncProcess) -> None:
+ """Monitor stderr for a running CLIRaop process."""
+ mass_player = self.mass.players.get(atv_player.player_id)
+ logger = self.logger.getChild(atv_player.player_id)
+ async for line in cliraop_proc._proc.stderr:
+ line = line.decode().strip() # noqa: PLW2901
+ if "set pause" in line:
+ atv_player.optimistic_state = PlayerState.PAUSED
+ atv_player.update_attributes()
+ if "Restarted at" in line:
+ atv_player.optimistic_state = PlayerState.PLAYING
+ atv_player.update_attributes()
+ elif "after start), played" in line:
+ millis = int(line.split("played ")[1].split(" ")[0])
+ mass_player.elapsed_time = millis / 1000
+ mass_player.elapsed_time_last_updated = time.time()
+ else:
+ logger.debug(line)
+ # if we reach this point, the process exited
+ if cliraop_proc._proc.returncode is not None:
+ cliraop_proc.closed = True
+ logger.debug(
+ "CLIRaop process stopped with errorcode %s",
+ cliraop_proc._proc.returncode,
)
- await proc.wait()
- # read xml file's data
- async with aiofiles.open(self._config_file, "r") as _file:
- xml_data = await _file.read()
+ extra_args = []
+ latency = self.mass.config.get_raw_player_config_value(
+ atv_player.player_id, CONF_LATENCY, 1200
+ )
+ extra_args += ["-l", str(latency)]
+ if self.mass.config.get_raw_player_config_value(
+ atv_player.player_id, CONF_ENCRYPTION, False
+ ):
+ extra_args += ["-et"]
+ if self.mass.config.get_raw_player_config_value(
+ atv_player.player_id, CONF_ALAC_ENCODE, False
+ ):
+ extra_args += ["-a"]
+ if self.mass.config.get_raw_player_config_value(
+ atv_player.player_id, CONF_VOLUME_START, False
+ ):
+ extra_args += ["-v", str(int(atv_player.atv.audio.volume))]
+ sync_adjust = self.mass.config.get_raw_player_config_value(
+ atv_player.player_id, CONF_SYNC_ADJUST, 0
+ )
- try:
- xml_root = ET.XML(xml_data)
- except ET.ParseError:
- if recreate:
- raise
- await self._check_config_xml(True)
- return
+ atv_player.optimistic_state = PlayerState.PLAYING
+ # always generate a new active remote id to prevent race conditions
+ # with the named pipe used to send commands
+ atv_player.active_remote_id = str(randint(1000, 8000))
+ args = [
+ self._cliraop_bin,
+ "-n",
+ str(ntp),
+ "-p",
+ str(stream.core.service.port),
+ "-w",
+ str(2000 + sync_adjust),
+ *extra_args,
+ "-dacp",
+ self.dacp_id,
+ "-ar",
+ atv_player.active_remote_id,
+ "-md",
+ atv_player.discovery_info.properties["_raop._tcp.local"]["md"],
+ "-et",
+ atv_player.discovery_info.properties["_raop._tcp.local"]["et"],
+ str(atv_player.discovery_info.address),
+ "-",
+ ]
+ if platform.system() == "Darwin":
+ os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
+ atv_player.cliraop_proc = AsyncProcess(
+ args, enable_stdin=True, enable_stdout=False, enable_stderr=True
+ )
+ await atv_player.cliraop_proc.start()
+ # send empty command to unblock named pipe
+ await atv_player.send_cli_command("\n")
+ atv_player.cliraop_proc.attach_task(log_watcher(atv_player.cliraop_proc))
- # set common/global values
- common_elem = xml_root.find("common")
- for key, value in {
- "codecs": "flc,pcm",
- "sample_rate": "44100",
- "resample": "0",
- }.items():
- xml_elem = common_elem.find(key)
- xml_elem.text = value
-
- # default values for players
- for conf_entry in PLAYER_CONFIG_ENTRIES:
- if conf_entry.type == ConfigEntryType.LABEL:
- continue
- conf_val = conf_entry.default_value
- xml_elem = common_elem.find(conf_entry.key)
- if xml_elem is None:
- xml_elem = ET.SubElement(common_elem, conf_entry.key)
- if conf_entry.type == ConfigEntryType.BOOLEAN:
- xml_elem.text = "1" if conf_val else "0"
+ async def _send_metadata(self, player_id: str) -> None:
+ """Send metadata to player (and connected sync childs)."""
+ queue = self.mass.player_queues.get_active_queue(player_id)
+ if not queue or not queue.current_item:
+ return
+ duration = min(queue.current_item.duration or 0, 3600)
+ title = queue.current_item.name
+ artist = ""
+ album = ""
+ if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
+ # stream title from radio station
+ stream_title = queue.current_item.streamdetails.stream_title
+ if " - " in stream_title:
+ artist, title = stream_title.split(" - ", 1)
else:
- xml_elem.text = str(conf_val)
-
- # get/set all device configs
- for device_elem in xml_root.findall("device"):
- player_id = device_elem.find("mac").text
- if player_id in self._removed_players:
- xml_root.remove(device_elem)
- self._removed_players.remove(player_id)
- continue
- # use raw config values because players are not
- # yet available at startup/init (race condition)
- raw_player_conf = self.mass.config.get(f"{CONF_PLAYERS}/{player_id}")
- if not raw_player_conf:
- continue
- device_elem.find("enabled").text = "1" if raw_player_conf["enabled"] else "0"
-
- # set some values that are not (yet) configurable
- for key, value in {
- "player_volume": "-1",
- "prevent_playback": "off",
- }.items():
- xml_elem = device_elem.find(key)
- if xml_elem is None:
- xml_elem = ET.SubElement(device_elem, key)
- xml_elem.text = value
-
- # set values based on config entries
- for conf_entry in PLAYER_CONFIG_ENTRIES:
- if conf_entry.type == ConfigEntryType.LABEL:
- continue
- conf_val = raw_player_conf["values"].get(conf_entry.key, conf_entry.default_value)
- xml_elem = device_elem.find(conf_entry.key)
- if xml_elem is None:
- xml_elem = ET.SubElement(device_elem, conf_entry.key)
- if conf_entry.type == ConfigEntryType.BOOLEAN:
- xml_elem.text = "1" if conf_val else "0"
- else:
- xml_elem.text = str(conf_val)
-
- # save config file
- async with aiofiles.open(self._config_file, "w") as _file:
- xml_str = ET.tostring(xml_root)
- await _file.write(xml_str.decode())
-
- async def _log_reader(self) -> None:
- """Read log output from bridge process."""
- bridge_logger = self.logger.getChild("squeeze2raop")
- while self._bridge_proc.returncode is None:
- async for line in self._bridge_proc.stdout:
- bridge_logger.debug(line.decode().strip())
-
- def restart_bridge(self) -> None:
- """Schedule restart of bridge process."""
- if self._timer_handle is not None:
- self._timer_handle.cancel()
- self._timer_handle = None
-
- async def restart_bridge() -> None:
- self.logger.info("Restarting Airplay bridge (due to config changes)")
- await self._stop_bridge()
- await self._check_config_xml()
-
- # schedule the action for later
- self._timer_handle = self.mass.loop.call_later(10, self.mass.create_task, restart_bridge)
+ title = stream_title
+ # set album to radio station name
+ album = queue.current_item.name
+ if media_item := queue.current_item.media_item:
+ if artist_str := getattr(media_item, "artist_str", None):
+ artist = artist_str
+ if _album := getattr(media_item, "album", None):
+ album = _album.name
+
+ cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
+ cmd += f"DURATION={duration}\nACTION=SENDMETA\n"
+
+ async with asyncio.TaskGroup() as tg:
+ for atv_player in self._get_sync_clients(player_id):
+ tg.create_task(atv_player.send_cli_command(cmd))
+
+ # get image
+ if not queue.current_item.image:
+ return
+ image_path = create_tempfile()
+ image_data = await self.mass.metadata.get_thumbnail(
+ queue.current_item.image.path,
+ 512,
+ queue.current_item.image.provider,
+ )
+ async with aiofiles.open(image_path.name, "wb") as outfile:
+ await outfile.write(image_data)
+ async with asyncio.TaskGroup() as tg:
+ for atv_player in self._get_sync_clients(player_id):
+ if image_path:
+ tg.create_task(atv_player.send_cli_command(f"ARTWORK={image_path.name}\n"))
+ # make sure the temp file gets deleted again
+ await asyncio.sleep(5)
+ image_path.close()