From: Marcel van der Veldt Date: Sun, 18 Feb 2024 01:09:30 +0000 (+0100) Subject: Replace Airplay provider (#1084) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=921650b164e55ed3fde4b6dc4b4ff86bf5af8cde;p=music-assistant-server.git Replace Airplay provider (#1084) --- diff --git a/music_assistant/common/models/media_items.py b/music_assistant/common/models/media_items.py index 5085916f..c8b08a87 100644 --- a/music_assistant/common/models/media_items.py +++ b/music_assistant/common/models/media_items.py @@ -9,11 +9,7 @@ from typing import Any, Self from mashumaro import DataClassDictMixin from music_assistant.common.helpers.uri import create_uri -from music_assistant.common.helpers.util import ( - create_sort_name, - is_valid_uuid, - merge_lists, -) +from music_assistant.common.helpers.util import create_sort_name, is_valid_uuid, merge_lists from music_assistant.common.models.enums import ( AlbumType, ContentType, @@ -355,6 +351,11 @@ class Track(MediaItem): return self.album.image return super().image + @property + def artist_str(self) -> str: + """Return (combined) artist string for track.""" + return "/".join(x.name for x in self.artists) + @dataclass(kw_only=True) class AlbumTrack(Track): diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 671f06bc..04c7ebdd 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -401,15 +401,6 @@ class PlayerController(CoreController): if player.powered == powered: return # nothing to do - # inform (active) group player if needed - # NOTE: this must be on the top to prevent race conditions - if active_group_player := self._get_active_player_group(player): - if active_group_player.player_id.startswith(SYNCGROUP_PREFIX): - self._on_syncgroup_child_power( - active_group_player.player_id, player.player_id, powered - ) - elif player_prov := self.get_player_provider(active_group_player.player_id): - player_prov.on_child_power(active_group_player.player_id, player.player_id, powered) # stop player at power off if ( not powered @@ -437,8 +428,16 @@ class PlayerController(CoreController): # as fast as possible and prevent race conditions player.powered = powered self.update(player_id) + # handle actions when a syncgroup child turns on + if active_group_player := self._get_active_player_group(player): + if active_group_player.player_id.startswith(SYNCGROUP_PREFIX): + self._on_syncgroup_child_power( + active_group_player.player_id, player.player_id, powered + ) + elif player_prov := self.get_player_provider(active_group_player.player_id): + player_prov.on_child_power(active_group_player.player_id, player.player_id, powered) # handle 'auto play on power on' feature - if ( + elif ( powered and self.mass.config.get_raw_player_config_value(player_id, CONF_AUTO_PLAY, False) and player.active_source in (None, player_id) @@ -899,6 +898,16 @@ class PlayerController(CoreController): continue elif child_player.group_childs: return child_player + # select new sync leader: return the first playing player + for child_player in self.iter_group_members( + group_player, only_powered=True, only_playing=True + ): + return child_player + # fallback select new sync leader: return the first powered player + for child_player in self.iter_group_members( + group_player, only_powered=True, only_playing=False + ): + return child_player return None async def _sync_syncgroup(self, player_id: str) -> None: diff --git a/music_assistant/server/helpers/process.py b/music_assistant/server/helpers/process.py index 3c725296..ea6af71e 100644 --- a/music_assistant/server/helpers/process.py +++ b/music_assistant/server/helpers/process.py @@ -42,6 +42,15 @@ class AsyncProcess: async def __aenter__(self) -> AsyncProcess: """Enter context manager.""" + await self.start() + return self + + async def __aexit__(self, exc_type, exc_value, traceback) -> bool: + """Exit context manager.""" + await self.close() + + async def start(self) -> None: + """Perform Async init of process.""" self._proc = await asyncio.create_subprocess_exec( *self._args, stdin=asyncio.subprocess.PIPE if self._enable_stdin else None, @@ -49,25 +58,6 @@ class AsyncProcess: stderr=asyncio.subprocess.PIPE if self._enable_stderr else None, close_fds=True, ) - return self - - async def __aexit__(self, exc_type, exc_value, traceback) -> bool: - """Exit context manager.""" - self.closed = True - # make sure the process is cleaned up - self.write_eof() - if self._proc.returncode is None: - try: - async with asyncio.timeout(10): - await self._proc.communicate() - except TimeoutError: - self._proc.kill() - await self._proc.communicate() - if self._proc.returncode is None: - self._proc.kill() - if self._attached_task and not self._attached_task.done(): - with suppress(asyncio.CancelledError): - self._attached_task.cancel() async def iter_chunked(self, n: int = DEFAULT_CHUNKSIZE) -> AsyncGenerator[bytes, None]: """Yield chunks of n size from the process stdout.""" @@ -113,6 +103,8 @@ class AsyncProcess: def write_eof(self) -> None: """Write end of file to to process stdin.""" + if not self._enable_stdin: + return if self.closed or self._proc.stdin.is_closing(): return try: @@ -128,6 +120,30 @@ class AsyncProcess: # already exited, race condition pass + async def close(self) -> None: + """Close/terminate the process.""" + self.closed = True + if self._attached_task and not self._attached_task.done(): + with suppress(asyncio.CancelledError): + self._attached_task.cancel() + # make sure the process is cleaned up + self.write_eof() + if self._proc.returncode is None: + try: + async with asyncio.timeout(10): + await self._proc.communicate() + except TimeoutError: + self._proc.kill() + await self.wait() + + async def wait(self) -> int: + """Wait for the process and return the returncode.""" + if self._proc.returncode is not None: + return self._proc.returncode + exitcode = await self._proc.wait() + self.closed = True + return exitcode + async def communicate(self, input_data: bytes | None = None) -> tuple[bytes, bytes]: """Write bytes to process and read back results.""" return await self._proc.communicate(input_data) diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 2a82348c..a2738706 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -1,52 +1,68 @@ -"""Airplay Player provider. - -This is more like a "virtual" player provider, running on top of slimproto. -It uses the amazing work of Philippe44 who created a bridge from airplay to slimproto. -https://github.com/philippe44/LMS-Raop -""" - +"""Airplay Player provider for Music Assistant.""" from __future__ import annotations import asyncio import os import platform -import xml.etree.ElementTree as ET # noqa: N817 -from contextlib import suppress -from typing import TYPE_CHECKING +import socket +import time +from random import randint, randrange +from typing import TYPE_CHECKING, cast import aiofiles +from pyatv import connect, exceptions, interface, scan +from pyatv.conf import AppleTV as ATVConf +from pyatv.const import DeviceModel, DeviceState, PowerState, Protocol +from pyatv.convert import model_str +from pyatv.interface import AppleTV as AppleTVInterface +from pyatv.interface import DeviceListener +from pyatv.protocols.raop import RaopStream +from zeroconf import ServiceInfo -from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType -from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature +from music_assistant.common.helpers.datetime import utc +from music_assistant.common.helpers.util import get_ip_pton +from music_assistant.common.models.config_entries import ( + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, + ConfigEntry, + ConfigValueType, +) +from music_assistant.common.models.enums import ( + ConfigEntryType, + ContentType, + PlayerFeature, + PlayerState, + PlayerType, + ProviderFeature, +) +from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player -from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS +from music_assistant.server.helpers.process import AsyncProcess, check_output +from music_assistant.server.helpers.util import create_tempfile from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: - from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig + from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType - from music_assistant.server.providers.slimproto import SlimprotoProvider - +CONF_LATENCY = "latency" +CONF_ENCRYPTION = "encryption" +CONF_ALAC_ENCODE = "alac_encode" +CONF_VOLUME_START = "volume_start" +CONF_SYNC_ADJUST = "sync_adjust" PLAYER_CONFIG_ENTRIES = ( + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, ConfigEntry( - key="airplay_header", - type=ConfigEntryType.DIVIDER, - label="Airplay specific settings", - description="Configure Airplay specific settings. " - "Note that changing any airplay specific setting, will reconnect all players.", - advanced=True, - ), - ConfigEntry( - key="read_ahead", + key=CONF_LATENCY, type=ConfigEntryType.INTEGER, range=(200, 3000), - default_value=1000, - label="Read ahead buffer", + default_value=1200, + label="Latency", description="Sets the number of milliseconds of audio buffer in the player. " "This is important to absorb network throughput jitter. " "Note that the resume after pause will be skipping that amount of time " @@ -54,7 +70,7 @@ PLAYER_CONFIG_ENTRIES = ( advanced=True, ), ConfigEntry( - key="encryption", + key=CONF_ENCRYPTION, type=ConfigEntryType.BOOLEAN, default_value=False, label="Enable encryption", @@ -63,7 +79,7 @@ PLAYER_CONFIG_ENTRIES = ( advanced=True, ), ConfigEntry( - key="alac_encode", + key=CONF_ALAC_ENCODE, type=ConfigEntryType.BOOLEAN, default_value=True, label="Enable compression", @@ -72,27 +88,30 @@ PLAYER_CONFIG_ENTRIES = ( advanced=True, ), ConfigEntry( - key="remove_timeout", + key=CONF_VOLUME_START, + type=ConfigEntryType.BOOLEAN, + default_value=False, + label="Send volume at playback start", + description="Some players require to send/confirm the volume when playback starts. \n" + "Enable this setting if the playback volume does not match the MA interface.", + advanced=True, + ), + ConfigEntry( + key=CONF_SYNC_ADJUST, type=ConfigEntryType.INTEGER, + range=(-500, 500), default_value=0, - range=(-1, 3600), - label="Remove timeout", - description="Player discovery is managed using mDNS protocol, " - "which means that a player sends regular keep-alive messages and a bye when " - "disconnecting. Some faulty mDNS stack implementations (e.g. Riva) do not always" - "send keep-alive messages, so the Airplay bridge is disconnecting them regularly. \n\n" - "As a workaround, a timer can be set so that the bridge does not immediately remove " - "the player from LMS when missing a keep-alive, waiting for it to reconnect. \n\n\n" - "A value of -1 will disable this feature and never remove the player. \n\n" - "A value of 0 (the default) disabled the player when keep-alive is missed or " - "when a bye message is received. \n\n" - "Any other value means to disable the player after missing keep-alive for " - "this number of seconds.", + label="Audio synchronization delay correction", + description="If this player is playing audio synced with other players " + "and you always hear the audio too early or late on this player, " + "you can shift the audio a bit.", advanced=True, ), ) +BACKOFF_TIME_LOWER_LIMIT = 15 # seconds +BACKOFF_TIME_UPPER_LIMIT = 300 # Five minutes -NEED_BRIDGE_RESTART = {"values/read_ahead", "values/encryption", "values/alac_encode", "enabled"} +CONF_CREDENTIALS = "credentials" async def setup( @@ -121,88 +140,505 @@ async def get_config_entries( return () # we do not have any config entries (yet) +def convert_airplay_volume(value: float) -> int: + """Remap Airplay Volume to 0..100 scale.""" + airplay_min = -30 + airplay_max = 0 + normal_min = 0 + normal_max = 100 + portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min) + return int(portion + normal_min) + + +class AirPlayPlayer(DeviceListener): + """Holds the connection to the apyatv instance and the cliraop.""" + + def __init__( + self, mass: MusicAssistant, player_id: str, discovery_info: interface.BaseConfig + ) -> None: + """Initialize power manager.""" + self.player_id = player_id + self.discovery_info = discovery_info + self.mass = mass + self.atv: AppleTVInterface | None = None + self.is_on = False + self.connected = False + self._connection_attempts = 0 + self._connection_was_lost = False + self._task = None + self._playing: interface.Playing | None = None + self.logger = self.mass.players.logger.getChild("airplay").getChild(self.player_id) + self.cliraop_proc: AsyncProcess | None = None + self.active_remote_id = str(randint(1000, 8000)) + self.optimistic_state: PlayerState = PlayerState.IDLE + + def connection_lost(self, _): + """Device was unexpectedly disconnected. + + This is a callback function from pyatv.interface.DeviceListener. + """ + self.logger.warning('Connection lost to Apple TV "%s"', self.discovery_info.name) + self._connection_was_lost = True + self._handle_disconnect() + + def connection_closed(self): + """Device connection was (intentionally) closed. + + This is a callback function from pyatv.interface.DeviceListener. + """ + self.connected = False + self._handle_disconnect() + + def _handle_disconnect(self): + """Handle that the device disconnected and restart connect loop.""" + self.connected = False + if self.atv: + self.atv.close() + self.atv = None + self._start_connect_loop() + + async def connect(self): + """Connect to device.""" + self.is_on = True + if self.connected: + return + self._start_connect_loop() + await asyncio.sleep(2) + + async def disconnect(self): + """Disconnect from device.""" + self.logger.debug("Disconnecting from device") + self.is_on = False + self.connected = False + try: + if self.atv: + self.atv.close() + self.atv = None + if self._task: + self._task.cancel() + self._task = None + except Exception: # pylint: disable=broad-except + self.logger.exception("An error occurred while disconnecting") + + async def stop(self): + """Stop playback and cleanup any running CLIRaop Process.""" + if self.cliraop_proc and not self.cliraop_proc.closed: + # prefer interactive command to our streamer + await self.send_cli_command("ACTION=STOP") + await self.cliraop_proc.wait() + self.optimistic_state = PlayerState.IDLE + self.update_attributes() + elif atv := self.atv: + await atv.remote_control.stop() + + async def send_cli_command(self, command: str) -> None: + """Send an interactive command to the running CLIRaop binary.""" + if not self.cliraop_proc or self.cliraop_proc.closed: + return + + named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108 + if not command.endswith("\n"): + command += "\n" + + def send_data(): + with open(named_pipe, "w") as f: + f.write(command) + f.flush() + + self.logger.debug("sending command %s", command) + await self.mass.create_task(send_data) + + def _start_connect_loop(self): + """Start background connect loop to device.""" + if not self._task and self.atv is None and self.is_on: + self._task = asyncio.create_task(self._connect_loop()) + else: + self.logger.debug("Not starting connect loop (%s, %s)", self.atv is None, self.is_on) + + async def connect_once(self) -> None: + """Try to connect once.""" + try: + if conf := await self._scan(): + await self._connect(conf) + except exceptions.AuthenticationError: + await self.disconnect() + self.logger.exception( + "Authentication failed for %s, try reconfiguring device", + self.discovery_info.name, + ) + return + except asyncio.CancelledError: + pass + except Exception: # pylint: disable=broad-except + self.logger.exception("Failed to connect") + self.atv = None + + async def _connect_loop(self): + """Connect loop background task function.""" + self.logger.debug("Starting connect loop") + + # Try to find device and connect as long as the user has said that + # we are allowed to connect and we are not already connected. + while self.is_on and self.atv is None: + await self.connect_once() + if self.atv is not None: + break + self._connection_attempts += 1 + backoff = min( + max( + BACKOFF_TIME_LOWER_LIMIT, + randrange(2**self._connection_attempts), + ), + BACKOFF_TIME_UPPER_LIMIT, + ) + + self.logger.debug("Reconnecting in %d seconds", backoff) + await asyncio.sleep(backoff) + + self.logger.debug("Connect loop ended") + self._task = None + + async def _scan(self) -> ATVConf | None: + """Try to find device by scanning for it.""" + address: str = self.discovery_info.address + + self.logger.debug("Discovering device %s", self.discovery_info.name) + atvs = await scan( + self.mass.loop, + identifier=self.discovery_info.identifier, + hosts=[address], + ) + if atvs: + return cast(ATVConf, atvs[0]) + + self.logger.debug( + "Failed to find device %s with address %s", + self.discovery_info.name, + address, + ) + # We no longer multicast scan for the device since as soon as async_step_zeroconf runs, + # it will update the address and reload the config entry when the device is found. + return None + + async def _connect(self, conf: ATVConf) -> None: + """Connect to device.""" + credentials: dict[int, str | None] = self.mass.config.get_raw_player_config_value( + self.player_id, CONF_CREDENTIALS, {} + ) + name: str = self.discovery_info.name + missing_protocols = [] + for protocol_int, creds in credentials.items(): + protocol = Protocol(int(protocol_int)) + if conf.get_service(protocol) is not None: + conf.set_credentials(protocol, creds) # type: ignore[arg-type] + else: + missing_protocols.append(protocol.name) + + if missing_protocols: + missing_protocols_str = ", ".join(missing_protocols) + self.logger.info( + "Protocol(s) %s not yet found for %s, trying later", + missing_protocols_str, + name, + ) + return + + self.logger.debug("Connecting to device %s", name) + session = self.mass.http_session + + self.atv = await connect(conf, self.mass.loop, session=session) + self.connected = True + self.atv.power.listener = self + self.atv.listener = self + self.atv.audio.listener = self + if self.atv.features.in_state( + interface.FeatureState.Available, interface.FeatureName.PushUpdates + ): + self.atv.push_updater.listener = self + self.atv.push_updater.start() + + self.address_updated(str(conf.address)) + + self._setup_device() + self.update_attributes() + + self._connection_attempts = 0 + if self._connection_was_lost: + self.logger.info( + 'Connection was re-established to device "%s"', + name, + ) + self._connection_was_lost = False + + def _setup_device(self): + if not (mass_player := self.mass.players.get(self.player_id)): + mass_player = Player( + player_id=self.player_id, + provider="airplay", + type=PlayerType.PLAYER, + name=self.discovery_info.name, + available=True, + powered=False, + device_info=DeviceInfo( + model=self.discovery_info.device_info.raw_model, + manufacturer="Apple", + address=str(self.discovery_info.address), + ), + supported_features=( + PlayerFeature.PAUSE, + PlayerFeature.SYNC, + PlayerFeature.VOLUME_SET, + PlayerFeature.POWER, + ), + max_sample_rate=44100, + supports_24bit=False, + ) + if self.atv: + dev_info = self.atv.device_info + mass_player.device_info = DeviceInfo( + model=( + dev_info.raw_model + if dev_info.model == DeviceModel.Unknown and dev_info.raw_model + else model_str(dev_info.model) + ), + manufacturer="Apple", + address=str(self.discovery_info.address), + ) + self.mass.players.register_or_update(mass_player) + + def playstatus_update(self, _, playstatus: interface.Playing) -> None: + """Inform about changes to what is currently playing.""" + self.logger.debug("Playstatus received: %s", playstatus) + self._playing = playstatus + self.update_attributes() + + def playstatus_error(self, updater, exception: Exception) -> None: + """Inform about an error when updating play status.""" + self.logger.debug("Playstatus error received", exc_info=exception) + self._playing = None + self.update_attributes() + + def powerstate_update(self, old_state: PowerState, new_state: PowerState) -> None: + """Update power state when it changes.""" + self.update_attributes() + + def volume_update(self, old_level: float, new_level: float) -> None: + """Update volume when it changes.""" + self.update_attributes() + + def update_attributes(self) -> None: + """Update the player attributes.""" + mass_player = self.mass.players.get(self.player_id) + mass_player.volume_level = int(self.atv.audio.volume) + mass_player.powered = self.is_on + if self.cliraop_proc and not self.cliraop_proc.closed: + mass_player.state = self.optimistic_state + # NOTE: alapsed time is pushed from cliraop + elif self.atv is None or not self.connected: + mass_player.powered = False + mass_player.state = PlayerState.IDLE + elif self._playing: + state = self._playing.device_state + if state in (DeviceState.Idle, DeviceState.Loading): + mass_player.state = PlayerState.IDLE + elif state == DeviceState.Playing: + mass_player.state = PlayerState.PLAYING + elif state in (DeviceState.Paused, DeviceState.Seeking, DeviceState.Stopped): + mass_player.state = PlayerState.PAUSED + else: + mass_player.state = PlayerState.IDLE + mass_player.elapsed_time = self._playing.position or 0 + mass_player.elapsed_time_last_updated = time.time() + mass_player.current_item_id = self._playing.content_identifier + else: + mass_player.state = PlayerState.IDLE + self.mass.players.update(self.player_id) + + @property + def is_connecting(self): + """Return true if connection is in progress.""" + return self._task is not None + + def address_updated(self, address): + """Update cached address in config entry.""" + self.logger.debug("Changing address to %s", address) + self._setup_device() + + class AirplayProvider(PlayerProvider): - """Player provider for Airplay based players, using the slimproto bridge.""" + """Player provider for Airplay based players.""" - _bridge_bin: str | None = None - _bridge_proc: asyncio.subprocess.Process | None = None - _timer_handle: asyncio.TimerHandle | None = None - _closing: bool = False - _config_file: str | None = None - _log_reader_task: asyncio.Task | None = None - _removed_players: set[str] | None = None + _atv_players: dict[str, AirPlayPlayer] + _discovery_running: bool = False + _cliraop_bin: str | None = None + _stream_tasks: dict[str, asyncio.Task] @property def supported_features(self) -> tuple[ProviderFeature, ...]: """Return the features supported by this Provider.""" - # for now do not allow creation of airplay groups - # in preparation of new airplay provider coming up soon - # return (ProviderFeature.SYNC_PLAYERS,) - return () + return (ProviderFeature.SYNC_PLAYERS,) async def handle_setup(self) -> None: """Handle async initialization of the provider.""" - self._removed_players = set() - self._config_file = os.path.join(self.mass.storage_path, "airplay_bridge.xml") - # locate the raopbridge binary (will raise if that fails) - self._bridge_bin = await self._get_bridge_binary() - # make sure that slimproto provider is loaded - slimproto_prov: SlimprotoProvider = self.mass.get_provider("slimproto") - assert slimproto_prov, "This provider depends on the SlimProto provider." - # register as virtual provider on slimproto provider - slimproto_prov.register_virtual_provider( - "RaopBridge", - self._handle_player_register_callback, - self._handle_player_update_callback, + self._atv_players = {} + self._stream_tasks = {} + self._cliraop_bin = await self.get_cliraop_binary() + self.mass.create_task(self._run_discovery()) + dacp_port = 49831 + self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}" + self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port) + await asyncio.start_server(self._handle_dacp_request, "0.0.0.0", dacp_port) + zeroconf_type = "_dacp._tcp.local." + server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}" + info = ServiceInfo( + zeroconf_type, + name=server_id, + addresses=[await get_ip_pton(self.mass.streams.publish_ip)], + port=dacp_port, + properties={ + "txtvers": "1", + "Ver": "63B5E5C0C201542E", + "DbId": "63B5E5C0C201542E", + "OSsi": "0x1F5", + }, + server=f"{socket.gethostname()}.local", ) - await self._check_config_xml() - # start running the bridge - asyncio.create_task(self._bridge_process_runner(slimproto_prov)) + await self.mass.zeroconf.async_register_service(info) + + async def _handle_dacp_request( # noqa: PLR0915 + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + """Handle new connection on the socket.""" + try: + raw_request = b"" + while recv := await reader.read(1024): + raw_request += recv + if len(recv) < 1024: + break + + request = raw_request.decode("UTF-8") + headers_raw, body = request.split("\r\n\r\n", 1) + headers_raw = headers_raw.split("\r\n") + headers = {} + for line in headers_raw[1:]: + x, y = line.split(":", 1) + headers[x.strip()] = y.strip() + active_remote = headers.get("Active-Remote") + _, path, _ = headers_raw[0].split(" ") + atv_player = next( + (x for x in self._atv_players.values() if x.active_remote_id == active_remote), None + ) + self.logger.debug( + "DACP request for %s (%s): %s -- %s", + atv_player.discovery_info.name if atv_player else "UNKNOWN PLAYER", + active_remote, + path, + body, + ) + if not atv_player: + return + + player_id = atv_player.player_id + if path == "/ctrl-int/1/nextitem": + self.mass.create_task(self.mass.player_queues.next(player_id)) + elif path == "/ctrl-int/1/previtem": + self.mass.create_task(self.mass.player_queues.previous(player_id)) + elif path == "/ctrl-int/1/play": + self.mass.create_task(self.mass.player_queues.play(player_id)) + elif path == "/ctrl-int/1/playpause": + self.mass.create_task(self.mass.player_queues.play_pause(player_id)) + elif path == "/ctrl-int/1/stop": + self.mass.create_task(self.cmd_stop(player_id)) + elif path == "/ctrl-int/1/volumeup": + self.mass.create_task(self.mass.players.cmd_volume_up(player_id)) + elif path == "/ctrl-int/1/volumedown": + self.mass.create_task(self.mass.players.cmd_volume_down(player_id)) + elif path == "/ctrl-int/1/shuffle_songs": + queue = self.mass.player_queues.get(player_id) + self.mass.create_task( + self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled) + ) + elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"): + self.mass.create_task(self.mass.player_queues.pause(player_id)) + elif "dmcp.device-volume=" in path: + raop_volume = float(path.split("dmcp.device-volume=", 1)[-1]) + volume = convert_airplay_volume(raop_volume) + if abs(volume - int(atv_player.atv.audio.volume)) > 2: + self.mass.create_task(self.cmd_volume_set(player_id, volume)) + elif "dmcp.volume=" in path: + volume = int(path.split("dmcp.volume=", 1)[-1]) + if abs(volume - int(atv_player.atv.audio.volume)) > 2: + self.mass.create_task(self.cmd_volume_set(player_id, volume)) + else: + self.logger.warning( + "Unknown DACP request for %s: %s", + atv_player.discovery_info.name, + path, + ) - async def unload(self) -> None: - """Handle close/cleanup of the provider.""" - self._closing = True - if slimproto_prov := self.mass.get_provider("slimproto"): - slimproto_prov.unregister_virtual_provider("RaopBridge") - await self._stop_bridge() + # send response + date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S") + response = ( + f"HTTP/1.0 204 No Content\r\nDate: {date_str} " + "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: " + "application/x-dmap-tagged\r\nContent-Length: 0\r\n" + "Connection: close\r\n\r\n" + ) + writer.write(response.encode()) + await writer.drain() + finally: + writer.close() async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: """Return all (provider/player specific) Config Entries for the given player (if any).""" - slimproto_prov = self.mass.get_provider("slimproto") - base_entries = await slimproto_prov.get_player_config_entries(player_id) - return base_entries + PLAYER_CONFIG_ENTRIES - - def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None: - """Call (by config manager) when the configuration of a player changes.""" - super().on_player_config_changed(config, changed_keys) - # forward to slimproto too - slimproto_prov = self.mass.get_provider("slimproto") - slimproto_prov.on_player_config_changed(config, changed_keys) - - async def update_config() -> None: - # stop bridge (it will be auto restarted) - if changed_keys.intersection(NEED_BRIDGE_RESTART): - self.restart_bridge() - - asyncio.create_task(update_config()) - - def on_player_config_removed(self, player_id: str) -> None: - """Call (by config manager) when the configuration of a player is removed.""" - super().on_player_config_removed(player_id) - self._removed_players.add(player_id) - self.restart_bridge() + entries = await super().get_player_config_entries(player_id) + return entries + PLAYER_CONFIG_ENTRIES async def cmd_stop(self, player_id: str) -> None: - """Send STOP command to given player.""" - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_stop(player_id) + """Send STOP command to given player. + + - player_id: player_id of the player to handle the command. + """ + if stream_task := self._stream_tasks.pop(player_id, None): + if not stream_task.done(): + stream_task.cancel() + + # forward command to player and any connected sync members + async with asyncio.TaskGroup() as tg: + for atv_player in self._get_sync_clients(player_id): + tg.create_task(atv_player.stop()) async def cmd_play(self, player_id: str) -> None: - """Send PLAY command to given player.""" - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_play(player_id) + """Send PLAY (unpause) command to given player. + + - player_id: player_id of the player to handle the command. + """ + # forward command to player and any connected sync members + async with asyncio.TaskGroup() as tg: + for atv_player in self._get_sync_clients(player_id): + if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed: + # prefer interactive command to our streamer + tg.create_task(atv_player.send_cli_command("ACTION=PLAY")) + atv_player.optimistic_state = PlayerState.PLAYING + atv_player.update_attributes() + elif atv := atv_player.atv: + tg.create_task(atv.remote_control.play()) + + async def cmd_pause(self, player_id: str) -> None: + """Send PAUSE command to given player. + + - player_id: player_id of the player to handle the command. + """ + # forward command to player and any connected sync members + async with asyncio.TaskGroup() as tg: + for atv_player in self._get_sync_clients(player_id): + if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed: + # prefer interactive command to our streamer + tg.create_task(atv_player.send_cli_command("ACTION=PAUSE")) + atv_player.optimistic_state = PlayerState.PAUSED + atv_player.update_attributes() + elif atv := atv_player.atv: + tg.create_task(atv.remote_control.pause()) async def play_media( self, @@ -221,300 +657,402 @@ class AirplayProvider(PlayerProvider): - seek_position: Optional seek to this position. - fade_in: Optionally fade in the item at playback start. """ - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.play_media( - player_id, - queue_item=queue_item, - seek_position=seek_position, - fade_in=fade_in, - ) + # stop existing streams first + await self.cmd_stop(player_id) + await self.cmd_power(player_id, True) + atv_player = self._atv_players[player_id] + player = self.mass.players.get(player_id) + + if player.synced_to: + # should not happen, but just in case + raise RuntimeError("Player is synced") + + # NOTE: Although the pyatv library is perfectly capable of playback + # to not only raop targets but also airplay 1 + 2, its not suitable + # for synced playback to multiple clients at once. + # Also the performance is horrible. Python is not suitable for realtime + # audio streaming. + # So, I've decided to a combined route here. I've created a small binary + # written in C based on libraop to do the actual timestamped playback. + # the raw pcm audio is fed to the stdin of this cliraop binary and we can + # send some commands over a named pipe. + + # get current ntp before we start + _, stdout = await check_output(f"{self._cliraop_bin} -ntp") + ntp = int(stdout.strip()) + + # setup Raop process for player and its sync childs + async with asyncio.TaskGroup() as tg: + for atv_player in self._get_sync_clients(player_id): + if not atv_player.atv: + # just in case... + await atv_player.connect() + tg.create_task(self._init_cliraop(atv_player, ntp)) + + async def _streamer() -> None: + queue = self.mass.player_queues.get(queue_item.queue_id) + player.current_item_id = f"{queue_item.queue_id}.{queue_item.queue_item_id}" + player.elapsed_time = 0 + player.elapsed_time_last_updated = time.time() + player.state = PlayerState.PLAYING + self.mass.players.register_or_update(player) + prev_metadata_checksum: str = "" + pcm_format = AudioFormat( + content_type=ContentType.PCM_S16LE, + sample_rate=44100, + bit_depth=16, + channels=2, + ) + try: + async for pcm_chunk in self.mass.streams.get_flow_stream( + queue, + start_queue_item=queue_item, + pcm_format=pcm_format, + seek_position=seek_position, + fade_in=fade_in, + ): + # send metadata to player(s) if needed + # NOTE: this must all be done in separate tasks to not disturb audio + if queue and queue.current_item and queue.current_item.streamdetails: + metadata_checksum = ( + queue.current_item.streamdetails.stream_title + or queue.current_item.queue_item_id + ) + if prev_metadata_checksum != metadata_checksum: + prev_metadata_checksum = metadata_checksum + self.mass.create_task(self._send_metadata(player_id)) + # send progress metadata + for atv_player in self._get_sync_clients(player_id): + self.mass.create_task( + atv_player.send_cli_command(f"PROGRESS={int(queue.elapsed_time)}\n") + ) + + # send audio chunk to player(s) + async with asyncio.TaskGroup() as tg: + available_clients = 0 + for atv_player in self._get_sync_clients(player_id): + if not atv_player.cliraop_proc or atv_player.cliraop_proc.closed: + # this may not happen, but just in case + continue + available_clients += 1 + tg.create_task(atv_player.cliraop_proc.write(pcm_chunk)) + if not available_clients: + return + + finally: + self.logger.debug("Streamer task ended for player %s", queue.display_name) + for atv_player in self._get_sync_clients(player_id): + if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed: + atv_player.cliraop_proc.write_eof() + + # start streaming the queue (pcm) audio in a background task + self._stream_tasks[player_id] = asyncio.create_task(_streamer()) async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: """Handle PLAY STREAM on given player. This is a special feature from the Universal Group provider. """ - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.play_stream(player_id, stream_job) - - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem) -> None: - """Handle enqueuing of the next queue item on the player.""" - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.enqueue_next_queue_item(player_id, queue_item) - - async def cmd_pause(self, player_id: str) -> None: - """Send PAUSE command to given player.""" - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_pause(player_id) + raise NotImplementedError async def cmd_power(self, player_id: str, powered: bool) -> None: - """Send POWER command to given player.""" - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_power(player_id, powered) + """Send POWER command to given player. + + - player_id: player_id of the player to handle the command. + - powered: bool if player should be powered on or off. + """ + atv_player = self._atv_players[player_id] + mass_player = self.mass.players.get(player_id) + if powered: + await atv_player.connect() + elif not powered: + await self.cmd_stop(player_id) + await atv_player.disconnect() + mass_player.powered = powered + self.mass.players.update(player_id) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: - """Send VOLUME_SET command to given player.""" - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_volume_set(player_id, volume_level) + """Send VOLUME_SET command to given player. - async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: - """Send VOLUME MUTE command to given player.""" - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_volume_mute(player_id, muted) + - player_id: player_id of the player to handle the command. + - volume_level: volume level (0..100) to set on the player. + """ + atv_player = self._atv_players[player_id] + if atv_player.cliraop_proc: + # prefer interactive command to our streamer + await atv_player.send_cli_command(f"VOLUME={volume_level}") + elif atv := atv_player.atv: + await atv.audio.set_volume(volume_level) async def cmd_sync(self, player_id: str, target_player: str) -> None: - """Handle SYNC command for given player.""" - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_sync(player_id, target_player) + """Handle SYNC command for given player. + + Join/add the given player(id) to the given (master) player/sync group. + + - player_id: player_id of the player to handle the command. + - target_player: player_id of the syncgroup master or group player. + """ + player = self.mass.players.get(player_id, raise_unavailable=True) + group_leader = self.mass.players.get(target_player, raise_unavailable=True) + if group_leader.synced_to: + raise RuntimeError("Player is already synced") + player.synced_to = target_player + group_leader.group_childs.add(player_id) + self.mass.players.update(target_player) + if group_leader.powered: + await self.cmd_power(player_id, True) + active_queue = self.mass.player_queues.get_active_queue(group_leader.player_id) + if active_queue.state == PlayerState.PLAYING: + self.mass.create_task(self.mass.player_queues.resume(active_queue.queue_id)) async def cmd_unsync(self, player_id: str) -> None: - """Handle UNSYNC command for given player.""" - # simply forward to underlying slimproto player - slimproto_prov = self.mass.get_provider("slimproto") - await slimproto_prov.cmd_unsync(player_id) - - async def _handle_player_register_callback(self, player: Player) -> None: - """Handle player register callback from slimproto source player.""" - # TODO: Can we get better device info from mDNS ? - player.provider = self.instance_id - player.device_info = DeviceInfo( - model="Airplay device", - address=player.device_info.address, - manufacturer="Generic", + """Handle UNSYNC command for given player. + + Remove the given player from any syncgroups it currently is synced to. + + - player_id: player_id of the player to handle the command. + """ + player = self.mass.players.get(player_id, raise_unavailable=True) + if not player.synced_to: + return + group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True) + group_leader.group_childs.remove(player_id) + player.synced_to = None + if player.state == PlayerState.PLAYING: + await self.cmd_stop(player_id) + self.mass.players.update(player_id) + + async def _run_discovery(self) -> None: + """Discover Airplay players on the network.""" + if self._discovery_running: + return + try: + self._discovery_running = True + self.logger.debug("Airplay discovery started...") + discovered_devices = await scan(self.mass.loop, timeout=5) + + if not discovered_devices: + self.logger.debug("No devices found") + return + + for dev in discovered_devices: + self.mass.create_task(self._player_discovered(dev)) + + finally: + self._discovery_running = False + + def reschedule(): + self.mass.create_task(self._run_discovery()) + + # reschedule self once finished + self.mass.loop.call_later(300, reschedule) + + async def _player_discovered(self, discovery_info: interface.BaseConfig) -> None: + """Handle discovered Airplay player on mdns.""" + player_id = f"ap{discovery_info.identifier.lower().replace(':', '')}" + if player_id in self._atv_players: + atv_player = self._atv_players[player_id] + if discovery_info.address != atv_player.discovery_info.address: + atv_player.address_updated(discovery_info.address) + return + self.logger.info(f"Connecting to {discovery_info.address}") + self._atv_players[player_id] = atv_player = AirPlayPlayer( + self.mass, player_id, discovery_info ) - player.supports_24bit = False - # disable sonos by default - if "sonos" in player.name.lower() or "rincon" in player.name.lower(): - player.enabled_by_default = False - - # extend info from the discovery xml - async with aiofiles.open(self._config_file, "r") as _file: - xml_data = await _file.read() - with suppress(ET.ParseError): - xml_root = ET.XML(xml_data) - for device_elem in xml_root.findall("device"): - player_id = device_elem.find("mac").text - if player_id != player.player_id: - continue - # prefer name from UDN because default name is often wrong - udn = device_elem.find("udn").text - udn_name = udn.split("@")[1].split("._")[0] - player.name = udn_name - break + atv_player._setup_device() + for player in self.players: + player.can_sync_with = tuple(x for x in self._atv_players if x != player.player_id) + self.mass.players.update(player.player_id) - def _handle_player_update_callback(self, player: Player) -> None: - """Handle player update callback from slimproto source player.""" + def _is_feature_available( + self, atv_player: interface.AppleTV, feature: interface.FeatureName + ) -> bool: + """Return if a feature is available.""" + mass_player = self.mass.players.get(atv_player.device_info.output_device_id) + if atv_player and mass_player.state == PlayerState.PLAYING: + return atv_player.features.in_state(interface.FeatureState.Available, feature) + return False - async def _get_bridge_binary(self): - """Find the correct bridge binary belonging to the platform.""" + async def get_cliraop_binary(self): + """Find the correct raop/airplay binary belonging to the platform.""" # ruff: noqa: SIM102 + if self._cliraop_bin is not None: + return self._cliraop_bin - async def check_bridge_binary(bridge_binary_path: str) -> str | None: + async def check_binary(cliraop_path: str) -> str | None: try: - bridge_binary = await asyncio.create_subprocess_exec( - *[bridge_binary_path, "-t", "-x", self._config_file], + cliraop = await asyncio.create_subprocess_exec( + *[cliraop_path, "-check"], stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, ) - stdout, _ = await bridge_binary.communicate() - if ( - bridge_binary.returncode == 1 - and b"This program is free software: you can redistribute it and/or modify" - in stdout - ): - self._bridge_bin = bridge_binary_path - return bridge_binary_path - except OSError as err: - self.logger.exception(err) + stdout, _ = await cliraop.communicate() + stdout = stdout.strip().decode() + if cliraop.returncode == 0 and stdout == "cliraop check": + self._cliraop_bin = cliraop_path + return cliraop_path + except OSError: return None base_path = os.path.join(os.path.dirname(__file__), "bin") - - system = platform.system().lower() + system = platform.system().lower().replace("darwin", "macos") architecture = platform.machine().lower() - if bridge_binary := await check_bridge_binary( - os.path.join(base_path, f"squeeze2raop-{system}-{architecture}-static") + if bridge_binary := await check_binary( + os.path.join(base_path, f"cliraop-{system}-{architecture}") ): return bridge_binary - msg = f"Unable to locate RaopBridge for {system}/{architecture}" + msg = f"Unable to locate RAOP Play binary for {system}/{architecture}" raise RuntimeError(msg) - async def _bridge_process_runner(self, slimproto_prov: SlimprotoProvider) -> None: - """Run the bridge binary in the background.""" - self.logger.debug( - "Starting Airplay bridge using config file %s", - self._config_file, + def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]: + """Get all sync clients for a player.""" + mass_player = self.mass.players.get(player_id, True) + sync_clients: list[AirPlayPlayer] = [] + # we need to return the player itself too + group_child_ids = {player_id} + group_child_ids.update(mass_player.group_childs) + for child_id in group_child_ids: + if client := self._atv_players.get(child_id): + sync_clients.append(client) + return sync_clients + + async def _init_cliraop(self, atv_player: AirPlayPlayer, ntp: int) -> None: + """Initiatlize CLIRaop process for a player.""" + stream: RaopStream | None = next( + (x for x in atv_player.atv.stream.instances if isinstance(x, RaopStream)), None ) - conf_log_level = self.config.get_value(CONF_LOG_LEVEL) - enable_debug_log = conf_log_level == "DEBUG" - args = [ - self._bridge_bin, - "-s", - f"localhost:{slimproto_prov.port}", - "-x", - self._config_file, - "-I", - "-Z", - "-d", - f'all={"debug" if enable_debug_log else "warn"}', - # filter out apple tv's for now until we fix auth - "-m", - "apple-tv,appletv", - ] - start_success = False - while True: - try: - self._bridge_proc = await asyncio.create_subprocess_shell( - " ".join(args), - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - self._log_reader_task = asyncio.create_task(self._log_reader()) - await self._bridge_proc.wait() - except Exception as err: - if not start_success: - raise - self.logger.exception("Error in Airplay bridge", exc_info=err) - if self._closing: - break - await asyncio.sleep(10) + if stream is None: + raise RuntimeError("RAOP Not available") - async def _stop_bridge(self) -> None: - """Stop the bridge process.""" - if self._bridge_proc: - try: - self.logger.info("Stopping bridge process...") - self._bridge_proc.terminate() - await self._bridge_proc.wait() - self.logger.info("Bridge process stopped.") - await asyncio.sleep(5) - except ProcessLookupError: - pass - if self._log_reader_task and not self._log_reader_task.done(): - self._log_reader_task.cancel() - - async def _check_config_xml(self, recreate: bool = False) -> None: - """Check the bridge config XML file.""" - # ruff: noqa: PLR0915 - if recreate or not os.path.isfile(self._config_file): - if os.path.isfile(self._config_file): - os.remove(self._config_file) - # discover players and create default config file - args = [ - self._bridge_bin, - "-i", - self._config_file, - ] - proc = await asyncio.create_subprocess_shell( - " ".join(args), - stdout=asyncio.subprocess.DEVNULL, - stderr=asyncio.subprocess.DEVNULL, + async def log_watcher(cliraop_proc: AsyncProcess) -> None: + """Monitor stderr for a running CLIRaop process.""" + mass_player = self.mass.players.get(atv_player.player_id) + logger = self.logger.getChild(atv_player.player_id) + async for line in cliraop_proc._proc.stderr: + line = line.decode().strip() # noqa: PLW2901 + if "set pause" in line: + atv_player.optimistic_state = PlayerState.PAUSED + atv_player.update_attributes() + if "Restarted at" in line: + atv_player.optimistic_state = PlayerState.PLAYING + atv_player.update_attributes() + elif "after start), played" in line: + millis = int(line.split("played ")[1].split(" ")[0]) + mass_player.elapsed_time = millis / 1000 + mass_player.elapsed_time_last_updated = time.time() + else: + logger.debug(line) + # if we reach this point, the process exited + if cliraop_proc._proc.returncode is not None: + cliraop_proc.closed = True + logger.debug( + "CLIRaop process stopped with errorcode %s", + cliraop_proc._proc.returncode, ) - await proc.wait() - # read xml file's data - async with aiofiles.open(self._config_file, "r") as _file: - xml_data = await _file.read() + extra_args = [] + latency = self.mass.config.get_raw_player_config_value( + atv_player.player_id, CONF_LATENCY, 1200 + ) + extra_args += ["-l", str(latency)] + if self.mass.config.get_raw_player_config_value( + atv_player.player_id, CONF_ENCRYPTION, False + ): + extra_args += ["-et"] + if self.mass.config.get_raw_player_config_value( + atv_player.player_id, CONF_ALAC_ENCODE, False + ): + extra_args += ["-a"] + if self.mass.config.get_raw_player_config_value( + atv_player.player_id, CONF_VOLUME_START, False + ): + extra_args += ["-v", str(int(atv_player.atv.audio.volume))] + sync_adjust = self.mass.config.get_raw_player_config_value( + atv_player.player_id, CONF_SYNC_ADJUST, 0 + ) - try: - xml_root = ET.XML(xml_data) - except ET.ParseError: - if recreate: - raise - await self._check_config_xml(True) - return + atv_player.optimistic_state = PlayerState.PLAYING + # always generate a new active remote id to prevent race conditions + # with the named pipe used to send commands + atv_player.active_remote_id = str(randint(1000, 8000)) + args = [ + self._cliraop_bin, + "-n", + str(ntp), + "-p", + str(stream.core.service.port), + "-w", + str(2000 + sync_adjust), + *extra_args, + "-dacp", + self.dacp_id, + "-ar", + atv_player.active_remote_id, + "-md", + atv_player.discovery_info.properties["_raop._tcp.local"]["md"], + "-et", + atv_player.discovery_info.properties["_raop._tcp.local"]["et"], + str(atv_player.discovery_info.address), + "-", + ] + if platform.system() == "Darwin": + os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" + atv_player.cliraop_proc = AsyncProcess( + args, enable_stdin=True, enable_stdout=False, enable_stderr=True + ) + await atv_player.cliraop_proc.start() + # send empty command to unblock named pipe + await atv_player.send_cli_command("\n") + atv_player.cliraop_proc.attach_task(log_watcher(atv_player.cliraop_proc)) - # set common/global values - common_elem = xml_root.find("common") - for key, value in { - "codecs": "flc,pcm", - "sample_rate": "44100", - "resample": "0", - }.items(): - xml_elem = common_elem.find(key) - xml_elem.text = value - - # default values for players - for conf_entry in PLAYER_CONFIG_ENTRIES: - if conf_entry.type == ConfigEntryType.LABEL: - continue - conf_val = conf_entry.default_value - xml_elem = common_elem.find(conf_entry.key) - if xml_elem is None: - xml_elem = ET.SubElement(common_elem, conf_entry.key) - if conf_entry.type == ConfigEntryType.BOOLEAN: - xml_elem.text = "1" if conf_val else "0" + async def _send_metadata(self, player_id: str) -> None: + """Send metadata to player (and connected sync childs).""" + queue = self.mass.player_queues.get_active_queue(player_id) + if not queue or not queue.current_item: + return + duration = min(queue.current_item.duration or 0, 3600) + title = queue.current_item.name + artist = "" + album = "" + if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title: + # stream title from radio station + stream_title = queue.current_item.streamdetails.stream_title + if " - " in stream_title: + artist, title = stream_title.split(" - ", 1) else: - xml_elem.text = str(conf_val) - - # get/set all device configs - for device_elem in xml_root.findall("device"): - player_id = device_elem.find("mac").text - if player_id in self._removed_players: - xml_root.remove(device_elem) - self._removed_players.remove(player_id) - continue - # use raw config values because players are not - # yet available at startup/init (race condition) - raw_player_conf = self.mass.config.get(f"{CONF_PLAYERS}/{player_id}") - if not raw_player_conf: - continue - device_elem.find("enabled").text = "1" if raw_player_conf["enabled"] else "0" - - # set some values that are not (yet) configurable - for key, value in { - "player_volume": "-1", - "prevent_playback": "off", - }.items(): - xml_elem = device_elem.find(key) - if xml_elem is None: - xml_elem = ET.SubElement(device_elem, key) - xml_elem.text = value - - # set values based on config entries - for conf_entry in PLAYER_CONFIG_ENTRIES: - if conf_entry.type == ConfigEntryType.LABEL: - continue - conf_val = raw_player_conf["values"].get(conf_entry.key, conf_entry.default_value) - xml_elem = device_elem.find(conf_entry.key) - if xml_elem is None: - xml_elem = ET.SubElement(device_elem, conf_entry.key) - if conf_entry.type == ConfigEntryType.BOOLEAN: - xml_elem.text = "1" if conf_val else "0" - else: - xml_elem.text = str(conf_val) - - # save config file - async with aiofiles.open(self._config_file, "w") as _file: - xml_str = ET.tostring(xml_root) - await _file.write(xml_str.decode()) - - async def _log_reader(self) -> None: - """Read log output from bridge process.""" - bridge_logger = self.logger.getChild("squeeze2raop") - while self._bridge_proc.returncode is None: - async for line in self._bridge_proc.stdout: - bridge_logger.debug(line.decode().strip()) - - def restart_bridge(self) -> None: - """Schedule restart of bridge process.""" - if self._timer_handle is not None: - self._timer_handle.cancel() - self._timer_handle = None - - async def restart_bridge() -> None: - self.logger.info("Restarting Airplay bridge (due to config changes)") - await self._stop_bridge() - await self._check_config_xml() - - # schedule the action for later - self._timer_handle = self.mass.loop.call_later(10, self.mass.create_task, restart_bridge) + title = stream_title + # set album to radio station name + album = queue.current_item.name + if media_item := queue.current_item.media_item: + if artist_str := getattr(media_item, "artist_str", None): + artist = artist_str + if _album := getattr(media_item, "album", None): + album = _album.name + + cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n" + cmd += f"DURATION={duration}\nACTION=SENDMETA\n" + + async with asyncio.TaskGroup() as tg: + for atv_player in self._get_sync_clients(player_id): + tg.create_task(atv_player.send_cli_command(cmd)) + + # get image + if not queue.current_item.image: + return + image_path = create_tempfile() + image_data = await self.mass.metadata.get_thumbnail( + queue.current_item.image.path, + 512, + queue.current_item.image.provider, + ) + async with aiofiles.open(image_path.name, "wb") as outfile: + await outfile.write(image_data) + async with asyncio.TaskGroup() as tg: + for atv_player in self._get_sync_clients(player_id): + if image_path: + tg.create_task(atv_player.send_cli_command(f"ARTWORK={image_path.name}\n")) + # make sure the temp file gets deleted again + await asyncio.sleep(5) + image_path.close() diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 new file mode 100755 index 00000000..ef4c4eb4 Binary files /dev/null and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 new file mode 100755 index 00000000..ba25f08a Binary files /dev/null and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 new file mode 100755 index 00000000..4a91d903 Binary files /dev/null and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-arm64-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-arm64-static deleted file mode 100755 index ca6cb7a5..00000000 Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-arm64-static and /dev/null differ diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-x86_64-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-x86_64-static deleted file mode 100755 index 69c2027a..00000000 Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-darwin-x86_64-static and /dev/null differ diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-aarch64-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-aarch64-static deleted file mode 100755 index ace5d071..00000000 Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-aarch64-static and /dev/null differ diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-arm-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-arm-static deleted file mode 100755 index 0b3da4d0..00000000 Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-arm-static and /dev/null differ diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86-static deleted file mode 100755 index 0593e700..00000000 Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86-static and /dev/null differ diff --git a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86_64-static b/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86_64-static deleted file mode 100755 index 0e2e4da7..00000000 Binary files a/music_assistant/server/providers/airplay/bin/squeeze2raop-linux-x86_64-static and /dev/null differ diff --git a/music_assistant/server/providers/airplay/manifest.json b/music_assistant/server/providers/airplay/manifest.json index 9460f5e0..9e5fb135 100644 --- a/music_assistant/server/providers/airplay/manifest.json +++ b/music_assistant/server/providers/airplay/manifest.json @@ -9,6 +9,5 @@ "multi_instance": false, "builtin": false, "load_by_default": true, - "depends_on": "slimproto", - "icon":"cast-variant" + "icon": "cast-variant" } diff --git a/pyproject.toml b/pyproject.toml index 14e124e2..27237055 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,13 +1,8 @@ [project] name = "music_assistant" # The version is set by GH action on release -version = "0.0.0" -license = { text = "Apache-2.0" } -description = "Music Assistant" -readme = "README.md" -requires-python = ">=3.11" authors = [ - { name = "The Music Assistant Authors", email = "marcelveldt@users.noreply.github.com" }, + {name = "The Music Assistant Authors", email = "marcelveldt@users.noreply.github.com"}, ] classifiers = [ "Environment :: Console", @@ -15,6 +10,11 @@ classifiers = [ "Programming Language :: Python :: 3.12", ] dependencies = ["aiohttp", "orjson", "mashumaro"] +description = "Music Assistant" +license = {text = "Apache-2.0"} +readme = "README.md" +requires-python = ">=3.11" +version = "0.0.0" [project.optional-dependencies] server = [ @@ -63,10 +63,10 @@ mass = "music_assistant.__main__:main" ignore-words-list = "provid,hass,followings,childs" [tool.setuptools] +include-package-data = true +packages = ["music_assistant"] platforms = ["any"] zip-safe = false -packages = ["music_assistant"] -include-package-data = true [tool.setuptools.package-data] music_assistant = ["py.typed"] @@ -78,19 +78,17 @@ show-fixes = true line-length = 100 target-version = "py311" - [tool.ruff.lint.pydocstyle] # Use Google-style docstrings. convention = "pep257" [tool.ruff.lint.pylint] +max-args = 10 max-branches = 25 max-returns = 15 -max-args = 10 max-statements = 50 - [tool.mypy] platform = "linux" python_version = "3.11" @@ -134,13 +132,10 @@ disable = [ "duplicate-code", "format", "unsubscriptable-object", - "unused-argument", # handled by ruff - "unspecified-encoding", # handled by ruff + "unused-argument", # handled by ruff + "unspecified-encoding", # handled by ruff "isinstance-second-argument-not-valid-type", # conflict with ruff - "fixme", # we're still developing - - # TEMPORARY DISABLED rules - # The below rules must be enabled later one-by-one ! + "fixme", # we're still developing # TEMPORARY DISABLED rules # The below rules must be enabled later one-by-one ! "too-many-return-statements", "unsupported-assignment-operation", "invalid-name", @@ -165,12 +160,10 @@ disable = [ "no-else-raise", "undefined-loop-variable", "too-many-nested-blocks", - "too-many-public-methods", # unavoidable? - "too-many-arguments", # unavoidable? - "too-many-branches", # unavoidable? - "too-many-instance-attributes", # unavoidable? - - + "too-many-public-methods", # unavoidable? + "too-many-arguments", # unavoidable? + "too-many-branches", # unavoidable? + "too-many-instance-attributes", # unavoidable? ] [tool.pylint.SIMILARITIES] @@ -185,27 +178,24 @@ asyncio_mode = "auto" [tool.ruff.lint] ignore = [ - "ANN002", # Just annoying, not really useful - "ANN003", # Just annoying, not really useful - "ANN101", # Self... explanatory - "ANN401", # Opinioated warning on disallowing dynamically typed expressions - "D203", # Conflicts with other rules - "D213", # Conflicts with other rules - "D417", # False positives in some occasions - "FIX002", # Just annoying, not really useful + "ANN002", # Just annoying, not really useful + "ANN003", # Just annoying, not really useful + "ANN101", # Self... explanatory + "ANN401", # Opinioated warning on disallowing dynamically typed expressions + "D203", # Conflicts with other rules + "D213", # Conflicts with other rules + "D417", # False positives in some occasions + "EM101", # Just annoying, not really useful + "FIX002", # Just annoying, not really useful "PLR2004", # Just annoying, not really useful - "PD011", # Just annoying, not really useful - "S101", # assert is often used to satisfy type checking - "TD002", # Just annoying, not really useful - "TD003", # Just annoying, not really useful - "TD004", # Just annoying, not really useful - - # Conflicts with the Ruff formatter - "COM812", - "ISC001", - - # TEMPORARY DISABLED rules - # The below rules must be enabled later one-by-one ! + "PD011", # Just annoying, not really useful + "S101", # assert is often used to satisfy type checking + "TD002", # Just annoying, not really useful + "TD003", # Just annoying, not really useful + "TD004", # Just annoying, not really useful + "TRY003", # Just annoying, not really useful + "COM812", # Conflicts with the Ruff formatter + "ISC001", # TEMPORARY DISABLED rules # The below rules must be enabled later one-by-one ! "BLE001", "FBT001", "FBT002",