From: Marcel van der Veldt Date: Tue, 20 Feb 2024 20:50:37 +0000 (+0100) Subject: More improvements for the Airplay provider (#1100) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=593e72fc8784d040c76919afb9b27c085b16c15a;p=music-assistant-server.git More improvements for the Airplay provider (#1100) --- diff --git a/music_assistant/common/models/provider.py b/music_assistant/common/models/provider.py index 114a09d5..404052a3 100644 --- a/music_assistant/common/models/provider.py +++ b/music_assistant/common/models/provider.py @@ -48,6 +48,8 @@ class ProviderManifest(DataClassORJSONMixin): # if this attribute is omitted and an icon_dark.svg is found in the provider # folder, the file contents will be read instead. icon_svg_dark: str | None = None + # mdns_discovery: list of mdns types to discover + mdns_discovery: list[str] | None = None @classmethod async def parse(cls: ProviderManifest, manifest_file: str) -> ProviderManifest: diff --git a/music_assistant/server/controllers/metadata.py b/music_assistant/server/controllers/metadata.py index 0de9f97a..0e8bc06c 100644 --- a/music_assistant/server/controllers/metadata.py +++ b/music_assistant/server/controllers/metadata.py @@ -330,24 +330,35 @@ class MetaDataController(CoreController): return None def get_image_url( - self, image: MediaItemImage, size: int = 0, prefer_proxy: bool = False + self, + image: MediaItemImage, + size: int = 0, + prefer_proxy: bool = False, + image_format: str = "png", ) -> str: """Get (proxied) URL for MediaItemImage.""" if image.provider != "url" or prefer_proxy or size: # return imageproxy url for images that need to be resolved # the original path is double encoded encoded_url = urllib.parse.quote(urllib.parse.quote(image.path)) - return f"{self.mass.streams.base_url}/imageproxy?path={encoded_url}&provider={image.provider}&size={size}" # noqa: E501 + return f"{self.mass.streams.base_url}/imageproxy?path={encoded_url}&provider={image.provider}&size={size}&fmt={image_format}" # noqa: E501 return image.path async def get_thumbnail( - self, path: str, size: int | None = None, provider: str = "url", base64: bool = False + self, + path: str, + size: int | None = None, + provider: str = "url", + base64: bool = False, + image_format: str = "png", ) -> bytes | str: """Get/create thumbnail image for path (image url or local path).""" - thumbnail = await get_image_thumb(self.mass, path, size=size, provider=provider) + thumbnail = await get_image_thumb( + self.mass, path, size=size, provider=provider, image_format=image_format + ) if base64: enc_image = b64encode(thumbnail).decode() - thumbnail = f"data:image/png;base64,{enc_image}" + thumbnail = f"data:image/{image_format};base64,{enc_image}" return thumbnail async def handle_imageproxy(self, request: web.Request) -> web.Response: @@ -355,17 +366,20 @@ class MetaDataController(CoreController): path = request.query["path"] provider = request.query.get("provider", "url") size = int(request.query.get("size", "0")) + image_format = request.query.get("fmt", "png") if "%" in path: # assume (double) encoded url, decode it path = urllib.parse.unquote(path) with suppress(FileNotFoundError): - image_data = await self.get_thumbnail(path, size=size, provider=provider) + image_data = await self.get_thumbnail( + path, size=size, provider=provider, image_format=image_format + ) # we set the cache header to 1 year (forever) # the client can use the checksum value to refresh when content changes return web.Response( body=image_data, headers={"Cache-Control": "max-age=31536000"}, - content_type="image/png", + content_type=f"image/{image_format}", ) return web.Response(status=404) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index f678a61a..8d46f959 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -25,7 +25,7 @@ from music_assistant.common.models.config_entries import ( ConfigValueOption, ConfigValueType, ) -from music_assistant.common.models.enums import ConfigEntryType, ContentType +from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType from music_assistant.common.models.errors import MediaNotFoundError, QueueEmpty from music_assistant.common.models.media_items import AudioFormat from music_assistant.constants import ( @@ -515,7 +515,7 @@ class StreamsController(CoreController): # feed stdin with pcm audio chunks from origin async def read_audio() -> None: try: - async for chunk in get_media_stream( + async for _, chunk in get_media_stream( self.mass, streamdetails=queue_item.streamdetails, pcm_format=pcm_format, @@ -777,6 +777,8 @@ class StreamsController(CoreController): use_crossfade = self.mass.config.get_raw_player_config_value( queue.queue_id, CONF_CROSSFADE, False ) + if start_queue_item.media_type != MediaType.TRACK: + use_crossfade = False pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2) self.logger.info( "Start Queue Flow stream for Queue %s - crossfade: %s", @@ -814,12 +816,13 @@ class StreamsController(CoreController): ) crossfade_size = int(pcm_sample_size * crossfade_duration) queue_track.streamdetails.seconds_skipped = seek_position - buffer_size = crossfade_size if use_crossfade else int(pcm_sample_size * 2) + buffer_size = int(pcm_sample_size * 2) # 2 seconds + if use_crossfade: + buffer_size += crossfade_size bytes_written = 0 buffer = b"" - chunk_num = 0 # handle incoming audio chunks - async for chunk in get_media_stream( + async for is_last_chunk, chunk in get_media_stream( self.mass, queue_track.streamdetails, pcm_format=pcm_format, @@ -829,23 +832,24 @@ class StreamsController(CoreController): strip_silence_begin=use_crossfade, strip_silence_end=use_crossfade, ): - chunk_num += 1 - - # throttle buffer, do not allow more than 30 seconds in buffer + # throttle buffer, do not allow more than 30 seconds in player's own buffer seconds_buffered = (total_bytes_written + bytes_written) / pcm_sample_size player = self.mass.players.get(queue.queue_id) if seconds_buffered > 60 and player.corrected_elapsed_time > 30: while (seconds_buffered - player.corrected_elapsed_time) > 30: await asyncio.sleep(1) - #### HANDLE FIRST PART OF TRACK + # ALWAYS APPEND CHUNK TO BUFFER + buffer += chunk + if not is_last_chunk and len(buffer) < buffer_size: + # buffer is not full enough, move on + continue - # buffer full for crossfade - if last_fadeout_part and (len(buffer) >= buffer_size): - first_part = buffer + chunk + #### HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK + if not is_last_chunk and last_fadeout_part: # perform crossfade - fadein_part = first_part[:crossfade_size] - remaining_bytes = first_part[crossfade_size:] + fadein_part = buffer[:crossfade_size] + remaining_bytes = buffer[crossfade_size:] crossfade_part = await crossfade_pcm_parts( fadein_part, last_fadeout_part, @@ -855,39 +859,37 @@ class StreamsController(CoreController): # send crossfade_part yield crossfade_part bytes_written += len(crossfade_part) - # also write the leftover bytes from the strip action + # also write the leftover bytes from the crossfade action if remaining_bytes: yield remaining_bytes bytes_written += len(remaining_bytes) - + del remaining_bytes # clear vars last_fadeout_part = b"" buffer = b"" - continue - # enough data in buffer, feed to output - if len(buffer) >= (buffer_size * 2): - yield buffer[:buffer_size] - bytes_written += buffer_size - buffer = buffer[buffer_size:] + chunk - continue + #### HANDLE END OF TRACK + elif is_last_chunk: + if use_crossfade: + # if crossfade is enabled, save fadeout part to pickup for next track + last_fadeout_part = buffer[-crossfade_size:] + remaining_bytes = buffer[:-crossfade_size] + yield remaining_bytes + bytes_written += len(remaining_bytes) + del remaining_bytes + else: + # no crossfade enabled, just yield the (entire) buffer last part + yield buffer + bytes_written += len(buffer) + # clear vars + buffer = b"" - # all other: fill buffer - buffer += chunk - continue - - #### HANDLE END OF TRACK - - if buffer and use_crossfade: - # if crossfade is enabled, save fadeout part to pickup for next track - last_fadeout_part = buffer[-crossfade_size:] - remaining_bytes = buffer[:-crossfade_size] - yield remaining_bytes - bytes_written += len(remaining_bytes) - elif buffer: - # no crossfade enabled, just yield the buffer last part - yield buffer - bytes_written += len(buffer) + #### OTHER: enough data in buffer, feed to output + else: + chunk_size = len(chunk) + yield buffer[:chunk_size] + bytes_written += chunk_size + buffer = buffer[chunk_size:] # update duration details based on the actual pcm data we sent # this also accounts for crossfade and silence stripping diff --git a/music_assistant/server/helpers/audio.py b/music_assistant/server/helpers/audio.py index a072ce7f..1f5ac3ca 100644 --- a/music_assistant/server/helpers/audio.py +++ b/music_assistant/server/helpers/audio.py @@ -395,8 +395,8 @@ async def get_media_stream( # noqa: PLR0915 seek_position: int = 0, fade_in: bool = False, strip_silence_begin: bool = False, - strip_silence_end: bool = True, -) -> AsyncGenerator[bytes, None]: + strip_silence_end: bool = False, +) -> AsyncGenerator[tuple[bool, bytes], None]: """ Get the (raw PCM) audio stream for the given streamdetails. @@ -458,7 +458,7 @@ async def get_media_stream( # noqa: PLR0915 sample_rate=pcm_format.sample_rate, bit_depth=pcm_format.bit_depth, ) - yield stripped_audio + yield (False, stripped_audio) bytes_sent += len(stripped_audio) prev_chunk = b"" del stripped_audio @@ -470,7 +470,7 @@ async def get_media_stream( # noqa: PLR0915 # middle part of the track, send previous chunk and collect current chunk if prev_chunk: - yield prev_chunk + yield (False, prev_chunk) bytes_sent += len(prev_chunk) prev_chunk = chunk @@ -484,11 +484,11 @@ async def get_media_stream( # noqa: PLR0915 bit_depth=pcm_format.bit_depth, reverse=True, ) - yield stripped_audio + yield (True, stripped_audio) bytes_sent += len(stripped_audio) del stripped_audio else: - yield prev_chunk + yield (True, prev_chunk) bytes_sent += len(prev_chunk) del prev_chunk @@ -551,7 +551,7 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo LOGGER.debug("Error while parsing radio URL %s: %s", url, err) result = (url, supports_icy) - await mass.cache.set(cache_key, result) + await mass.cache.set(cache_key, result, expiration=86400) return result diff --git a/music_assistant/server/helpers/images.py b/music_assistant/server/helpers/images.py index 4e54a1a5..ed7ba6e2 100644 --- a/music_assistant/server/helpers/images.py +++ b/music_assistant/server/helpers/images.py @@ -35,7 +35,11 @@ async def get_image_data(mass: MusicAssistant, path_or_url: str, provider: str = async def get_image_thumb( - mass: MusicAssistant, path_or_url: str, size: int | None, provider: str = "url" + mass: MusicAssistant, + path_or_url: str, + size: int | None, + provider: str = "url", + image_format: str = "PNG", ) -> bytes: """Get (optimized) PNG thumbnail from image url.""" img_data = await get_image_data(mass, path_or_url, provider) @@ -45,7 +49,7 @@ async def get_image_thumb( img = Image.open(BytesIO(img_data)) if size: img.thumbnail((size, size), Image.LANCZOS) # pylint: disable=no-member - img.convert("RGB").save(data, "PNG", optimize=True) + img.convert("RGB").save(data, image_format, optimize=True) return data.getvalue() return await asyncio.to_thread(_create_image) diff --git a/music_assistant/server/models/provider.py b/music_assistant/server/models/provider.py index 9ae02459..3ad79e64 100644 --- a/music_assistant/server/models/provider.py +++ b/music_assistant/server/models/provider.py @@ -8,6 +8,9 @@ from typing import TYPE_CHECKING from music_assistant.constants import CONF_LOG_LEVEL, ROOT_LOGGER_NAME if TYPE_CHECKING: + from zeroconf import ServiceStateChange + from zeroconf.asyncio import AsyncServiceInfo + from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.enums import ProviderFeature, ProviderType from music_assistant.common.models.provider import ProviderInstance, ProviderManifest @@ -25,7 +28,7 @@ class Provider: self.manifest = manifest self.config = config mass_logger = logging.getLogger(ROOT_LOGGER_NAME) - self.logger = mass_logger.getChild(f"providers.{self.instance_id}") + self.logger = mass_logger.getChild(f"providers.{self.domain}") log_level = config.get_value(CONF_LOG_LEVEL) if log_level == "GLOBAL": self.logger.setLevel(mass_logger.level) @@ -61,6 +64,11 @@ class Provider: Called when provider is deregistered (e.g. MA exiting or config reloading). """ + async def on_mdns_service_state_change( + self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None + ) -> None: + """Handle MDNS service state callback.""" + @property def type(self) -> ProviderType: """Return type of this provider.""" diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index 23dedfc0..387bdbf4 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -8,16 +8,12 @@ import platform import socket import time from collections.abc import AsyncGenerator +from contextlib import suppress +from dataclasses import dataclass from random import randint, randrange -from typing import TYPE_CHECKING, cast - -from pyatv import connect, interface, scan -from pyatv.conf import AppleTV as ATVConf -from pyatv.const import DeviceModel, DeviceState, PowerState, Protocol -from pyatv.convert import model_str -from pyatv.interface import AppleTV as AppleTVInterface -from pyatv.interface import DeviceListener -from pyatv.protocols.raop import RaopStream +from typing import TYPE_CHECKING + +from zeroconf import ServiceStateChange from zeroconf.asyncio import AsyncServiceInfo from music_assistant.common.helpers.datetime import utc @@ -39,7 +35,7 @@ from music_assistant.common.models.enums import ( from music_assistant.common.models.media_items import AudioFormat from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.player_queue import PlayerQueue -from music_assistant.server.helpers.process import AsyncProcess, check_output +from music_assistant.server.helpers.process import check_output from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: @@ -53,6 +49,7 @@ if TYPE_CHECKING: DOMAIN = "airplay" CONF_LATENCY = "latency" +DEFAULT_LATENCY = 2000 CONF_ENCRYPTION = "encryption" CONF_ALAC_ENCODE = "alac_encode" CONF_VOLUME_START = "volume_start" @@ -64,13 +61,13 @@ PLAYER_CONFIG_ENTRIES = ( ConfigEntry( key=CONF_LATENCY, type=ConfigEntryType.INTEGER, - range=(200, 3000), - default_value=1200, + range=(500, 4000), + default_value=DEFAULT_LATENCY, label="Latency", description="Sets the number of milliseconds of audio buffer in the player. " - "This is important to absorb network throughput jitter. " - "Note that the resume after pause will be skipping that amount of time " - "and volume changes will be delayed by the same amount, when using digital volume.", + "This is important to absorb network throughput jitter. \n" + "Increase this value if you notice network dropouts at the cost of a slower " + "response to commands.", advanced=True, ), ConfigEntry( @@ -91,15 +88,6 @@ PLAYER_CONFIG_ENTRIES = ( "(lossless) ALAC at the cost of a bit CPU.", advanced=True, ), - ConfigEntry( - key=CONF_VOLUME_START, - type=ConfigEntryType.BOOLEAN, - default_value=False, - label="Send volume at playback start", - description="Some players require to send/confirm the volume when playback starts. \n" - "Enable this setting if the playback volume does not match the MA interface.", - advanced=True, - ), ConfigEntry( key=CONF_SYNC_ADJUST, type=ConfigEntryType.INTEGER, @@ -125,6 +113,8 @@ BACKOFF_TIME_LOWER_LIMIT = 15 # seconds BACKOFF_TIME_UPPER_LIMIT = 300 # Five minutes CONF_CREDENTIALS = "credentials" +CACHE_KEY_PREV_VOLUME = "airplay_prev_volume" +FALLBACK_VOLUME = 20 async def setup( @@ -163,88 +153,116 @@ def convert_airplay_volume(value: float) -> int: return int(portion + normal_min) -class AirPlayPlayer(DeviceListener): - """Holds the connection to the apyatv instance and the cliraop.""" +def get_model_from_am(am_property: str | None) -> tuple[str, str]: + """Return Manufacturer and Model name from mdns AM property.""" + manufacturer = "Unknown" + model = "Generic Airplay device" + if not am_property: + return (manufacturer, model) + if isinstance(am_property, bytes): + am_property = am_property.decode("utf-8") + if am_property == "AudioAccessory5,1": + model = "HomePod" + manufacturer = "Apple" + elif "AppleTV" in am_property: + model = "Apple TV" + manufacturer = "Apple" + else: + model = am_property + return (manufacturer, model) + + +class AirplayStreamJob: + """Object that holds the details of a stream job.""" + + def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None: + """Initialize AirplayStreamJob.""" + self.prov = prov + self.mass = prov.mass + self.airplay_player = airplay_player + # always generate a new active remote id to prevent race conditions + # with the named pipe used to send commands + self.active_remote_id: str = str(randint(1000, 8000)) + self.start_ntp: int | None = None # use as checksum + self._log_reader_task: asyncio.Task | None = None + self._cliraop_proc: asyncio.subprocess.Process | None = None - def __init__( - self, mass: MusicAssistant, player_id: str, discovery_info: interface.BaseConfig - ) -> None: - """Initialize power manager.""" - self.player_id = player_id - self.discovery_info = discovery_info - self.mass = mass - self.atv: AppleTVInterface | None = None - self.connected = False - self._connection_attempts = 0 - self._connection_was_lost = False - self._playing: interface.Playing | None = None - self.logger = self.mass.players.logger.getChild("airplay").getChild(self.player_id) - self.cliraop_proc: AsyncProcess | None = None - self.active_remote_id = str(randint(1000, 8000)) - self.optimistic_state: PlayerState = PlayerState.IDLE - - def connection_lost(self, _): - """Device was unexpectedly disconnected. - - This is a callback function from pyatv.interface.DeviceListener. - """ - self.logger.warning('Connection lost to Apple TV "%s"', self.discovery_info.name) - self._connection_was_lost = True - self._handle_disconnect() + @property + def running(self) -> bool: + """Return bool if we're running.""" + return self._cliraop_proc and self._cliraop_proc.returncode is None - def connection_closed(self): - """Device connection was (intentionally) closed. + async def init_cliraop(self, start_ntp: int) -> None: + """Initialize CLIRaop process for a player.""" + self.start_ntp = start_ntp + extra_args = [] + player_id = self.airplay_player.player_id + mass_player = self.mass.players.get(player_id) + latency = self.mass.config.get_raw_player_config_value( + player_id, CONF_LATENCY, DEFAULT_LATENCY + ) + extra_args += ["-l", str(latency)] + if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False): + extra_args += ["-e"] + if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True): + extra_args += ["-a"] + sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0) + if device_password := self.mass.config.get_raw_player_config_value( + player_id, CONF_PASSWORD, None + ): + # NOTE: This may not work as we might need to do + # some fancy hashing with the plain password first?! + extra_args += ["-P", device_password] + if self.airplay_player.logger.level == logging.DEBUG: + extra_args += ["-d", "5"] - This is a callback function from pyatv.interface.DeviceListener. - """ - self.connected = False - self._handle_disconnect() - - def _handle_disconnect(self): - """Handle that the device disconnected and restart connect loop.""" - self.connected = False - if self.atv: - self.atv.close() - self.atv = None - - async def connect(self): - """Connect to device.""" - if self.connected: - return - try: - await self._connect(self.discovery_info) - except Exception: - # retry with scanning for the device - if conf := await self._scan(): - await self._connect(conf) - raise - - async def disconnect(self): - """Disconnect from device.""" - self.logger.debug("Disconnecting from device") - self.is_on = False - self.connected = False - try: - if self.atv: - self.atv.close() - self.atv = None - except Exception: # pylint: disable=broad-except - self.logger.exception("An error occurred while disconnecting") + args = [ + self.prov.cliraop_bin, + "-n", + str(start_ntp), + "-p", + str(self.airplay_player.discovery_info.port), + "-w", + str(2500 - sync_adjust), + "-v", + str(mass_player.volume_level), + *extra_args, + "-dacp", + self.prov.dacp_id, + "-ar", + self.active_remote_id, + "-md", + self.airplay_player.discovery_info.decoded_properties["md"], + "-et", + self.airplay_player.discovery_info.decoded_properties["et"], + str(self.airplay_player.discovery_info.parsed_addresses()[0]), + "-", + ] + if platform.system() == "Darwin": + os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" + self._cliraop_proc = await asyncio.create_subprocess_exec( + *args, + stdin=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + close_fds=True, + ) + self._log_reader_task = asyncio.create_task(self._log_watcher()) async def stop(self): - """Stop playback and cleanup any running CLIRaop Process.""" - if self.cliraop_proc and not self.cliraop_proc.closed: - # prefer interactive command to our streamer - await self.send_cli_command("ACTION=STOP") - await self.cliraop_proc.wait() - self.optimistic_state = PlayerState.IDLE - self.update_attributes() - elif atv := self.atv: - await atv.remote_control.stop() + """Stop playback and cleanup.""" + if not self.running: + return + # prefer interactive command to our streamer + await self.send_cli_command("ACTION=STOP") + # use communicate to clear stdin/stdout and wait for exit + await self._cliraop_proc.wait() + # stop background task + if self._log_reader_task and not self._log_reader_task.done(): + self._log_reader_task.cancel() async def send_cli_command(self, command: str) -> None: """Send an interactive command to the running CLIRaop binary.""" - if not self.cliraop_proc or self.cliraop_proc.closed: + if not self.running: return named_pipe = f"/tmp/fifo-{self.active_remote_id}" # noqa: S108 @@ -255,175 +273,83 @@ class AirPlayPlayer(DeviceListener): with open(named_pipe, "w") as f: f.write(command) - self.logger.debug("sending command %s", command) + self.airplay_player.logger.debug("sending command %s", command) await self.mass.create_task(send_data) - async def _scan(self) -> ATVConf | None: - """Try to find device by scanning for it.""" - address: str = self.discovery_info.address - - self.logger.debug("Discovering device %s", self.discovery_info.name) - atvs = await scan( - self.mass.loop, - identifier=self.discovery_info.identifier, - aiozc=self.mass.aiozc, - hosts=[address], + async def _log_watcher(self) -> None: + """Monitor stderr for the running CLIRaop process.""" + airplay_player = self.airplay_player + mass_player = self.mass.players.get(airplay_player.player_id) + logger = airplay_player.logger + airplay_player.logger.debug("Starting log watcher task...") + async for line in self._cliraop_proc.stderr: + line = line.decode().strip() # noqa: PLW2901 + if not line: + continue + logger.debug(line) + if "set pause" in line: + mass_player.state = PlayerState.PAUSED + self.mass.players.update(airplay_player.player_id) + elif "Restarted at" in line: + mass_player.state = PlayerState.PLAYING + self.mass.players.update(airplay_player.player_id) + elif "after start), played" in line: + millis = int(line.split("played ")[1].split(" ")[0]) + mass_player.elapsed_time = millis / 1000 + mass_player.elapsed_time_last_updated = time.time() + elif "restarting w/o pause" in line: + # streaming has started + mass_player.state = PlayerState.PLAYING + mass_player.elapsed_time = 0 + mass_player.elapsed_time_last_updated = time.time() + self.mass.players.update(airplay_player.player_id) + + # if we reach this point, the process exited + airplay_player.logger.debug("Log watcher task finished...") + mass_player.state = PlayerState.IDLE + self.mass.players.update(airplay_player.player_id) + logger.debug( + "CLIRaop process stopped with errorcode %s", + self._cliraop_proc.returncode, ) - if atvs: - return cast(ATVConf, atvs[0]) - self.logger.debug( - "Failed to find device %s with address %s", - self.discovery_info.name, - address, - ) - return None + async def write_chunk(self, data: bytes) -> None: + """Write a chunk of (pcm) data to the stdin of CLIRaop.""" + if not self.running or not self._cliraop_proc.stdin.can_write_eof(): + return + self._cliraop_proc.stdin.write(data) + if not self.running or not self._cliraop_proc.stdin.can_write_eof(): + return + with suppress(BrokenPipeError): + await self._cliraop_proc.stdin.drain() - async def _connect(self, conf: ATVConf) -> None: - """Connect to device.""" - credentials: dict[int, str | None] = self.mass.config.get_raw_player_config_value( - self.player_id, CONF_CREDENTIALS, {} - ) - name: str = self.discovery_info.name - missing_protocols = [] - for protocol_int, creds in credentials.items(): - protocol = Protocol(int(protocol_int)) - if conf.get_service(protocol) is not None: - conf.set_credentials(protocol, creds) # type: ignore[arg-type] - else: - missing_protocols.append(protocol.name) - - if missing_protocols: - missing_protocols_str = ", ".join(missing_protocols) - self.logger.info( - "Protocol(s) %s not yet found for %s, trying later", - missing_protocols_str, - name, - ) + async def write_eof(self, data: bytes) -> None: + """Write a chunk of (pcm) data to the stdin of CLIRaop.""" + if not self.running or not self._cliraop_proc.stdin.can_write_eof(): + return + self._cliraop_proc.stdin.write_eof() + if not self.running or not self._cliraop_proc.stdin.can_write_eof(): return + with suppress(BrokenPipeError): + await self._cliraop_proc.stdin.drain() - self.logger.debug("Connecting to device %s", name) - session = self.mass.http_session - self.atv = await connect(conf, self.mass.loop, session=session) - self.connected = True - self.atv.power.listener = self - self.atv.listener = self - self.atv.audio.listener = self - if self.atv.features.in_state( - interface.FeatureState.Available, interface.FeatureName.PushUpdates - ): - self.atv.push_updater.listener = self - self.atv.push_updater.start() - self.address_updated(str(conf.address)) +@dataclass +class AirPlayPlayer: + """Holds the details of the (discovered) Airplay (RAOP) player.""" - self._setup_device() - self.update_attributes() - - self._connection_attempts = 0 - if self._connection_was_lost: - self.logger.info( - 'Connection was (re)established to device "%s"', - name, - ) - self._connection_was_lost = False - - def _setup_device(self): - if not (mass_player := self.mass.players.get(self.player_id)): - mass_player = Player( - player_id=self.player_id, - provider=DOMAIN, - type=PlayerType.PLAYER, - name=self.discovery_info.name, - available=True, - powered=False, - device_info=DeviceInfo( - model=self.discovery_info.device_info.raw_model, - manufacturer="Apple", - address=str(self.discovery_info.address), - ), - supported_features=( - PlayerFeature.PAUSE, - PlayerFeature.SYNC, - PlayerFeature.VOLUME_SET, - PlayerFeature.POWER, - ), - max_sample_rate=44100, - supports_24bit=False, - ) - if self.atv: - dev_info = self.atv.device_info - mass_player.device_info = DeviceInfo( - model=( - dev_info.raw_model - if dev_info.model == DeviceModel.Unknown and dev_info.raw_model - else model_str(dev_info.model) - ), - manufacturer="Apple", - address=str(self.discovery_info.address), - ) - self.mass.players.register_or_update(mass_player) - - def playstatus_update(self, _, playstatus: interface.Playing) -> None: - """Inform about changes to what is currently playing.""" - self.logger.debug("Playstatus received: %s", playstatus) - self._playing = playstatus - self.update_attributes() - - def playstatus_error(self, updater, exception: Exception) -> None: - """Inform about an error when updating play status.""" - self.logger.debug("Playstatus error received", exc_info=exception) - self._playing = None - self.update_attributes() - - def powerstate_update(self, old_state: PowerState, new_state: PowerState) -> None: - """Update power state when it changes.""" - self.update_attributes() - - def volume_update(self, old_level: float, new_level: float) -> None: - """Update volume when it changes.""" - self.update_attributes() - - def update_attributes(self) -> None: - """Update the player attributes.""" - mass_player = self.mass.players.get(self.player_id) - mass_player.volume_level = int(self.atv.audio.volume) - mass_player.powered = self.connected or self.cliraop_proc and not self.cliraop_proc.closed - if self.cliraop_proc and not self.cliraop_proc.closed: - mass_player.state = self.optimistic_state - # NOTE: alapsed time is pushed from cliraop - elif self.atv is None or not self.connected: - mass_player.powered = False - mass_player.state = PlayerState.IDLE - elif self._playing: - state = self._playing.device_state - if state in (DeviceState.Idle, DeviceState.Loading): - mass_player.state = PlayerState.IDLE - elif state == DeviceState.Playing: - mass_player.state = PlayerState.PLAYING - elif state in (DeviceState.Paused, DeviceState.Seeking, DeviceState.Stopped): - mass_player.state = PlayerState.PAUSED - else: - mass_player.state = PlayerState.IDLE - mass_player.elapsed_time = self._playing.position or 0 - mass_player.elapsed_time_last_updated = time.time() - mass_player.current_item_id = self._playing.content_identifier - else: - mass_player.state = PlayerState.IDLE - self.mass.players.update(self.player_id) - - def address_updated(self, address): - """Update cached address in config entry.""" - self.logger.debug("Changing address to %s", address) - self._setup_device() + player_id: str + discovery_info: AsyncServiceInfo + logger: logging.Logger + active_stream: AirplayStreamJob | None = None class AirplayProvider(PlayerProvider): """Player provider for Airplay based players.""" - _atv_players: dict[str, AirPlayPlayer] + cliraop_bin: str | None = None + _players: dict[str, AirPlayPlayer] _discovery_running: bool = False - _cliraop_bin: str | None = None _stream_tasks: dict[str, asyncio.Task] _dacp_server: asyncio.Server = None _dacp_info: AsyncServiceInfo = None @@ -435,12 +361,10 @@ class AirplayProvider(PlayerProvider): async def handle_async_init(self) -> None: """Handle async initialization of the provider.""" - self._atv_players = {} + self._players = {} self._stream_tasks = {} - self._cliraop_bin = await self.get_cliraop_binary() + self.cliraop_bin = await self._getcliraop_binary() dacp_port = await select_free_port(39831, 49831) - # the pyatv logger is way to noisy, silence it a bit - logging.getLogger("pyatv").setLevel(self.logger.level + 10) self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}" self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port) self._dacp_server = await asyncio.start_server( @@ -463,14 +387,48 @@ class AirplayProvider(PlayerProvider): ) await self.mass.aiozc.async_register_service(self._dacp_info) - async def loaded_in_mass(self) -> None: - """Call after the provider has been loaded.""" - await self._run_discovery() + async def on_mdns_service_state_change( + self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None + ) -> None: + """Handle MDNS service state callback.""" + raw_id, display_name = name.split(".")[0].split("@", 1) + player_id = f"ap{raw_id.lower()}" + # handle removed player + if state_change == ServiceStateChange.Removed: + self.logger.debug("Airplay device %s removed", name) + if mass_player := self.mass.players.get(player_id): + # the player has become unavailable + self.logger.info("Player removed %s", display_name) + mass_player.available = False + self.mass.players.update(player_id) + return + # handle update for existing device + if airplay_player := self._players.get(player_id): + if mass_player := self.mass.players.get(player_id): + cur_address = info.parsed_addresses()[0] + prev_address = airplay_player.discovery_info.parsed_addresses()[0] + if cur_address != prev_address: + airplay_player.logger.info( + "Address updated from %s to %s", prev_address, cur_address + ) + mass_player.device_info = DeviceInfo( + model=mass_player.device_info.model, + manufacturer=mass_player.device_info.manufacturer, + address=str(cur_address), + ) + if not mass_player.available: + mass_player.available = True + # always update the latest discovery info + airplay_player.discovery_info = info + self.mass.players.update(player_id) + return + # handle new player + await self._setup_player(player_id, display_name, info) async def unload(self) -> None: """Handle close/cleanup of the provider.""" - # power off all players (will disconnct and close cliraop) - for player_id in self._atv_players: + # power off all players (will disconnect and close cliraop) + for player_id in self._players: await self.cmd_power(player_id, False) # shutdown DACP server if self._dacp_server: @@ -479,90 +437,6 @@ class AirplayProvider(PlayerProvider): if self._dacp_info: await self.mass.aiozc.async_unregister_service(self._dacp_info) - async def _handle_dacp_request( # noqa: PLR0915 - self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ) -> None: - """Handle new connection on the socket.""" - try: - raw_request = b"" - while recv := await reader.read(1024): - raw_request += recv - if len(recv) < 1024: - break - - request = raw_request.decode("UTF-8") - headers_raw, body = request.split("\r\n\r\n", 1) - headers_raw = headers_raw.split("\r\n") - headers = {} - for line in headers_raw[1:]: - x, y = line.split(":", 1) - headers[x.strip()] = y.strip() - active_remote = headers.get("Active-Remote") - _, path, _ = headers_raw[0].split(" ") - atv_player = next( - (x for x in self._atv_players.values() if x.active_remote_id == active_remote), None - ) - self.logger.debug( - "DACP request for %s (%s): %s -- %s", - atv_player.discovery_info.name if atv_player else "UNKNOWN PLAYER", - active_remote, - path, - body, - ) - if not atv_player: - return - - player_id = atv_player.player_id - if path == "/ctrl-int/1/nextitem": - self.mass.create_task(self.mass.player_queues.next(player_id)) - elif path == "/ctrl-int/1/previtem": - self.mass.create_task(self.mass.player_queues.previous(player_id)) - elif path == "/ctrl-int/1/play": - self.mass.create_task(self.mass.player_queues.play(player_id)) - elif path == "/ctrl-int/1/playpause": - self.mass.create_task(self.mass.player_queues.play_pause(player_id)) - elif path == "/ctrl-int/1/stop": - self.mass.create_task(self.cmd_stop(player_id)) - elif path == "/ctrl-int/1/volumeup": - self.mass.create_task(self.mass.players.cmd_volume_up(player_id)) - elif path == "/ctrl-int/1/volumedown": - self.mass.create_task(self.mass.players.cmd_volume_down(player_id)) - elif path == "/ctrl-int/1/shuffle_songs": - queue = self.mass.player_queues.get(player_id) - self.mass.create_task( - self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled) - ) - elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"): - self.mass.create_task(self.mass.player_queues.pause(player_id)) - elif "dmcp.device-volume=" in path: - raop_volume = float(path.split("dmcp.device-volume=", 1)[-1]) - volume = convert_airplay_volume(raop_volume) - if abs(volume - int(atv_player.atv.audio.volume)) > 2: - self.mass.create_task(self.cmd_volume_set(player_id, volume)) - elif "dmcp.volume=" in path: - volume = int(path.split("dmcp.volume=", 1)[-1]) - if abs(volume - int(atv_player.atv.audio.volume)) > 2: - self.mass.create_task(self.cmd_volume_set(player_id, volume)) - else: - self.logger.debug( - "Unknown DACP request for %s: %s", - atv_player.discovery_info.name, - path, - ) - - # send response - date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S") - response = ( - f"HTTP/1.0 204 No Content\r\nDate: {date_str} " - "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: " - "application/x-dmap-tagged\r\nContent-Length: 0\r\n" - "Connection: close\r\n\r\n" - ) - writer.write(response.encode()) - await writer.drain() - finally: - writer.close() - async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: """Return all (provider/player specific) Config Entries for the given player (if any).""" entries = await super().get_player_config_entries(player_id) @@ -573,14 +447,18 @@ class AirplayProvider(PlayerProvider): - player_id: player_id of the player to handle the command. """ - if stream_task := self._stream_tasks.pop(player_id, None): - if not stream_task.done(): - stream_task.cancel() + + async def stop_player(airplay_player: AirPlayPlayer) -> None: + if airplay_player.active_stream: + await airplay_player.active_stream.stop() + mass_player = self.mass.players.get(airplay_player.player_id) + mass_player.state = PlayerState.IDLE + self.mass.players.update(airplay_player.player_id) # forward command to player and any connected sync members async with asyncio.TaskGroup() as tg: - for atv_player in self._get_sync_clients(player_id): - tg.create_task(atv_player.stop()) + for airplay_player in self._get_sync_clients(player_id): + tg.create_task(stop_player(airplay_player)) async def cmd_play(self, player_id: str) -> None: """Send PLAY (unpause) command to given player. @@ -589,14 +467,10 @@ class AirplayProvider(PlayerProvider): """ # forward command to player and any connected sync members async with asyncio.TaskGroup() as tg: - for atv_player in self._get_sync_clients(player_id): - if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed: + for airplay_player in self._get_sync_clients(player_id): + if airplay_player.active_stream and airplay_player.active_stream.running: # prefer interactive command to our streamer - tg.create_task(atv_player.send_cli_command("ACTION=PLAY")) - atv_player.optimistic_state = PlayerState.PLAYING - atv_player.update_attributes() - elif atv := atv_player.atv: - tg.create_task(atv.remote_control.play()) + tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PLAY")) async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player. @@ -605,14 +479,10 @@ class AirplayProvider(PlayerProvider): """ # forward command to player and any connected sync members async with asyncio.TaskGroup() as tg: - for atv_player in self._get_sync_clients(player_id): - if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed: + for airplay_player in self._get_sync_clients(player_id): + if airplay_player.active_stream and airplay_player.active_stream.running: # prefer interactive command to our streamer - tg.create_task(atv_player.send_cli_command("ACTION=PAUSE")) - atv_player.optimistic_state = PlayerState.PAUSED - atv_player.update_attributes() - elif atv := atv_player.atv: - tg.create_task(atv.remote_control.pause()) + tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PAUSE")) async def play_media( self, @@ -631,10 +501,8 @@ class AirplayProvider(PlayerProvider): - seek_position: Optional seek to this position. - fade_in: Optionally fade in the item at playback start. """ - # stop existing streams first + # always stop existing stream first await self.cmd_stop(player_id) - # power on player if needed - await self.cmd_power(player_id, True) # start streaming the queue (pcm) audio in a background task queue = self.mass.player_queues.get_active_queue(player_id) self._stream_tasks[player_id] = asyncio.create_task( @@ -661,10 +529,8 @@ class AirplayProvider(PlayerProvider): This is a special feature from the Universal Group provider. """ - # stop existing streams first + # always stop existing stream first await self.cmd_stop(player_id) - # power on player if needed - await self.cmd_power(player_id, True) if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100: # TODO: resample on the fly here ? raise RuntimeError("Unsupported PCM format") @@ -686,83 +552,70 @@ class AirplayProvider(PlayerProvider): if player.synced_to: # should not happen, but just in case raise RuntimeError("Player is synced") - player.elapsed_time = 0 - player.elapsed_time_last_updated = time.time() - player.state = PlayerState.PLAYING - self.mass.players.update(player_id) - # NOTE: Although the pyatv library is perfectly capable of playback - # to not only raop targets but also airplay 1 + 2, its not suitable - # for synced playback to multiple clients at once. - # Also the performance is horrible. Python is not suitable for realtime - # audio streaming. - # So, I've decided to a combined route here. I've created a small binary + + # Python is not suitable for realtime audio streaming. + # So, I've decided to go the fancy route here. I've created a small binary # written in C based on libraop to do the actual timestamped playback. # the raw pcm audio is fed to the stdin of this cliraop binary and we can # send some commands over a named pipe. # get current ntp before we start - _, stdout = await check_output(f"{self._cliraop_bin} -ntp") - ntp = int(stdout.strip()) + _, stdout = await check_output(f"{self.cliraop_bin} -ntp") + start_ntp = int(stdout.strip()) # setup Raop process for player and its sync childs - async with asyncio.TaskGroup() as tg: - for atv_player in self._get_sync_clients(player_id): - if not atv_player.atv: - # should not be possible, but just in case... - await atv_player.connect() - tg.create_task(self._init_cliraop(atv_player, ntp)) + for airplay_player in self._get_sync_clients(player_id): + # make sure that existing stream is stopped + if airplay_player.active_stream: + await airplay_player.active_stream.stop() + airplay_player.active_stream = AirplayStreamJob(self, airplay_player) + await airplay_player.active_stream.init_cliraop(start_ntp) prev_metadata_checksum: str = "" - try: - async for pcm_chunk in audio_iterator: - # send metadata to player(s) if needed - # NOTE: this must all be done in separate tasks to not disturb audio - if queue and queue.current_item and queue.current_item.streamdetails: - metadata_checksum = ( - queue.current_item.streamdetails.stream_title - or queue.current_item.queue_item_id + async for pcm_chunk in audio_iterator: + # send audio chunk to player(s) + available_clients = 0 + async with asyncio.TaskGroup() as tg: + for airplay_player in self._get_sync_clients(player_id): + if ( + not airplay_player.active_stream + or not airplay_player.active_stream.running + or airplay_player.active_stream.start_ntp != start_ntp + ): + # catch when this stream is no longer active on the player + continue + available_clients += 1 + tg.create_task(airplay_player.active_stream.write_chunk(pcm_chunk)) + # always send the progress + tg.create_task( + airplay_player.active_stream.send_cli_command( + f"PROGRESS={int(queue.elapsed_time)}\n" + ) ) - if prev_metadata_checksum != metadata_checksum: - prev_metadata_checksum = metadata_checksum - self.mass.create_task(self._send_metadata(player_id)) - - async with asyncio.TaskGroup() as tg: - # send progress metadata - if queue.elapsed_time: - for atv_player in self._get_sync_clients(player_id): - tg.create_task( - atv_player.send_cli_command(f"PROGRESS={int(queue.elapsed_time)}\n") - ) - # send audio chunk to player(s) - available_clients = 0 - for atv_player in self._get_sync_clients(player_id): - if not atv_player.cliraop_proc or atv_player.cliraop_proc.closed: - # this may not happen, but just in case - continue - available_clients += 1 - tg.create_task(atv_player.cliraop_proc.write(pcm_chunk)) - if not available_clients: - return - finally: - self.logger.debug("Streaming ended for player %s", player.display_name) - for atv_player in self._get_sync_clients(player_id): - if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed: - atv_player.cliraop_proc.write_eof() - - async def cmd_power(self, player_id: str, powered: bool) -> None: - """Send POWER command to given player. + if not available_clients: + # this streamjob is no longer active + return - - player_id: player_id of the player to handle the command. - - powered: bool if player should be powered on or off. - """ - atv_player = self._atv_players[player_id] - mass_player = self.mass.players.get(player_id) - if powered: - await atv_player.connect() - elif not powered: - await self.cmd_stop(player_id) - await atv_player.disconnect() - mass_player.powered = powered - self.mass.players.update(player_id) + # send metadata to player(s) if needed + # NOTE: this must all be done in separate tasks to not disturb audio + if queue and queue.current_item and queue.current_item.streamdetails: + metadata_checksum = ( + queue.current_item.streamdetails.stream_title + or queue.current_item.queue_item_id + ) + if prev_metadata_checksum != metadata_checksum: + prev_metadata_checksum = metadata_checksum + self.mass.create_task(self._send_metadata(player_id, queue)) + + # end of stream reached - write eof + for airplay_player in self._get_sync_clients(player_id): + if ( + not airplay_player.active_stream + or not airplay_player.active_stream.running + or airplay_player.active_stream.start_ntp != start_ntp + ): + # this may not happen, but guard just in case + continue + await airplay_player.active_stream.write_eof() async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. @@ -770,12 +623,14 @@ class AirplayProvider(PlayerProvider): - player_id: player_id of the player to handle the command. - volume_level: volume level (0..100) to set on the player. """ - atv_player = self._atv_players[player_id] - if atv_player.cliraop_proc: - # prefer interactive command to our streamer - await atv_player.send_cli_command(f"VOLUME={volume_level}\n") - if atv := atv_player.atv: - await atv.audio.set_volume(volume_level) + airplay_player = self._players[player_id] + if airplay_player.active_stream: + await airplay_player.active_stream.send_cli_command(f"VOLUME={volume_level}\n") + mass_player = self.mass.players.get(player_id) + mass_player.volume_level = volume_level + self.mass.players.update(player_id) + # store last state in cache + await self.mass.cache.set(f"{CACHE_KEY_PREV_VOLUME}.{player_id}", volume_level) async def cmd_sync(self, player_id: str, target_player: str) -> None: """Handle SYNC command for given player. @@ -793,7 +648,7 @@ class AirplayProvider(PlayerProvider): group_leader.group_childs.add(player_id) self.mass.players.update(target_player) if group_leader.powered: - await self.cmd_power(player_id, True) + await self.mass.players.cmd_power(player_id, True) active_queue = self.mass.player_queues.get_active_queue(group_leader.player_id) if active_queue.state == PlayerState.PLAYING: self.mass.create_task(self.mass.player_queues.resume(active_queue.queue_id)) @@ -814,67 +669,11 @@ class AirplayProvider(PlayerProvider): await self.cmd_stop(player_id) self.mass.players.update(player_id) - async def _run_discovery(self) -> None: - """Discover Airplay players on the network.""" - if self._discovery_running: - return - try: - self._discovery_running = True - self.logger.debug("Airplay discovery started...") - discovered_devices = await scan(self.mass.loop, protocol=Protocol.RAOP, timeout=30) - - if not discovered_devices: - self.logger.debug("No devices found") - return - - for dev in discovered_devices: - self.mass.create_task(self._player_discovered(dev)) - - finally: - self._discovery_running = False - - def reschedule(): - self.mass.create_task(self._run_discovery()) - - # reschedule self once finished - self.mass.loop.call_later(300, reschedule) - - async def _player_discovered(self, discovery_info: interface.BaseConfig) -> None: - """Handle discovered Airplay player on mdns.""" - player_id = f"ap{discovery_info.identifier.lower().replace(':', '')}" - if player_id in self._atv_players: - atv_player = self._atv_players[player_id] - if discovery_info.address != atv_player.discovery_info.address: - atv_player.address_updated(discovery_info.address) - return - if "_raop._tcp.local" not in discovery_info.properties: - # skip players without raop - return - self.logger.debug( - "Discovered Airplay device %s on %s", discovery_info.name, discovery_info.address - ) - self._atv_players[player_id] = atv_player = AirPlayPlayer( - self.mass, player_id, discovery_info - ) - atv_player._setup_device() - for player in self.players: - player.can_sync_with = tuple(x for x in self._atv_players if x != player.player_id) - self.mass.players.update(player.player_id) - - def _is_feature_available( - self, atv_player: interface.AppleTV, feature: interface.FeatureName - ) -> bool: - """Return if a feature is available.""" - mass_player = self.mass.players.get(atv_player.device_info.output_device_id) - if atv_player and mass_player.state == PlayerState.PLAYING: - return atv_player.features.in_state(interface.FeatureState.Available, feature) - return False - - async def get_cliraop_binary(self): + async def _getcliraop_binary(self): """Find the correct raop/airplay binary belonging to the platform.""" # ruff: noqa: SIM102 - if self._cliraop_bin is not None: - return self._cliraop_bin + if self.cliraop_bin is not None: + return self.cliraop_bin async def check_binary(cliraop_path: str) -> str | None: try: @@ -886,7 +685,7 @@ class AirplayProvider(PlayerProvider): stdout, _ = await cliraop.communicate() stdout = stdout.strip().decode() if cliraop.returncode == 0 and stdout == "cliraop check": - self._cliraop_bin = cliraop_path + self.cliraop_bin = cliraop_path return cliraop_path except OSError: return None @@ -911,112 +710,162 @@ class AirplayProvider(PlayerProvider): group_child_ids = {player_id} group_child_ids.update(mass_player.group_childs) for child_id in group_child_ids: - if client := self._atv_players.get(child_id): + if client := self._players.get(child_id): sync_clients.append(client) return sync_clients - async def _init_cliraop(self, atv_player: AirPlayPlayer, ntp: int) -> None: # noqa: PLR0915 - """Initiatlize CLIRaop process for a player.""" - stream: RaopStream | None = next( - (x for x in atv_player.atv.stream.instances if isinstance(x, RaopStream)), None + async def _setup_player( + self, player_id: str, display_name: str, info: AsyncServiceInfo + ) -> None: + """Handle setup of a new player that is discovered using mdns.""" + address = info.parsed_addresses()[0] + # some guards if our info is valid/complete + if address == "127.0.0.1": + return + if "md" not in info.decoded_properties: + return + if "et" not in info.decoded_properties: + return + self.logger.debug("Discovered Airplay device %s on %s", display_name, address) + self._players[player_id] = AirPlayPlayer( + player_id, discovery_info=info, logger=self.logger.getChild(player_id) ) - if stream is None: - raise RuntimeError("RAOP Not available") - - async def log_watcher(cliraop_proc: AsyncProcess) -> None: - """Monitor stderr for a running CLIRaop process.""" - mass_player = self.mass.players.get(atv_player.player_id) - logger = self.logger.getChild(atv_player.player_id) - async for line in cliraop_proc._proc.stderr: - line = line.decode().strip() # noqa: PLW2901 - if not line: - continue - if "set pause" in line: - atv_player.optimistic_state = PlayerState.PAUSED - atv_player.update_attributes() - logger.info(line) - elif "Restarted at" in line: - atv_player.optimistic_state = PlayerState.PLAYING - atv_player.update_attributes() - logger.info(line) - elif "after start), played" in line: - millis = int(line.split("played ")[1].split(" ")[0]) - mass_player.elapsed_time = millis / 1000 - mass_player.elapsed_time_last_updated = time.time() - else: - logger.debug(line) - # if we reach this point, the process exited - if cliraop_proc._proc.returncode is not None: - cliraop_proc.closed = True - atv_player.optimistic_state = PlayerState.IDLE - atv_player.update_attributes() - logger.debug( - "CLIRaop process stopped with errorcode %s", - cliraop_proc._proc.returncode, + manufacturer, model = get_model_from_am(info.decoded_properties.get("am")) + if "apple tv" in model.lower(): + # For now, we ignore the Apple TV until we implement the authentication. + # maybe we can simply use pyatv only for this part? + # the cliraop application has already been prepared to accept the secret. + self.logger.debug( + "Ignoring %s in discovery due to authentication requirement.", display_name ) - - extra_args = [] - latency = self.mass.config.get_raw_player_config_value( - atv_player.player_id, CONF_LATENCY, 1200 - ) - extra_args += ["-l", str(latency)] - if self.mass.config.get_raw_player_config_value( - atv_player.player_id, CONF_ENCRYPTION, False - ): - extra_args += ["-u"] - if self.mass.config.get_raw_player_config_value( - atv_player.player_id, CONF_ALAC_ENCODE, True - ): - extra_args += ["-a"] - if self.mass.config.get_raw_player_config_value( - atv_player.player_id, CONF_VOLUME_START, False - ): - extra_args += ["-v", str(int(atv_player.atv.audio.volume))] - sync_adjust = self.mass.config.get_raw_player_config_value( - atv_player.player_id, CONF_SYNC_ADJUST, 0 + return + if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True): + self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name) + return + if not (volume := await self.mass.cache.get(f"{CACHE_KEY_PREV_VOLUME}.{player_id}")): + volume = FALLBACK_VOLUME + mass_player = Player( + player_id=player_id, + provider=self.instance_id, + type=PlayerType.PLAYER, + name=display_name, + available=True, + powered=False, + device_info=DeviceInfo( + model=model, + manufacturer=manufacturer, + address=address, + ), + supported_features=( + PlayerFeature.PAUSE, + PlayerFeature.SYNC, + PlayerFeature.VOLUME_SET, + ), + max_sample_rate=44100, + supports_24bit=False, + can_sync_with=tuple(x for x in self._players if x != player_id), + volume_level=volume, ) - if device_password := self.mass.config.get_raw_player_config_value( - atv_player.player_id, CONF_PASSWORD, None - ): - extra_args += ["-P", device_password] - if self.logger.level == logging.DEBUG: - extra_args += ["-d", "5"] + self.mass.players.register_or_update(mass_player) + # update can_sync_with field of all other players + for player in self.players: + if player.player_id == player_id: + continue + player.can_sync_with = tuple(x for x in self._players if x != player.player_id) + self.mass.players.update(player.player_id) - atv_player.optimistic_state = PlayerState.PLAYING - # always generate a new active remote id to prevent race conditions - # with the named pipe used to send commands - atv_player.active_remote_id = str(randint(1000, 8000)) - args = [ - self._cliraop_bin, - "-n", - str(ntp), - "-p", - str(stream.core.service.port), - "-w", - str(2000 + sync_adjust), - *extra_args, - "-dacp", - self.dacp_id, - "-ar", - atv_player.active_remote_id, - "-md", - atv_player.discovery_info.properties["_raop._tcp.local"]["md"], - "-et", - atv_player.discovery_info.properties["_raop._tcp.local"]["et"], - str(atv_player.discovery_info.address), - "-", - ] - if platform.system() == "Darwin": - os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib" - atv_player.cliraop_proc = AsyncProcess( - args, enable_stdin=True, enable_stdout=False, enable_stderr=True - ) - await atv_player.cliraop_proc.start() - atv_player.cliraop_proc.attach_task(log_watcher(atv_player.cliraop_proc)) + async def _handle_dacp_request( # noqa: PLR0915 + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ) -> None: + """Handle new connection on the socket.""" + try: + raw_request = b"" + while recv := await reader.read(1024): + raw_request += recv + if len(recv) < 1024: + break + + request = raw_request.decode("UTF-8") + headers_raw, body = request.split("\r\n\r\n", 1) + headers_raw = headers_raw.split("\r\n") + headers = {} + for line in headers_raw[1:]: + x, y = line.split(":", 1) + headers[x.strip()] = y.strip() + active_remote = headers.get("Active-Remote") + _, path, _ = headers_raw[0].split(" ") + airplay_player = next( + ( + x + for x in self._players.values() + if x.active_stream and x.active_stream.active_remote_id == active_remote + ), + None, + ) + self.logger.debug( + "DACP request for %s (%s): %s -- %s", + airplay_player.discovery_info.name if airplay_player else "UNKNOWN PLAYER", + active_remote, + path, + body, + ) + if not airplay_player: + return + + player_id = airplay_player.player_id + mass_player = self.mass.players.get(player_id) + if path == "/ctrl-int/1/nextitem": + self.mass.create_task(self.mass.player_queues.next(player_id)) + elif path == "/ctrl-int/1/previtem": + self.mass.create_task(self.mass.player_queues.previous(player_id)) + elif path == "/ctrl-int/1/play": + self.mass.create_task(self.mass.player_queues.play(player_id)) + elif path == "/ctrl-int/1/playpause": + self.mass.create_task(self.mass.player_queues.play_pause(player_id)) + elif path == "/ctrl-int/1/stop": + self.mass.create_task(self.cmd_stop(player_id)) + elif path == "/ctrl-int/1/volumeup": + self.mass.create_task(self.mass.players.cmd_volume_up(player_id)) + elif path == "/ctrl-int/1/volumedown": + self.mass.create_task(self.mass.players.cmd_volume_down(player_id)) + elif path == "/ctrl-int/1/shuffle_songs": + queue = self.mass.player_queues.get(player_id) + self.mass.create_task( + self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled) + ) + elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"): + self.mass.create_task(self.mass.player_queues.pause(player_id)) + elif "dmcp.device-volume=" in path: + raop_volume = float(path.split("dmcp.device-volume=", 1)[-1]) + volume = convert_airplay_volume(raop_volume) + if abs(volume - mass_player.volume_level) > 2: + self.mass.create_task(self.cmd_volume_set(player_id, volume)) + elif "dmcp.volume=" in path: + volume = int(path.split("dmcp.volume=", 1)[-1]) + if abs(volume - mass_player.volume_level) > 2: + self.mass.create_task(self.cmd_volume_set(player_id, volume)) + else: + self.logger.debug( + "Unknown DACP request for %s: %s", + airplay_player.discovery_info.name, + path, + ) + + # send response + date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S") + response = ( + f"HTTP/1.0 204 No Content\r\nDate: {date_str} " + "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: " + "application/x-dmap-tagged\r\nContent-Length: 0\r\n" + "Connection: close\r\n\r\n" + ) + writer.write(response.encode()) + await writer.drain() + finally: + writer.close() - async def _send_metadata(self, player_id: str) -> None: + async def _send_metadata(self, player_id: str, queue: PlayerQueue) -> None: """Send metadata to player (and connected sync childs).""" - queue = self.mass.player_queues.get_active_queue(player_id) if not queue or not queue.current_item: return duration = min(queue.current_item.duration or 0, 3600) @@ -1041,15 +890,20 @@ class AirplayProvider(PlayerProvider): cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n" cmd += f"DURATION={duration}\nACTION=SENDMETA\n" - for atv_player in self._get_sync_clients(player_id): - await atv_player.send_cli_command(cmd) + for airplay_player in self._get_sync_clients(player_id): + if not airplay_player.active_stream: + continue + await airplay_player.active_stream.send_cli_command(cmd) # get image if not queue.current_item.image: return + # the image format needs to be 500x500 jpeg for maximum compatibility with players image_url = self.mass.metadata.get_image_url( - queue.current_item.image, size=512, prefer_proxy=True + queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg" ) - for atv_player in self._get_sync_clients(player_id): - await atv_player.send_cli_command(f"ARTWORK={image_url}\n") + for airplay_player in self._get_sync_clients(player_id): + if not airplay_player.active_stream: + continue + await airplay_player.active_stream.send_cli_command(f"ARTWORK={image_url}\n") diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 index e2713b51..f80b3deb 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 index cf1ff1d9..de944030 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ diff --git a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 index b955a34d..f66534a5 100755 Binary files a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ diff --git a/music_assistant/server/providers/airplay/manifest.json b/music_assistant/server/providers/airplay/manifest.json index 9e5fb135..b2c8f183 100644 --- a/music_assistant/server/providers/airplay/manifest.json +++ b/music_assistant/server/providers/airplay/manifest.json @@ -3,11 +3,16 @@ "domain": "airplay", "name": "Airplay", "description": "Support for players that support the Airplay protocol.", - "codeowners": ["@music-assistant"], + "codeowners": [ + "@music-assistant" + ], "requirements": [], "documentation": "https://music-assistant.github.io/player-support/airplay/", "multi_instance": false, "builtin": false, "load_by_default": true, - "icon": "cast-variant" + "icon": "cast-variant", + "mdns_discovery": [ + "_raop._tcp.local." + ] } diff --git a/music_assistant/server/providers/chromecast/manifest.json b/music_assistant/server/providers/chromecast/manifest.json index 2540b8d7..09dbdb6c 100644 --- a/music_assistant/server/providers/chromecast/manifest.json +++ b/music_assistant/server/providers/chromecast/manifest.json @@ -3,11 +3,18 @@ "domain": "chromecast", "name": "Chromecast", "description": "Support for Chromecast based players.", - "codeowners": ["@music-assistant"], - "requirements": ["PyChromecast==13.1.0"], + "codeowners": [ + "@music-assistant" + ], + "requirements": [ + "PyChromecast==13.1.0" + ], "documentation": "https://music-assistant.github.io/player-support/google-cast/", "multi_instance": false, "builtin": false, "load_by_default": true, - "icon": "cast" + "icon": "cast", + "mdns_discovery": [ + "_googlecast._tcp.local." + ] } diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index 48300dad..7be62ed3 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -78,7 +78,6 @@ class MusicAssistant: loop: asyncio.AbstractEventLoop http_session: ClientSession aiozc: AsyncZeroconf - aiobrowser: AsyncServiceBrowser config: ConfigController webserver: WebserverController cache: CacheController @@ -87,6 +86,7 @@ class MusicAssistant: players: PlayerController player_queues: PlayerQueuesController streams: StreamsController + _aiobrowser: AsyncServiceBrowser def __init__(self, storage_path: str) -> None: """Initialize the MusicAssistant Server.""" @@ -109,11 +109,6 @@ class MusicAssistant: # create shared zeroconf instance # TODO: enumerate interfaces and enable IPv6 support self.aiozc = AsyncZeroconf(ip_version=IPVersion.V4Only) - # self.aiobrowser = AsyncServiceBrowser( - # self.aiozc.zeroconf, - # [], - # handlers=[self._on_mdns_service_state_change], - # ) # create shared aiohttp ClientSession self.http_session = ClientSession( loop=self.loop, @@ -154,10 +149,10 @@ class MusicAssistant: await self.streams.setup(await self.config.get_core_config("streams")) # register all api commands (methods with decorator) self._register_api_commands() - # setup discovery - self.create_task(self._setup_discovery()) # load providers await self._load_providers() + # setup discovery + self.create_task(self._setup_discovery()) async def stop(self) -> None: """Stop running the music assistant server.""" @@ -454,6 +449,10 @@ class MusicAssistant: async def unload_provider(self, instance_id: str) -> None: """Unload a provider.""" if provider := self._providers.get(instance_id): + # remove mdns discovery if needed + if provider.manifest.mdns_discovery: + for mdns_type in provider.manifest.mdns_discovery: + self.aiobrowser.types.discard(mdns_type) # make sure to stop any running sync tasks first for sync_task in self.music.in_progress_syncs: if sync_task.provider_instance == instance_id: @@ -573,10 +572,20 @@ class MusicAssistant: tg.create_task(load_provider_manifest(dir_str, dir_path)) async def _setup_discovery(self) -> None: - """Make this Music Assistant instance discoverable on the network.""" + """Handle setup of MDNS discovery.""" + # create a global mdns browser + all_types: set[str] = set() + for prov_manifest in self._provider_manifests.values(): + if prov_manifest.mdns_discovery: + all_types.update(prov_manifest.mdns_discovery) + self._aiobrowser = AsyncServiceBrowser( + self.aiozc.zeroconf, + list(all_types), + handlers=[self._on_mdns_service_state_change], + ) + # register MA itself on mdns to be discovered zeroconf_type = "_mass._tcp.local." server_id = self.server_id - # register MA on mdns to be discovered LOGGER.debug("Starting Zeroconf broadcast...") info = AsyncServiceInfo( zeroconf_type, @@ -607,6 +616,21 @@ class MusicAssistant: ) -> None: """Handle MDNS service state callback.""" + async def process_mdns_state_change(prov: ProviderInstanceType): + if state_change == ServiceStateChange.Removed: + info = None + else: + info = AsyncServiceInfo(service_type, name) + await info.async_request(zeroconf, 3000) + await prov.on_mdns_service_state_change(name, state_change, info) + + LOGGER.debug(f"Service {name} of type {service_type} state changed: {state_change}") + for prov in self._providers.values(): + if not prov.manifest.mdns_discovery: + continue + if service_type in prov.manifest.mdns_discovery: + self.create_task(process_mdns_state_change(prov)) + async def __aenter__(self) -> Self: """Return Context manager.""" await self.start()