More improvements for the Airplay provider (#1100)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Tue, 20 Feb 2024 20:50:37 +0000 (21:50 +0100)
committerGitHub <noreply@github.com>
Tue, 20 Feb 2024 20:50:37 +0000 (21:50 +0100)
13 files changed:
music_assistant/common/models/provider.py
music_assistant/server/controllers/metadata.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/images.py
music_assistant/server/models/provider.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64
music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64
music_assistant/server/providers/airplay/bin/cliraop-macos-arm64
music_assistant/server/providers/airplay/manifest.json
music_assistant/server/providers/chromecast/manifest.json
music_assistant/server/server.py

index 114a09d55ee0481743720edd29450e20a0b12c32..404052a3b0c867bb60123b9df90cf4f9bafd58b6 100644 (file)
@@ -48,6 +48,8 @@ class ProviderManifest(DataClassORJSONMixin):
     # if this attribute is omitted and an icon_dark.svg is found in the provider
     # folder, the file contents will be read instead.
     icon_svg_dark: str | None = None
+    # mdns_discovery: list of mdns types to discover
+    mdns_discovery: list[str] | None = None
 
     @classmethod
     async def parse(cls: ProviderManifest, manifest_file: str) -> ProviderManifest:
index 0de9f97ac9ab3d2eaf92fae8e447e96fef51b313..0e8bc06c010e1646f6ffdc3fb71602942c1702a6 100644 (file)
@@ -330,24 +330,35 @@ class MetaDataController(CoreController):
         return None
 
     def get_image_url(
-        self, image: MediaItemImage, size: int = 0, prefer_proxy: bool = False
+        self,
+        image: MediaItemImage,
+        size: int = 0,
+        prefer_proxy: bool = False,
+        image_format: str = "png",
     ) -> str:
         """Get (proxied) URL for MediaItemImage."""
         if image.provider != "url" or prefer_proxy or size:
             # return imageproxy url for images that need to be resolved
             # the original path is double encoded
             encoded_url = urllib.parse.quote(urllib.parse.quote(image.path))
-            return f"{self.mass.streams.base_url}/imageproxy?path={encoded_url}&provider={image.provider}&size={size}"  # noqa: E501
+            return f"{self.mass.streams.base_url}/imageproxy?path={encoded_url}&provider={image.provider}&size={size}&fmt={image_format}"  # noqa: E501
         return image.path
 
     async def get_thumbnail(
-        self, path: str, size: int | None = None, provider: str = "url", base64: bool = False
+        self,
+        path: str,
+        size: int | None = None,
+        provider: str = "url",
+        base64: bool = False,
+        image_format: str = "png",
     ) -> bytes | str:
         """Get/create thumbnail image for path (image url or local path)."""
-        thumbnail = await get_image_thumb(self.mass, path, size=size, provider=provider)
+        thumbnail = await get_image_thumb(
+            self.mass, path, size=size, provider=provider, image_format=image_format
+        )
         if base64:
             enc_image = b64encode(thumbnail).decode()
-            thumbnail = f"data:image/png;base64,{enc_image}"
+            thumbnail = f"data:image/{image_format};base64,{enc_image}"
         return thumbnail
 
     async def handle_imageproxy(self, request: web.Request) -> web.Response:
@@ -355,17 +366,20 @@ class MetaDataController(CoreController):
         path = request.query["path"]
         provider = request.query.get("provider", "url")
         size = int(request.query.get("size", "0"))
+        image_format = request.query.get("fmt", "png")
         if "%" in path:
             # assume (double) encoded url, decode it
             path = urllib.parse.unquote(path)
 
         with suppress(FileNotFoundError):
-            image_data = await self.get_thumbnail(path, size=size, provider=provider)
+            image_data = await self.get_thumbnail(
+                path, size=size, provider=provider, image_format=image_format
+            )
             # we set the cache header to 1 year (forever)
             # the client can use the checksum value to refresh when content changes
             return web.Response(
                 body=image_data,
                 headers={"Cache-Control": "max-age=31536000"},
-                content_type="image/png",
+                content_type=f"image/{image_format}",
             )
         return web.Response(status=404)
index f678a61a55846437e8f4d79911b51ec4ed5e140a..8d46f9591602d7b965d7800ed769dea24e8bf822 100644 (file)
@@ -25,7 +25,7 @@ from music_assistant.common.models.config_entries import (
     ConfigValueOption,
     ConfigValueType,
 )
-from music_assistant.common.models.enums import ConfigEntryType, ContentType
+from music_assistant.common.models.enums import ConfigEntryType, ContentType, MediaType
 from music_assistant.common.models.errors import MediaNotFoundError, QueueEmpty
 from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.constants import (
@@ -515,7 +515,7 @@ class StreamsController(CoreController):
             # feed stdin with pcm audio chunks from origin
             async def read_audio() -> None:
                 try:
-                    async for chunk in get_media_stream(
+                    async for _, chunk in get_media_stream(
                         self.mass,
                         streamdetails=queue_item.streamdetails,
                         pcm_format=pcm_format,
@@ -777,6 +777,8 @@ class StreamsController(CoreController):
         use_crossfade = self.mass.config.get_raw_player_config_value(
             queue.queue_id, CONF_CROSSFADE, False
         )
+        if start_queue_item.media_type != MediaType.TRACK:
+            use_crossfade = False
         pcm_sample_size = int(pcm_format.sample_rate * (pcm_format.bit_depth / 8) * 2)
         self.logger.info(
             "Start Queue Flow stream for Queue %s - crossfade: %s",
@@ -814,12 +816,13 @@ class StreamsController(CoreController):
             )
             crossfade_size = int(pcm_sample_size * crossfade_duration)
             queue_track.streamdetails.seconds_skipped = seek_position
-            buffer_size = crossfade_size if use_crossfade else int(pcm_sample_size * 2)
+            buffer_size = int(pcm_sample_size * 2)  # 2 seconds
+            if use_crossfade:
+                buffer_size += crossfade_size
             bytes_written = 0
             buffer = b""
-            chunk_num = 0
             # handle incoming audio chunks
-            async for chunk in get_media_stream(
+            async for is_last_chunk, chunk in get_media_stream(
                 self.mass,
                 queue_track.streamdetails,
                 pcm_format=pcm_format,
@@ -829,23 +832,24 @@ class StreamsController(CoreController):
                 strip_silence_begin=use_crossfade,
                 strip_silence_end=use_crossfade,
             ):
-                chunk_num += 1
-
-                # throttle buffer, do not allow more than 30 seconds in buffer
+                # throttle buffer, do not allow more than 30 seconds in player's own buffer
                 seconds_buffered = (total_bytes_written + bytes_written) / pcm_sample_size
                 player = self.mass.players.get(queue.queue_id)
                 if seconds_buffered > 60 and player.corrected_elapsed_time > 30:
                     while (seconds_buffered - player.corrected_elapsed_time) > 30:
                         await asyncio.sleep(1)
 
-                ####  HANDLE FIRST PART OF TRACK
+                # ALWAYS APPEND CHUNK TO BUFFER
+                buffer += chunk
+                if not is_last_chunk and len(buffer) < buffer_size:
+                    # buffer is not full enough, move on
+                    continue
 
-                # buffer full for crossfade
-                if last_fadeout_part and (len(buffer) >= buffer_size):
-                    first_part = buffer + chunk
+                ####  HANDLE CROSSFADE OF PREVIOUS TRACK AND NEW TRACK
+                if not is_last_chunk and last_fadeout_part:
                     # perform crossfade
-                    fadein_part = first_part[:crossfade_size]
-                    remaining_bytes = first_part[crossfade_size:]
+                    fadein_part = buffer[:crossfade_size]
+                    remaining_bytes = buffer[crossfade_size:]
                     crossfade_part = await crossfade_pcm_parts(
                         fadein_part,
                         last_fadeout_part,
@@ -855,39 +859,37 @@ class StreamsController(CoreController):
                     # send crossfade_part
                     yield crossfade_part
                     bytes_written += len(crossfade_part)
-                    # also write the leftover bytes from the strip action
+                    # also write the leftover bytes from the crossfade action
                     if remaining_bytes:
                         yield remaining_bytes
                         bytes_written += len(remaining_bytes)
-
+                        del remaining_bytes
                     # clear vars
                     last_fadeout_part = b""
                     buffer = b""
-                    continue
 
-                # enough data in buffer, feed to output
-                if len(buffer) >= (buffer_size * 2):
-                    yield buffer[:buffer_size]
-                    bytes_written += buffer_size
-                    buffer = buffer[buffer_size:] + chunk
-                    continue
+                #### HANDLE END OF TRACK
+                elif is_last_chunk:
+                    if use_crossfade:
+                        # if crossfade is enabled, save fadeout part to pickup for next track
+                        last_fadeout_part = buffer[-crossfade_size:]
+                        remaining_bytes = buffer[:-crossfade_size]
+                        yield remaining_bytes
+                        bytes_written += len(remaining_bytes)
+                        del remaining_bytes
+                    else:
+                        # no crossfade enabled, just yield the (entire) buffer last part
+                        yield buffer
+                        bytes_written += len(buffer)
+                    # clear vars
+                    buffer = b""
 
-                # all other: fill buffer
-                buffer += chunk
-                continue
-
-            #### HANDLE END OF TRACK
-
-            if buffer and use_crossfade:
-                # if crossfade is enabled, save fadeout part to pickup for next track
-                last_fadeout_part = buffer[-crossfade_size:]
-                remaining_bytes = buffer[:-crossfade_size]
-                yield remaining_bytes
-                bytes_written += len(remaining_bytes)
-            elif buffer:
-                # no crossfade enabled, just yield the buffer last part
-                yield buffer
-                bytes_written += len(buffer)
+                #### OTHER: enough data in buffer, feed to output
+                else:
+                    chunk_size = len(chunk)
+                    yield buffer[:chunk_size]
+                    bytes_written += chunk_size
+                    buffer = buffer[chunk_size:]
 
             # update duration details based on the actual pcm data we sent
             # this also accounts for crossfade and silence stripping
index a072ce7f443d3540a127bcf0e254f6a5799fe405..1f5ac3cab451e5f6ec9b00cf9e39631eafbaf157 100644 (file)
@@ -395,8 +395,8 @@ async def get_media_stream(  # noqa: PLR0915
     seek_position: int = 0,
     fade_in: bool = False,
     strip_silence_begin: bool = False,
-    strip_silence_end: bool = True,
-) -> AsyncGenerator[bytes, None]:
+    strip_silence_end: bool = False,
+) -> AsyncGenerator[tuple[bool, bytes], None]:
     """
     Get the (raw PCM) audio stream for the given streamdetails.
 
@@ -458,7 +458,7 @@ async def get_media_stream(  # noqa: PLR0915
                         sample_rate=pcm_format.sample_rate,
                         bit_depth=pcm_format.bit_depth,
                     )
-                    yield stripped_audio
+                    yield (False, stripped_audio)
                     bytes_sent += len(stripped_audio)
                     prev_chunk = b""
                     del stripped_audio
@@ -470,7 +470,7 @@ async def get_media_stream(  # noqa: PLR0915
 
                 # middle part of the track, send previous chunk and collect current chunk
                 if prev_chunk:
-                    yield prev_chunk
+                    yield (False, prev_chunk)
                     bytes_sent += len(prev_chunk)
 
                 prev_chunk = chunk
@@ -484,11 +484,11 @@ async def get_media_stream(  # noqa: PLR0915
                     bit_depth=pcm_format.bit_depth,
                     reverse=True,
                 )
-                yield stripped_audio
+                yield (True, stripped_audio)
                 bytes_sent += len(stripped_audio)
                 del stripped_audio
             else:
-                yield prev_chunk
+                yield (True, prev_chunk)
                 bytes_sent += len(prev_chunk)
 
             del prev_chunk
@@ -551,7 +551,7 @@ async def resolve_radio_stream(mass: MusicAssistant, url: str) -> tuple[str, boo
             LOGGER.debug("Error while parsing radio URL %s: %s", url, err)
 
     result = (url, supports_icy)
-    await mass.cache.set(cache_key, result)
+    await mass.cache.set(cache_key, result, expiration=86400)
     return result
 
 
index 4e54a1a5990ebf5c733d571235d247bae953fc63..ed7ba6e2b525079549d629988246340a515ac0e1 100644 (file)
@@ -35,7 +35,11 @@ async def get_image_data(mass: MusicAssistant, path_or_url: str, provider: str =
 
 
 async def get_image_thumb(
-    mass: MusicAssistant, path_or_url: str, size: int | None, provider: str = "url"
+    mass: MusicAssistant,
+    path_or_url: str,
+    size: int | None,
+    provider: str = "url",
+    image_format: str = "PNG",
 ) -> bytes:
     """Get (optimized) PNG thumbnail from image url."""
     img_data = await get_image_data(mass, path_or_url, provider)
@@ -45,7 +49,7 @@ async def get_image_thumb(
         img = Image.open(BytesIO(img_data))
         if size:
             img.thumbnail((size, size), Image.LANCZOS)  # pylint: disable=no-member
-        img.convert("RGB").save(data, "PNG", optimize=True)
+        img.convert("RGB").save(data, image_format, optimize=True)
         return data.getvalue()
 
     return await asyncio.to_thread(_create_image)
index 9ae02459ba1a59a3819846fb0de8d9ed386e5fe7..3ad79e645b097a3660eedf4d3455eaadfdb7c64d 100644 (file)
@@ -8,6 +8,9 @@ from typing import TYPE_CHECKING
 from music_assistant.constants import CONF_LOG_LEVEL, ROOT_LOGGER_NAME
 
 if TYPE_CHECKING:
+    from zeroconf import ServiceStateChange
+    from zeroconf.asyncio import AsyncServiceInfo
+
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.enums import ProviderFeature, ProviderType
     from music_assistant.common.models.provider import ProviderInstance, ProviderManifest
@@ -25,7 +28,7 @@ class Provider:
         self.manifest = manifest
         self.config = config
         mass_logger = logging.getLogger(ROOT_LOGGER_NAME)
-        self.logger = mass_logger.getChild(f"providers.{self.instance_id}")
+        self.logger = mass_logger.getChild(f"providers.{self.domain}")
         log_level = config.get_value(CONF_LOG_LEVEL)
         if log_level == "GLOBAL":
             self.logger.setLevel(mass_logger.level)
@@ -61,6 +64,11 @@ class Provider:
         Called when provider is deregistered (e.g. MA exiting or config reloading).
         """
 
+    async def on_mdns_service_state_change(
+        self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
+    ) -> None:
+        """Handle MDNS service state callback."""
+
     @property
     def type(self) -> ProviderType:
         """Return type of this provider."""
index 23dedfc0c5b65ba41ff0efb982714a01eb70bd9c..387bdbf48223d08854490fa2b735f04160e106c2 100644 (file)
@@ -8,16 +8,12 @@ import platform
 import socket
 import time
 from collections.abc import AsyncGenerator
+from contextlib import suppress
+from dataclasses import dataclass
 from random import randint, randrange
-from typing import TYPE_CHECKING, cast
-
-from pyatv import connect, interface, scan
-from pyatv.conf import AppleTV as ATVConf
-from pyatv.const import DeviceModel, DeviceState, PowerState, Protocol
-from pyatv.convert import model_str
-from pyatv.interface import AppleTV as AppleTVInterface
-from pyatv.interface import DeviceListener
-from pyatv.protocols.raop import RaopStream
+from typing import TYPE_CHECKING
+
+from zeroconf import ServiceStateChange
 from zeroconf.asyncio import AsyncServiceInfo
 
 from music_assistant.common.helpers.datetime import utc
@@ -39,7 +35,7 @@ from music_assistant.common.models.enums import (
 from music_assistant.common.models.media_items import AudioFormat
 from music_assistant.common.models.player import DeviceInfo, Player
 from music_assistant.common.models.player_queue import PlayerQueue
-from music_assistant.server.helpers.process import AsyncProcess, check_output
+from music_assistant.server.helpers.process import check_output
 from music_assistant.server.models.player_provider import PlayerProvider
 
 if TYPE_CHECKING:
@@ -53,6 +49,7 @@ if TYPE_CHECKING:
 DOMAIN = "airplay"
 
 CONF_LATENCY = "latency"
+DEFAULT_LATENCY = 2000
 CONF_ENCRYPTION = "encryption"
 CONF_ALAC_ENCODE = "alac_encode"
 CONF_VOLUME_START = "volume_start"
@@ -64,13 +61,13 @@ PLAYER_CONFIG_ENTRIES = (
     ConfigEntry(
         key=CONF_LATENCY,
         type=ConfigEntryType.INTEGER,
-        range=(200, 3000),
-        default_value=1200,
+        range=(500, 4000),
+        default_value=DEFAULT_LATENCY,
         label="Latency",
         description="Sets the number of milliseconds of audio buffer in the player. "
-        "This is important to absorb network throughput jitter. "
-        "Note that the resume after pause will be skipping that amount of time "
-        "and volume changes will be delayed by the same amount, when using digital volume.",
+        "This is important to absorb network throughput jitter. \n"
+        "Increase this value if you notice network dropouts at the cost of a slower "
+        "response to commands.",
         advanced=True,
     ),
     ConfigEntry(
@@ -91,15 +88,6 @@ PLAYER_CONFIG_ENTRIES = (
         "(lossless) ALAC at the cost of a bit CPU.",
         advanced=True,
     ),
-    ConfigEntry(
-        key=CONF_VOLUME_START,
-        type=ConfigEntryType.BOOLEAN,
-        default_value=False,
-        label="Send volume at playback start",
-        description="Some players require to send/confirm the volume when playback starts. \n"
-        "Enable this setting if the playback volume does not match the MA interface.",
-        advanced=True,
-    ),
     ConfigEntry(
         key=CONF_SYNC_ADJUST,
         type=ConfigEntryType.INTEGER,
@@ -125,6 +113,8 @@ BACKOFF_TIME_LOWER_LIMIT = 15  # seconds
 BACKOFF_TIME_UPPER_LIMIT = 300  # Five minutes
 
 CONF_CREDENTIALS = "credentials"
+CACHE_KEY_PREV_VOLUME = "airplay_prev_volume"
+FALLBACK_VOLUME = 20
 
 
 async def setup(
@@ -163,88 +153,116 @@ def convert_airplay_volume(value: float) -> int:
     return int(portion + normal_min)
 
 
-class AirPlayPlayer(DeviceListener):
-    """Holds the connection to the apyatv instance and the cliraop."""
+def get_model_from_am(am_property: str | None) -> tuple[str, str]:
+    """Return Manufacturer and Model name from mdns AM property."""
+    manufacturer = "Unknown"
+    model = "Generic Airplay device"
+    if not am_property:
+        return (manufacturer, model)
+    if isinstance(am_property, bytes):
+        am_property = am_property.decode("utf-8")
+    if am_property == "AudioAccessory5,1":
+        model = "HomePod"
+        manufacturer = "Apple"
+    elif "AppleTV" in am_property:
+        model = "Apple TV"
+        manufacturer = "Apple"
+    else:
+        model = am_property
+    return (manufacturer, model)
+
+
+class AirplayStreamJob:
+    """Object that holds the details of a stream job."""
+
+    def __init__(self, prov: AirplayProvider, airplay_player: AirPlayPlayer) -> None:
+        """Initialize AirplayStreamJob."""
+        self.prov = prov
+        self.mass = prov.mass
+        self.airplay_player = airplay_player
+        # always generate a new active remote id to prevent race conditions
+        # with the named pipe used to send commands
+        self.active_remote_id: str = str(randint(1000, 8000))
+        self.start_ntp: int | None = None  # use as checksum
+        self._log_reader_task: asyncio.Task | None = None
+        self._cliraop_proc: asyncio.subprocess.Process | None = None
 
-    def __init__(
-        self, mass: MusicAssistant, player_id: str, discovery_info: interface.BaseConfig
-    ) -> None:
-        """Initialize power manager."""
-        self.player_id = player_id
-        self.discovery_info = discovery_info
-        self.mass = mass
-        self.atv: AppleTVInterface | None = None
-        self.connected = False
-        self._connection_attempts = 0
-        self._connection_was_lost = False
-        self._playing: interface.Playing | None = None
-        self.logger = self.mass.players.logger.getChild("airplay").getChild(self.player_id)
-        self.cliraop_proc: AsyncProcess | None = None
-        self.active_remote_id = str(randint(1000, 8000))
-        self.optimistic_state: PlayerState = PlayerState.IDLE
-
-    def connection_lost(self, _):
-        """Device was unexpectedly disconnected.
-
-        This is a callback function from pyatv.interface.DeviceListener.
-        """
-        self.logger.warning('Connection lost to Apple TV "%s"', self.discovery_info.name)
-        self._connection_was_lost = True
-        self._handle_disconnect()
+    @property
+    def running(self) -> bool:
+        """Return bool if we're running."""
+        return self._cliraop_proc and self._cliraop_proc.returncode is None
 
-    def connection_closed(self):
-        """Device connection was (intentionally) closed.
+    async def init_cliraop(self, start_ntp: int) -> None:
+        """Initialize CLIRaop process for a player."""
+        self.start_ntp = start_ntp
+        extra_args = []
+        player_id = self.airplay_player.player_id
+        mass_player = self.mass.players.get(player_id)
+        latency = self.mass.config.get_raw_player_config_value(
+            player_id, CONF_LATENCY, DEFAULT_LATENCY
+        )
+        extra_args += ["-l", str(latency)]
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
+            extra_args += ["-e"]
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
+            extra_args += ["-a"]
+        sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
+        if device_password := self.mass.config.get_raw_player_config_value(
+            player_id, CONF_PASSWORD, None
+        ):
+            # NOTE: This may not work as we might need to do
+            # some fancy hashing with the plain password first?!
+            extra_args += ["-P", device_password]
+        if self.airplay_player.logger.level == logging.DEBUG:
+            extra_args += ["-d", "5"]
 
-        This is a callback function from pyatv.interface.DeviceListener.
-        """
-        self.connected = False
-        self._handle_disconnect()
-
-    def _handle_disconnect(self):
-        """Handle that the device disconnected and restart connect loop."""
-        self.connected = False
-        if self.atv:
-            self.atv.close()
-            self.atv = None
-
-    async def connect(self):
-        """Connect to device."""
-        if self.connected:
-            return
-        try:
-            await self._connect(self.discovery_info)
-        except Exception:
-            # retry with scanning for the device
-            if conf := await self._scan():
-                await self._connect(conf)
-            raise
-
-    async def disconnect(self):
-        """Disconnect from device."""
-        self.logger.debug("Disconnecting from device")
-        self.is_on = False
-        self.connected = False
-        try:
-            if self.atv:
-                self.atv.close()
-                self.atv = None
-        except Exception:  # pylint: disable=broad-except
-            self.logger.exception("An error occurred while disconnecting")
+        args = [
+            self.prov.cliraop_bin,
+            "-n",
+            str(start_ntp),
+            "-p",
+            str(self.airplay_player.discovery_info.port),
+            "-w",
+            str(2500 - sync_adjust),
+            "-v",
+            str(mass_player.volume_level),
+            *extra_args,
+            "-dacp",
+            self.prov.dacp_id,
+            "-ar",
+            self.active_remote_id,
+            "-md",
+            self.airplay_player.discovery_info.decoded_properties["md"],
+            "-et",
+            self.airplay_player.discovery_info.decoded_properties["et"],
+            str(self.airplay_player.discovery_info.parsed_addresses()[0]),
+            "-",
+        ]
+        if platform.system() == "Darwin":
+            os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
+        self._cliraop_proc = await asyncio.create_subprocess_exec(
+            *args,
+            stdin=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+            close_fds=True,
+        )
+        self._log_reader_task = asyncio.create_task(self._log_watcher())
 
     async def stop(self):
-        """Stop playback and cleanup any running CLIRaop Process."""
-        if self.cliraop_proc and not self.cliraop_proc.closed:
-            # prefer interactive command to our streamer
-            await self.send_cli_command("ACTION=STOP")
-            await self.cliraop_proc.wait()
-            self.optimistic_state = PlayerState.IDLE
-            self.update_attributes()
-        elif atv := self.atv:
-            await atv.remote_control.stop()
+        """Stop playback and cleanup."""
+        if not self.running:
+            return
+        # prefer interactive command to our streamer
+        await self.send_cli_command("ACTION=STOP")
+        # use communicate to clear stdin/stdout and wait for exit
+        await self._cliraop_proc.wait()
+        # stop background task
+        if self._log_reader_task and not self._log_reader_task.done():
+            self._log_reader_task.cancel()
 
     async def send_cli_command(self, command: str) -> None:
         """Send an interactive command to the running CLIRaop binary."""
-        if not self.cliraop_proc or self.cliraop_proc.closed:
+        if not self.running:
             return
 
         named_pipe = f"/tmp/fifo-{self.active_remote_id}"  # noqa: S108
@@ -255,175 +273,83 @@ class AirPlayPlayer(DeviceListener):
             with open(named_pipe, "w") as f:
                 f.write(command)
 
-        self.logger.debug("sending command %s", command)
+        self.airplay_player.logger.debug("sending command %s", command)
         await self.mass.create_task(send_data)
 
-    async def _scan(self) -> ATVConf | None:
-        """Try to find device by scanning for it."""
-        address: str = self.discovery_info.address
-
-        self.logger.debug("Discovering device %s", self.discovery_info.name)
-        atvs = await scan(
-            self.mass.loop,
-            identifier=self.discovery_info.identifier,
-            aiozc=self.mass.aiozc,
-            hosts=[address],
+    async def _log_watcher(self) -> None:
+        """Monitor stderr for the running CLIRaop process."""
+        airplay_player = self.airplay_player
+        mass_player = self.mass.players.get(airplay_player.player_id)
+        logger = airplay_player.logger
+        airplay_player.logger.debug("Starting log watcher task...")
+        async for line in self._cliraop_proc.stderr:
+            line = line.decode().strip()  # noqa: PLW2901
+            if not line:
+                continue
+            logger.debug(line)
+            if "set pause" in line:
+                mass_player.state = PlayerState.PAUSED
+                self.mass.players.update(airplay_player.player_id)
+            elif "Restarted at" in line:
+                mass_player.state = PlayerState.PLAYING
+                self.mass.players.update(airplay_player.player_id)
+            elif "after start), played" in line:
+                millis = int(line.split("played ")[1].split(" ")[0])
+                mass_player.elapsed_time = millis / 1000
+                mass_player.elapsed_time_last_updated = time.time()
+            elif "restarting w/o pause" in line:
+                # streaming has started
+                mass_player.state = PlayerState.PLAYING
+                mass_player.elapsed_time = 0
+                mass_player.elapsed_time_last_updated = time.time()
+                self.mass.players.update(airplay_player.player_id)
+
+        # if we reach this point, the process exited
+        airplay_player.logger.debug("Log watcher task finished...")
+        mass_player.state = PlayerState.IDLE
+        self.mass.players.update(airplay_player.player_id)
+        logger.debug(
+            "CLIRaop process stopped with errorcode %s",
+            self._cliraop_proc.returncode,
         )
-        if atvs:
-            return cast(ATVConf, atvs[0])
 
-        self.logger.debug(
-            "Failed to find device %s with address %s",
-            self.discovery_info.name,
-            address,
-        )
-        return None
+    async def write_chunk(self, data: bytes) -> None:
+        """Write a chunk of (pcm) data to the stdin of CLIRaop."""
+        if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+            return
+        self._cliraop_proc.stdin.write(data)
+        if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+            return
+        with suppress(BrokenPipeError):
+            await self._cliraop_proc.stdin.drain()
 
-    async def _connect(self, conf: ATVConf) -> None:
-        """Connect to device."""
-        credentials: dict[int, str | None] = self.mass.config.get_raw_player_config_value(
-            self.player_id, CONF_CREDENTIALS, {}
-        )
-        name: str = self.discovery_info.name
-        missing_protocols = []
-        for protocol_int, creds in credentials.items():
-            protocol = Protocol(int(protocol_int))
-            if conf.get_service(protocol) is not None:
-                conf.set_credentials(protocol, creds)  # type: ignore[arg-type]
-            else:
-                missing_protocols.append(protocol.name)
-
-        if missing_protocols:
-            missing_protocols_str = ", ".join(missing_protocols)
-            self.logger.info(
-                "Protocol(s) %s not yet found for %s, trying later",
-                missing_protocols_str,
-                name,
-            )
+    async def write_eof(self, data: bytes) -> None:
+        """Write a chunk of (pcm) data to the stdin of CLIRaop."""
+        if not self.running or not self._cliraop_proc.stdin.can_write_eof():
+            return
+        self._cliraop_proc.stdin.write_eof()
+        if not self.running or not self._cliraop_proc.stdin.can_write_eof():
             return
+        with suppress(BrokenPipeError):
+            await self._cliraop_proc.stdin.drain()
 
-        self.logger.debug("Connecting to device %s", name)
-        session = self.mass.http_session
-        self.atv = await connect(conf, self.mass.loop, session=session)
-        self.connected = True
-        self.atv.power.listener = self
-        self.atv.listener = self
-        self.atv.audio.listener = self
-        if self.atv.features.in_state(
-            interface.FeatureState.Available, interface.FeatureName.PushUpdates
-        ):
-            self.atv.push_updater.listener = self
-            self.atv.push_updater.start()
 
-        self.address_updated(str(conf.address))
+@dataclass
+class AirPlayPlayer:
+    """Holds the details of the (discovered) Airplay (RAOP) player."""
 
-        self._setup_device()
-        self.update_attributes()
-
-        self._connection_attempts = 0
-        if self._connection_was_lost:
-            self.logger.info(
-                'Connection was (re)established to device "%s"',
-                name,
-            )
-            self._connection_was_lost = False
-
-    def _setup_device(self):
-        if not (mass_player := self.mass.players.get(self.player_id)):
-            mass_player = Player(
-                player_id=self.player_id,
-                provider=DOMAIN,
-                type=PlayerType.PLAYER,
-                name=self.discovery_info.name,
-                available=True,
-                powered=False,
-                device_info=DeviceInfo(
-                    model=self.discovery_info.device_info.raw_model,
-                    manufacturer="Apple",
-                    address=str(self.discovery_info.address),
-                ),
-                supported_features=(
-                    PlayerFeature.PAUSE,
-                    PlayerFeature.SYNC,
-                    PlayerFeature.VOLUME_SET,
-                    PlayerFeature.POWER,
-                ),
-                max_sample_rate=44100,
-                supports_24bit=False,
-            )
-        if self.atv:
-            dev_info = self.atv.device_info
-            mass_player.device_info = DeviceInfo(
-                model=(
-                    dev_info.raw_model
-                    if dev_info.model == DeviceModel.Unknown and dev_info.raw_model
-                    else model_str(dev_info.model)
-                ),
-                manufacturer="Apple",
-                address=str(self.discovery_info.address),
-            )
-        self.mass.players.register_or_update(mass_player)
-
-    def playstatus_update(self, _, playstatus: interface.Playing) -> None:
-        """Inform about changes to what is currently playing."""
-        self.logger.debug("Playstatus received: %s", playstatus)
-        self._playing = playstatus
-        self.update_attributes()
-
-    def playstatus_error(self, updater, exception: Exception) -> None:
-        """Inform about an error when updating play status."""
-        self.logger.debug("Playstatus error received", exc_info=exception)
-        self._playing = None
-        self.update_attributes()
-
-    def powerstate_update(self, old_state: PowerState, new_state: PowerState) -> None:
-        """Update power state when it changes."""
-        self.update_attributes()
-
-    def volume_update(self, old_level: float, new_level: float) -> None:
-        """Update volume when it changes."""
-        self.update_attributes()
-
-    def update_attributes(self) -> None:
-        """Update the player attributes."""
-        mass_player = self.mass.players.get(self.player_id)
-        mass_player.volume_level = int(self.atv.audio.volume)
-        mass_player.powered = self.connected or self.cliraop_proc and not self.cliraop_proc.closed
-        if self.cliraop_proc and not self.cliraop_proc.closed:
-            mass_player.state = self.optimistic_state
-            # NOTE: alapsed time is pushed from cliraop
-        elif self.atv is None or not self.connected:
-            mass_player.powered = False
-            mass_player.state = PlayerState.IDLE
-        elif self._playing:
-            state = self._playing.device_state
-            if state in (DeviceState.Idle, DeviceState.Loading):
-                mass_player.state = PlayerState.IDLE
-            elif state == DeviceState.Playing:
-                mass_player.state = PlayerState.PLAYING
-            elif state in (DeviceState.Paused, DeviceState.Seeking, DeviceState.Stopped):
-                mass_player.state = PlayerState.PAUSED
-            else:
-                mass_player.state = PlayerState.IDLE
-            mass_player.elapsed_time = self._playing.position or 0
-            mass_player.elapsed_time_last_updated = time.time()
-            mass_player.current_item_id = self._playing.content_identifier
-        else:
-            mass_player.state = PlayerState.IDLE
-        self.mass.players.update(self.player_id)
-
-    def address_updated(self, address):
-        """Update cached address in config entry."""
-        self.logger.debug("Changing address to %s", address)
-        self._setup_device()
+    player_id: str
+    discovery_info: AsyncServiceInfo
+    logger: logging.Logger
+    active_stream: AirplayStreamJob | None = None
 
 
 class AirplayProvider(PlayerProvider):
     """Player provider for Airplay based players."""
 
-    _atv_players: dict[str, AirPlayPlayer]
+    cliraop_bin: str | None = None
+    _players: dict[str, AirPlayPlayer]
     _discovery_running: bool = False
-    _cliraop_bin: str | None = None
     _stream_tasks: dict[str, asyncio.Task]
     _dacp_server: asyncio.Server = None
     _dacp_info: AsyncServiceInfo = None
@@ -435,12 +361,10 @@ class AirplayProvider(PlayerProvider):
 
     async def handle_async_init(self) -> None:
         """Handle async initialization of the provider."""
-        self._atv_players = {}
+        self._players = {}
         self._stream_tasks = {}
-        self._cliraop_bin = await self.get_cliraop_binary()
+        self.cliraop_bin = await self._getcliraop_binary()
         dacp_port = await select_free_port(39831, 49831)
-        # the pyatv logger is way to noisy, silence it a bit
-        logging.getLogger("pyatv").setLevel(self.logger.level + 10)
         self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}"
         self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
         self._dacp_server = await asyncio.start_server(
@@ -463,14 +387,48 @@ class AirplayProvider(PlayerProvider):
         )
         await self.mass.aiozc.async_register_service(self._dacp_info)
 
-    async def loaded_in_mass(self) -> None:
-        """Call after the provider has been loaded."""
-        await self._run_discovery()
+    async def on_mdns_service_state_change(
+        self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
+    ) -> None:
+        """Handle MDNS service state callback."""
+        raw_id, display_name = name.split(".")[0].split("@", 1)
+        player_id = f"ap{raw_id.lower()}"
+        # handle removed player
+        if state_change == ServiceStateChange.Removed:
+            self.logger.debug("Airplay device %s removed", name)
+            if mass_player := self.mass.players.get(player_id):
+                # the player has become unavailable
+                self.logger.info("Player removed %s", display_name)
+                mass_player.available = False
+                self.mass.players.update(player_id)
+            return
+        # handle update for existing device
+        if airplay_player := self._players.get(player_id):
+            if mass_player := self.mass.players.get(player_id):
+                cur_address = info.parsed_addresses()[0]
+                prev_address = airplay_player.discovery_info.parsed_addresses()[0]
+                if cur_address != prev_address:
+                    airplay_player.logger.info(
+                        "Address updated from %s to %s", prev_address, cur_address
+                    )
+                    mass_player.device_info = DeviceInfo(
+                        model=mass_player.device_info.model,
+                        manufacturer=mass_player.device_info.manufacturer,
+                        address=str(cur_address),
+                    )
+                if not mass_player.available:
+                    mass_player.available = True
+            # always update the latest discovery info
+            airplay_player.discovery_info = info
+            self.mass.players.update(player_id)
+            return
+        # handle new player
+        await self._setup_player(player_id, display_name, info)
 
     async def unload(self) -> None:
         """Handle close/cleanup of the provider."""
-        # power off all players (will disconnct and close cliraop)
-        for player_id in self._atv_players:
+        # power off all players (will disconnect and close cliraop)
+        for player_id in self._players:
             await self.cmd_power(player_id, False)
         # shutdown DACP server
         if self._dacp_server:
@@ -479,90 +437,6 @@ class AirplayProvider(PlayerProvider):
         if self._dacp_info:
             await self.mass.aiozc.async_unregister_service(self._dacp_info)
 
-    async def _handle_dacp_request(  # noqa: PLR0915
-        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
-    ) -> None:
-        """Handle new connection on the socket."""
-        try:
-            raw_request = b""
-            while recv := await reader.read(1024):
-                raw_request += recv
-                if len(recv) < 1024:
-                    break
-
-            request = raw_request.decode("UTF-8")
-            headers_raw, body = request.split("\r\n\r\n", 1)
-            headers_raw = headers_raw.split("\r\n")
-            headers = {}
-            for line in headers_raw[1:]:
-                x, y = line.split(":", 1)
-                headers[x.strip()] = y.strip()
-            active_remote = headers.get("Active-Remote")
-            _, path, _ = headers_raw[0].split(" ")
-            atv_player = next(
-                (x for x in self._atv_players.values() if x.active_remote_id == active_remote), None
-            )
-            self.logger.debug(
-                "DACP request for %s (%s): %s -- %s",
-                atv_player.discovery_info.name if atv_player else "UNKNOWN PLAYER",
-                active_remote,
-                path,
-                body,
-            )
-            if not atv_player:
-                return
-
-            player_id = atv_player.player_id
-            if path == "/ctrl-int/1/nextitem":
-                self.mass.create_task(self.mass.player_queues.next(player_id))
-            elif path == "/ctrl-int/1/previtem":
-                self.mass.create_task(self.mass.player_queues.previous(player_id))
-            elif path == "/ctrl-int/1/play":
-                self.mass.create_task(self.mass.player_queues.play(player_id))
-            elif path == "/ctrl-int/1/playpause":
-                self.mass.create_task(self.mass.player_queues.play_pause(player_id))
-            elif path == "/ctrl-int/1/stop":
-                self.mass.create_task(self.cmd_stop(player_id))
-            elif path == "/ctrl-int/1/volumeup":
-                self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
-            elif path == "/ctrl-int/1/volumedown":
-                self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
-            elif path == "/ctrl-int/1/shuffle_songs":
-                queue = self.mass.player_queues.get(player_id)
-                self.mass.create_task(
-                    self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled)
-                )
-            elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
-                self.mass.create_task(self.mass.player_queues.pause(player_id))
-            elif "dmcp.device-volume=" in path:
-                raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
-                volume = convert_airplay_volume(raop_volume)
-                if abs(volume - int(atv_player.atv.audio.volume)) > 2:
-                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
-            elif "dmcp.volume=" in path:
-                volume = int(path.split("dmcp.volume=", 1)[-1])
-                if abs(volume - int(atv_player.atv.audio.volume)) > 2:
-                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
-            else:
-                self.logger.debug(
-                    "Unknown DACP request for %s: %s",
-                    atv_player.discovery_info.name,
-                    path,
-                )
-
-            # send response
-            date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
-            response = (
-                f"HTTP/1.0 204 No Content\r\nDate: {date_str} "
-                "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: "
-                "application/x-dmap-tagged\r\nContent-Length: 0\r\n"
-                "Connection: close\r\n\r\n"
-            )
-            writer.write(response.encode())
-            await writer.drain()
-        finally:
-            writer.close()
-
     async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
         """Return all (provider/player specific) Config Entries for the given player (if any)."""
         entries = await super().get_player_config_entries(player_id)
@@ -573,14 +447,18 @@ class AirplayProvider(PlayerProvider):
 
         - player_id: player_id of the player to handle the command.
         """
-        if stream_task := self._stream_tasks.pop(player_id, None):
-            if not stream_task.done():
-                stream_task.cancel()
+
+        async def stop_player(airplay_player: AirPlayPlayer) -> None:
+            if airplay_player.active_stream:
+                await airplay_player.active_stream.stop()
+            mass_player = self.mass.players.get(airplay_player.player_id)
+            mass_player.state = PlayerState.IDLE
+            self.mass.players.update(airplay_player.player_id)
 
         # forward command to player and any connected sync members
         async with asyncio.TaskGroup() as tg:
-            for atv_player in self._get_sync_clients(player_id):
-                tg.create_task(atv_player.stop())
+            for airplay_player in self._get_sync_clients(player_id):
+                tg.create_task(stop_player(airplay_player))
 
     async def cmd_play(self, player_id: str) -> None:
         """Send PLAY (unpause) command to given player.
@@ -589,14 +467,10 @@ class AirplayProvider(PlayerProvider):
         """
         # forward command to player and any connected sync members
         async with asyncio.TaskGroup() as tg:
-            for atv_player in self._get_sync_clients(player_id):
-                if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+            for airplay_player in self._get_sync_clients(player_id):
+                if airplay_player.active_stream and airplay_player.active_stream.running:
                     # prefer interactive command to our streamer
-                    tg.create_task(atv_player.send_cli_command("ACTION=PLAY"))
-                    atv_player.optimistic_state = PlayerState.PLAYING
-                    atv_player.update_attributes()
-                elif atv := atv_player.atv:
-                    tg.create_task(atv.remote_control.play())
+                    tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PLAY"))
 
     async def cmd_pause(self, player_id: str) -> None:
         """Send PAUSE command to given player.
@@ -605,14 +479,10 @@ class AirplayProvider(PlayerProvider):
         """
         # forward command to player and any connected sync members
         async with asyncio.TaskGroup() as tg:
-            for atv_player in self._get_sync_clients(player_id):
-                if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
+            for airplay_player in self._get_sync_clients(player_id):
+                if airplay_player.active_stream and airplay_player.active_stream.running:
                     # prefer interactive command to our streamer
-                    tg.create_task(atv_player.send_cli_command("ACTION=PAUSE"))
-                    atv_player.optimistic_state = PlayerState.PAUSED
-                    atv_player.update_attributes()
-                elif atv := atv_player.atv:
-                    tg.create_task(atv.remote_control.pause())
+                    tg.create_task(airplay_player.active_stream.send_cli_command("ACTION=PAUSE"))
 
     async def play_media(
         self,
@@ -631,10 +501,8 @@ class AirplayProvider(PlayerProvider):
             - seek_position: Optional seek to this position.
             - fade_in: Optionally fade in the item at playback start.
         """
-        # stop existing streams first
+        # always stop existing stream first
         await self.cmd_stop(player_id)
-        # power on player if needed
-        await self.cmd_power(player_id, True)
         # start streaming the queue (pcm) audio in a background task
         queue = self.mass.player_queues.get_active_queue(player_id)
         self._stream_tasks[player_id] = asyncio.create_task(
@@ -661,10 +529,8 @@ class AirplayProvider(PlayerProvider):
 
         This is a special feature from the Universal Group provider.
         """
-        # stop existing streams first
+        # always stop existing stream first
         await self.cmd_stop(player_id)
-        # power on player if needed
-        await self.cmd_power(player_id, True)
         if stream_job.pcm_format.bit_depth != 16 or stream_job.pcm_format.sample_rate != 44100:
             # TODO: resample on the fly here ?
             raise RuntimeError("Unsupported PCM format")
@@ -686,83 +552,70 @@ class AirplayProvider(PlayerProvider):
         if player.synced_to:
             # should not happen, but just in case
             raise RuntimeError("Player is synced")
-        player.elapsed_time = 0
-        player.elapsed_time_last_updated = time.time()
-        player.state = PlayerState.PLAYING
-        self.mass.players.update(player_id)
-        # NOTE: Although the pyatv library is perfectly capable of playback
-        # to not only raop targets but also airplay 1 + 2, its not suitable
-        # for synced playback to multiple clients at once.
-        # Also the performance is horrible. Python is not suitable for realtime
-        # audio streaming.
-        # So, I've decided to a combined route here. I've created a small binary
+
+        # Python is not suitable for realtime audio streaming.
+        # So, I've decided to go the fancy route here. I've created a small binary
         # written in C based on libraop to do the actual timestamped playback.
         # the raw pcm audio is fed to the stdin of this cliraop binary and we can
         # send some commands over a named pipe.
 
         # get current ntp before we start
-        _, stdout = await check_output(f"{self._cliraop_bin} -ntp")
-        ntp = int(stdout.strip())
+        _, stdout = await check_output(f"{self.cliraop_bin} -ntp")
+        start_ntp = int(stdout.strip())
 
         # setup Raop process for player and its sync childs
-        async with asyncio.TaskGroup() as tg:
-            for atv_player in self._get_sync_clients(player_id):
-                if not atv_player.atv:
-                    # should not be possible, but just in case...
-                    await atv_player.connect()
-                tg.create_task(self._init_cliraop(atv_player, ntp))
+        for airplay_player in self._get_sync_clients(player_id):
+            # make sure that existing stream is stopped
+            if airplay_player.active_stream:
+                await airplay_player.active_stream.stop()
+            airplay_player.active_stream = AirplayStreamJob(self, airplay_player)
+            await airplay_player.active_stream.init_cliraop(start_ntp)
         prev_metadata_checksum: str = ""
-        try:
-            async for pcm_chunk in audio_iterator:
-                # send metadata to player(s) if needed
-                # NOTE: this must all be done in separate tasks to not disturb audio
-                if queue and queue.current_item and queue.current_item.streamdetails:
-                    metadata_checksum = (
-                        queue.current_item.streamdetails.stream_title
-                        or queue.current_item.queue_item_id
+        async for pcm_chunk in audio_iterator:
+            # send audio chunk to player(s)
+            available_clients = 0
+            async with asyncio.TaskGroup() as tg:
+                for airplay_player in self._get_sync_clients(player_id):
+                    if (
+                        not airplay_player.active_stream
+                        or not airplay_player.active_stream.running
+                        or airplay_player.active_stream.start_ntp != start_ntp
+                    ):
+                        # catch when this stream is no longer active on the player
+                        continue
+                    available_clients += 1
+                    tg.create_task(airplay_player.active_stream.write_chunk(pcm_chunk))
+                    # always send the progress
+                    tg.create_task(
+                        airplay_player.active_stream.send_cli_command(
+                            f"PROGRESS={int(queue.elapsed_time)}\n"
+                        )
                     )
-                    if prev_metadata_checksum != metadata_checksum:
-                        prev_metadata_checksum = metadata_checksum
-                        self.mass.create_task(self._send_metadata(player_id))
-
-                async with asyncio.TaskGroup() as tg:
-                    # send progress metadata
-                    if queue.elapsed_time:
-                        for atv_player in self._get_sync_clients(player_id):
-                            tg.create_task(
-                                atv_player.send_cli_command(f"PROGRESS={int(queue.elapsed_time)}\n")
-                            )
-                    # send audio chunk to player(s)
-                    available_clients = 0
-                    for atv_player in self._get_sync_clients(player_id):
-                        if not atv_player.cliraop_proc or atv_player.cliraop_proc.closed:
-                            # this may not happen, but just in case
-                            continue
-                        available_clients += 1
-                        tg.create_task(atv_player.cliraop_proc.write(pcm_chunk))
-                    if not available_clients:
-                        return
-        finally:
-            self.logger.debug("Streaming ended for player %s", player.display_name)
-            for atv_player in self._get_sync_clients(player_id):
-                if atv_player.cliraop_proc and not atv_player.cliraop_proc.closed:
-                    atv_player.cliraop_proc.write_eof()
-
-    async def cmd_power(self, player_id: str, powered: bool) -> None:
-        """Send POWER command to given player.
+            if not available_clients:
+                # this streamjob is no longer active
+                return
 
-        - player_id: player_id of the player to handle the command.
-        - powered: bool if player should be powered on or off.
-        """
-        atv_player = self._atv_players[player_id]
-        mass_player = self.mass.players.get(player_id)
-        if powered:
-            await atv_player.connect()
-        elif not powered:
-            await self.cmd_stop(player_id)
-            await atv_player.disconnect()
-        mass_player.powered = powered
-        self.mass.players.update(player_id)
+            # send metadata to player(s) if needed
+            # NOTE: this must all be done in separate tasks to not disturb audio
+            if queue and queue.current_item and queue.current_item.streamdetails:
+                metadata_checksum = (
+                    queue.current_item.streamdetails.stream_title
+                    or queue.current_item.queue_item_id
+                )
+                if prev_metadata_checksum != metadata_checksum:
+                    prev_metadata_checksum = metadata_checksum
+                    self.mass.create_task(self._send_metadata(player_id, queue))
+
+        # end of stream reached - write eof
+        for airplay_player in self._get_sync_clients(player_id):
+            if (
+                not airplay_player.active_stream
+                or not airplay_player.active_stream.running
+                or airplay_player.active_stream.start_ntp != start_ntp
+            ):
+                # this may not happen, but guard just in case
+                continue
+            await airplay_player.active_stream.write_eof()
 
     async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
         """Send VOLUME_SET command to given player.
@@ -770,12 +623,14 @@ class AirplayProvider(PlayerProvider):
         - player_id: player_id of the player to handle the command.
         - volume_level: volume level (0..100) to set on the player.
         """
-        atv_player = self._atv_players[player_id]
-        if atv_player.cliraop_proc:
-            # prefer interactive command to our streamer
-            await atv_player.send_cli_command(f"VOLUME={volume_level}\n")
-        if atv := atv_player.atv:
-            await atv.audio.set_volume(volume_level)
+        airplay_player = self._players[player_id]
+        if airplay_player.active_stream:
+            await airplay_player.active_stream.send_cli_command(f"VOLUME={volume_level}\n")
+        mass_player = self.mass.players.get(player_id)
+        mass_player.volume_level = volume_level
+        self.mass.players.update(player_id)
+        # store last state in cache
+        await self.mass.cache.set(f"{CACHE_KEY_PREV_VOLUME}.{player_id}", volume_level)
 
     async def cmd_sync(self, player_id: str, target_player: str) -> None:
         """Handle SYNC command for given player.
@@ -793,7 +648,7 @@ class AirplayProvider(PlayerProvider):
         group_leader.group_childs.add(player_id)
         self.mass.players.update(target_player)
         if group_leader.powered:
-            await self.cmd_power(player_id, True)
+            await self.mass.players.cmd_power(player_id, True)
         active_queue = self.mass.player_queues.get_active_queue(group_leader.player_id)
         if active_queue.state == PlayerState.PLAYING:
             self.mass.create_task(self.mass.player_queues.resume(active_queue.queue_id))
@@ -814,67 +669,11 @@ class AirplayProvider(PlayerProvider):
         await self.cmd_stop(player_id)
         self.mass.players.update(player_id)
 
-    async def _run_discovery(self) -> None:
-        """Discover Airplay players on the network."""
-        if self._discovery_running:
-            return
-        try:
-            self._discovery_running = True
-            self.logger.debug("Airplay discovery started...")
-            discovered_devices = await scan(self.mass.loop, protocol=Protocol.RAOP, timeout=30)
-
-            if not discovered_devices:
-                self.logger.debug("No devices found")
-                return
-
-            for dev in discovered_devices:
-                self.mass.create_task(self._player_discovered(dev))
-
-        finally:
-            self._discovery_running = False
-
-        def reschedule():
-            self.mass.create_task(self._run_discovery())
-
-        # reschedule self once finished
-        self.mass.loop.call_later(300, reschedule)
-
-    async def _player_discovered(self, discovery_info: interface.BaseConfig) -> None:
-        """Handle discovered Airplay player on mdns."""
-        player_id = f"ap{discovery_info.identifier.lower().replace(':', '')}"
-        if player_id in self._atv_players:
-            atv_player = self._atv_players[player_id]
-            if discovery_info.address != atv_player.discovery_info.address:
-                atv_player.address_updated(discovery_info.address)
-            return
-        if "_raop._tcp.local" not in discovery_info.properties:
-            # skip players without raop
-            return
-        self.logger.debug(
-            "Discovered Airplay device %s on %s", discovery_info.name, discovery_info.address
-        )
-        self._atv_players[player_id] = atv_player = AirPlayPlayer(
-            self.mass, player_id, discovery_info
-        )
-        atv_player._setup_device()
-        for player in self.players:
-            player.can_sync_with = tuple(x for x in self._atv_players if x != player.player_id)
-            self.mass.players.update(player.player_id)
-
-    def _is_feature_available(
-        self, atv_player: interface.AppleTV, feature: interface.FeatureName
-    ) -> bool:
-        """Return if a feature is available."""
-        mass_player = self.mass.players.get(atv_player.device_info.output_device_id)
-        if atv_player and mass_player.state == PlayerState.PLAYING:
-            return atv_player.features.in_state(interface.FeatureState.Available, feature)
-        return False
-
-    async def get_cliraop_binary(self):
+    async def _getcliraop_binary(self):
         """Find the correct raop/airplay binary belonging to the platform."""
         # ruff: noqa: SIM102
-        if self._cliraop_bin is not None:
-            return self._cliraop_bin
+        if self.cliraop_bin is not None:
+            return self.cliraop_bin
 
         async def check_binary(cliraop_path: str) -> str | None:
             try:
@@ -886,7 +685,7 @@ class AirplayProvider(PlayerProvider):
                 stdout, _ = await cliraop.communicate()
                 stdout = stdout.strip().decode()
                 if cliraop.returncode == 0 and stdout == "cliraop check":
-                    self._cliraop_bin = cliraop_path
+                    self.cliraop_bin = cliraop_path
                     return cliraop_path
             except OSError:
                 return None
@@ -911,112 +710,162 @@ class AirplayProvider(PlayerProvider):
         group_child_ids = {player_id}
         group_child_ids.update(mass_player.group_childs)
         for child_id in group_child_ids:
-            if client := self._atv_players.get(child_id):
+            if client := self._players.get(child_id):
                 sync_clients.append(client)
         return sync_clients
 
-    async def _init_cliraop(self, atv_player: AirPlayPlayer, ntp: int) -> None:  # noqa: PLR0915
-        """Initiatlize CLIRaop process for a player."""
-        stream: RaopStream | None = next(
-            (x for x in atv_player.atv.stream.instances if isinstance(x, RaopStream)), None
+    async def _setup_player(
+        self, player_id: str, display_name: str, info: AsyncServiceInfo
+    ) -> None:
+        """Handle setup of a new player that is discovered using mdns."""
+        address = info.parsed_addresses()[0]
+        # some guards if our info is valid/complete
+        if address == "127.0.0.1":
+            return
+        if "md" not in info.decoded_properties:
+            return
+        if "et" not in info.decoded_properties:
+            return
+        self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
+        self._players[player_id] = AirPlayPlayer(
+            player_id, discovery_info=info, logger=self.logger.getChild(player_id)
         )
-        if stream is None:
-            raise RuntimeError("RAOP Not available")
-
-        async def log_watcher(cliraop_proc: AsyncProcess) -> None:
-            """Monitor stderr for a running CLIRaop process."""
-            mass_player = self.mass.players.get(atv_player.player_id)
-            logger = self.logger.getChild(atv_player.player_id)
-            async for line in cliraop_proc._proc.stderr:
-                line = line.decode().strip()  # noqa: PLW2901
-                if not line:
-                    continue
-                if "set pause" in line:
-                    atv_player.optimistic_state = PlayerState.PAUSED
-                    atv_player.update_attributes()
-                    logger.info(line)
-                elif "Restarted at" in line:
-                    atv_player.optimistic_state = PlayerState.PLAYING
-                    atv_player.update_attributes()
-                    logger.info(line)
-                elif "after start), played" in line:
-                    millis = int(line.split("played ")[1].split(" ")[0])
-                    mass_player.elapsed_time = millis / 1000
-                    mass_player.elapsed_time_last_updated = time.time()
-                else:
-                    logger.debug(line)
-            # if we reach this point, the process exited
-            if cliraop_proc._proc.returncode is not None:
-                cliraop_proc.closed = True
-            atv_player.optimistic_state = PlayerState.IDLE
-            atv_player.update_attributes()
-            logger.debug(
-                "CLIRaop process stopped with errorcode %s",
-                cliraop_proc._proc.returncode,
+        manufacturer, model = get_model_from_am(info.decoded_properties.get("am"))
+        if "apple tv" in model.lower():
+            # For now, we ignore the Apple TV until we implement the authentication.
+            # maybe we can simply use pyatv only for this part?
+            # the cliraop application has already been prepared to accept the secret.
+            self.logger.debug(
+                "Ignoring %s in discovery due to authentication requirement.", display_name
             )
-
-        extra_args = []
-        latency = self.mass.config.get_raw_player_config_value(
-            atv_player.player_id, CONF_LATENCY, 1200
-        )
-        extra_args += ["-l", str(latency)]
-        if self.mass.config.get_raw_player_config_value(
-            atv_player.player_id, CONF_ENCRYPTION, False
-        ):
-            extra_args += ["-u"]
-        if self.mass.config.get_raw_player_config_value(
-            atv_player.player_id, CONF_ALAC_ENCODE, True
-        ):
-            extra_args += ["-a"]
-        if self.mass.config.get_raw_player_config_value(
-            atv_player.player_id, CONF_VOLUME_START, False
-        ):
-            extra_args += ["-v", str(int(atv_player.atv.audio.volume))]
-        sync_adjust = self.mass.config.get_raw_player_config_value(
-            atv_player.player_id, CONF_SYNC_ADJUST, 0
+            return
+        if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
+            self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
+            return
+        if not (volume := await self.mass.cache.get(f"{CACHE_KEY_PREV_VOLUME}.{player_id}")):
+            volume = FALLBACK_VOLUME
+        mass_player = Player(
+            player_id=player_id,
+            provider=self.instance_id,
+            type=PlayerType.PLAYER,
+            name=display_name,
+            available=True,
+            powered=False,
+            device_info=DeviceInfo(
+                model=model,
+                manufacturer=manufacturer,
+                address=address,
+            ),
+            supported_features=(
+                PlayerFeature.PAUSE,
+                PlayerFeature.SYNC,
+                PlayerFeature.VOLUME_SET,
+            ),
+            max_sample_rate=44100,
+            supports_24bit=False,
+            can_sync_with=tuple(x for x in self._players if x != player_id),
+            volume_level=volume,
         )
-        if device_password := self.mass.config.get_raw_player_config_value(
-            atv_player.player_id, CONF_PASSWORD, None
-        ):
-            extra_args += ["-P", device_password]
-        if self.logger.level == logging.DEBUG:
-            extra_args += ["-d", "5"]
+        self.mass.players.register_or_update(mass_player)
+        # update can_sync_with field of all other players
+        for player in self.players:
+            if player.player_id == player_id:
+                continue
+            player.can_sync_with = tuple(x for x in self._players if x != player.player_id)
+            self.mass.players.update(player.player_id)
 
-        atv_player.optimistic_state = PlayerState.PLAYING
-        # always generate a new active remote id to prevent race conditions
-        # with the named pipe used to send commands
-        atv_player.active_remote_id = str(randint(1000, 8000))
-        args = [
-            self._cliraop_bin,
-            "-n",
-            str(ntp),
-            "-p",
-            str(stream.core.service.port),
-            "-w",
-            str(2000 + sync_adjust),
-            *extra_args,
-            "-dacp",
-            self.dacp_id,
-            "-ar",
-            atv_player.active_remote_id,
-            "-md",
-            atv_player.discovery_info.properties["_raop._tcp.local"]["md"],
-            "-et",
-            atv_player.discovery_info.properties["_raop._tcp.local"]["et"],
-            str(atv_player.discovery_info.address),
-            "-",
-        ]
-        if platform.system() == "Darwin":
-            os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
-        atv_player.cliraop_proc = AsyncProcess(
-            args, enable_stdin=True, enable_stdout=False, enable_stderr=True
-        )
-        await atv_player.cliraop_proc.start()
-        atv_player.cliraop_proc.attach_task(log_watcher(atv_player.cliraop_proc))
+    async def _handle_dacp_request(  # noqa: PLR0915
+        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+    ) -> None:
+        """Handle new connection on the socket."""
+        try:
+            raw_request = b""
+            while recv := await reader.read(1024):
+                raw_request += recv
+                if len(recv) < 1024:
+                    break
+
+            request = raw_request.decode("UTF-8")
+            headers_raw, body = request.split("\r\n\r\n", 1)
+            headers_raw = headers_raw.split("\r\n")
+            headers = {}
+            for line in headers_raw[1:]:
+                x, y = line.split(":", 1)
+                headers[x.strip()] = y.strip()
+            active_remote = headers.get("Active-Remote")
+            _, path, _ = headers_raw[0].split(" ")
+            airplay_player = next(
+                (
+                    x
+                    for x in self._players.values()
+                    if x.active_stream and x.active_stream.active_remote_id == active_remote
+                ),
+                None,
+            )
+            self.logger.debug(
+                "DACP request for %s (%s): %s -- %s",
+                airplay_player.discovery_info.name if airplay_player else "UNKNOWN PLAYER",
+                active_remote,
+                path,
+                body,
+            )
+            if not airplay_player:
+                return
+
+            player_id = airplay_player.player_id
+            mass_player = self.mass.players.get(player_id)
+            if path == "/ctrl-int/1/nextitem":
+                self.mass.create_task(self.mass.player_queues.next(player_id))
+            elif path == "/ctrl-int/1/previtem":
+                self.mass.create_task(self.mass.player_queues.previous(player_id))
+            elif path == "/ctrl-int/1/play":
+                self.mass.create_task(self.mass.player_queues.play(player_id))
+            elif path == "/ctrl-int/1/playpause":
+                self.mass.create_task(self.mass.player_queues.play_pause(player_id))
+            elif path == "/ctrl-int/1/stop":
+                self.mass.create_task(self.cmd_stop(player_id))
+            elif path == "/ctrl-int/1/volumeup":
+                self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
+            elif path == "/ctrl-int/1/volumedown":
+                self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
+            elif path == "/ctrl-int/1/shuffle_songs":
+                queue = self.mass.player_queues.get(player_id)
+                self.mass.create_task(
+                    self.mass.player_queues.set_shuffle(player_id, not queue.shuffle_enabled)
+                )
+            elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
+                self.mass.create_task(self.mass.player_queues.pause(player_id))
+            elif "dmcp.device-volume=" in path:
+                raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
+                volume = convert_airplay_volume(raop_volume)
+                if abs(volume - mass_player.volume_level) > 2:
+                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
+            elif "dmcp.volume=" in path:
+                volume = int(path.split("dmcp.volume=", 1)[-1])
+                if abs(volume - mass_player.volume_level) > 2:
+                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
+            else:
+                self.logger.debug(
+                    "Unknown DACP request for %s: %s",
+                    airplay_player.discovery_info.name,
+                    path,
+                )
+
+            # send response
+            date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
+            response = (
+                f"HTTP/1.0 204 No Content\r\nDate: {date_str} "
+                "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: "
+                "application/x-dmap-tagged\r\nContent-Length: 0\r\n"
+                "Connection: close\r\n\r\n"
+            )
+            writer.write(response.encode())
+            await writer.drain()
+        finally:
+            writer.close()
 
-    async def _send_metadata(self, player_id: str) -> None:
+    async def _send_metadata(self, player_id: str, queue: PlayerQueue) -> None:
         """Send metadata to player (and connected sync childs)."""
-        queue = self.mass.player_queues.get_active_queue(player_id)
         if not queue or not queue.current_item:
             return
         duration = min(queue.current_item.duration or 0, 3600)
@@ -1041,15 +890,20 @@ class AirplayProvider(PlayerProvider):
         cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
         cmd += f"DURATION={duration}\nACTION=SENDMETA\n"
 
-        for atv_player in self._get_sync_clients(player_id):
-            await atv_player.send_cli_command(cmd)
+        for airplay_player in self._get_sync_clients(player_id):
+            if not airplay_player.active_stream:
+                continue
+            await airplay_player.active_stream.send_cli_command(cmd)
 
         # get image
         if not queue.current_item.image:
             return
 
+        # the image format needs to be 500x500 jpeg for maximum compatibility with players
         image_url = self.mass.metadata.get_image_url(
-            queue.current_item.image, size=512, prefer_proxy=True
+            queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
         )
-        for atv_player in self._get_sync_clients(player_id):
-            await atv_player.send_cli_command(f"ARTWORK={image_url}\n")
+        for airplay_player in self._get_sync_clients(player_id):
+            if not airplay_player.active_stream:
+                continue
+            await airplay_player.active_stream.send_cli_command(f"ARTWORK={image_url}\n")
index e2713b51280fa65c8e37be96df52c58257432679..f80b3debab8da3802857958238b0dfe593126c91 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-aarch64 differ
index cf1ff1d9a2a7337a2167a34d21faf92d09b2a1c6..de944030f1ac5889bdd1f84c538720a78904ba7a 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 and b/music_assistant/server/providers/airplay/bin/cliraop-linux-x86_64 differ
index b955a34d1a47c2a6c2f5a6603899d1e74e42f3dc..f66534a5321e56f9bf702a58c155140854172b67 100755 (executable)
Binary files a/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 and b/music_assistant/server/providers/airplay/bin/cliraop-macos-arm64 differ
index 9e5fb135160fdb94b7e7995a9c9ff762cfbcf114..b2c8f18381d0cf074e48bf21d511452aaa781979 100644 (file)
@@ -3,11 +3,16 @@
   "domain": "airplay",
   "name": "Airplay",
   "description": "Support for players that support the Airplay protocol.",
-  "codeowners": ["@music-assistant"],
+  "codeowners": [
+    "@music-assistant"
+  ],
   "requirements": [],
   "documentation": "https://music-assistant.github.io/player-support/airplay/",
   "multi_instance": false,
   "builtin": false,
   "load_by_default": true,
-  "icon": "cast-variant"
+  "icon": "cast-variant",
+  "mdns_discovery": [
+    "_raop._tcp.local."
+  ]
 }
index 2540b8d79fd472ac0da3b9dbd5591981bfe22e42..09dbdb6cf41a711296ebbc0762f8bc6bcdffcffc 100644 (file)
@@ -3,11 +3,18 @@
   "domain": "chromecast",
   "name": "Chromecast",
   "description": "Support for Chromecast based players.",
-  "codeowners": ["@music-assistant"],
-  "requirements": ["PyChromecast==13.1.0"],
+  "codeowners": [
+    "@music-assistant"
+  ],
+  "requirements": [
+    "PyChromecast==13.1.0"
+  ],
   "documentation": "https://music-assistant.github.io/player-support/google-cast/",
   "multi_instance": false,
   "builtin": false,
   "load_by_default": true,
-  "icon": "cast"
+  "icon": "cast",
+  "mdns_discovery": [
+    "_googlecast._tcp.local."
+  ]
 }
index 48300dad9b42d7ddc8d37f097dfa52c057910590..7be62ed32a7d1a7a61147cec0a3842d9642a402a 100644 (file)
@@ -78,7 +78,6 @@ class MusicAssistant:
     loop: asyncio.AbstractEventLoop
     http_session: ClientSession
     aiozc: AsyncZeroconf
-    aiobrowser: AsyncServiceBrowser
     config: ConfigController
     webserver: WebserverController
     cache: CacheController
@@ -87,6 +86,7 @@ class MusicAssistant:
     players: PlayerController
     player_queues: PlayerQueuesController
     streams: StreamsController
+    _aiobrowser: AsyncServiceBrowser
 
     def __init__(self, storage_path: str) -> None:
         """Initialize the MusicAssistant Server."""
@@ -109,11 +109,6 @@ class MusicAssistant:
         # create shared zeroconf instance
         # TODO: enumerate interfaces and enable IPv6 support
         self.aiozc = AsyncZeroconf(ip_version=IPVersion.V4Only)
-        # self.aiobrowser = AsyncServiceBrowser(
-        #     self.aiozc.zeroconf,
-        #     [],
-        #     handlers=[self._on_mdns_service_state_change],
-        # )
         # create shared aiohttp ClientSession
         self.http_session = ClientSession(
             loop=self.loop,
@@ -154,10 +149,10 @@ class MusicAssistant:
         await self.streams.setup(await self.config.get_core_config("streams"))
         # register all api commands (methods with decorator)
         self._register_api_commands()
-        # setup discovery
-        self.create_task(self._setup_discovery())
         # load providers
         await self._load_providers()
+        # setup discovery
+        self.create_task(self._setup_discovery())
 
     async def stop(self) -> None:
         """Stop running the music assistant server."""
@@ -454,6 +449,10 @@ class MusicAssistant:
     async def unload_provider(self, instance_id: str) -> None:
         """Unload a provider."""
         if provider := self._providers.get(instance_id):
+            # remove mdns discovery if needed
+            if provider.manifest.mdns_discovery:
+                for mdns_type in provider.manifest.mdns_discovery:
+                    self.aiobrowser.types.discard(mdns_type)
             # make sure to stop any running sync tasks first
             for sync_task in self.music.in_progress_syncs:
                 if sync_task.provider_instance == instance_id:
@@ -573,10 +572,20 @@ class MusicAssistant:
                 tg.create_task(load_provider_manifest(dir_str, dir_path))
 
     async def _setup_discovery(self) -> None:
-        """Make this Music Assistant instance discoverable on the network."""
+        """Handle setup of MDNS discovery."""
+        # create a global mdns browser
+        all_types: set[str] = set()
+        for prov_manifest in self._provider_manifests.values():
+            if prov_manifest.mdns_discovery:
+                all_types.update(prov_manifest.mdns_discovery)
+        self._aiobrowser = AsyncServiceBrowser(
+            self.aiozc.zeroconf,
+            list(all_types),
+            handlers=[self._on_mdns_service_state_change],
+        )
+        # register MA itself on mdns to be discovered
         zeroconf_type = "_mass._tcp.local."
         server_id = self.server_id
-        # register MA on mdns to be discovered
         LOGGER.debug("Starting Zeroconf broadcast...")
         info = AsyncServiceInfo(
             zeroconf_type,
@@ -607,6 +616,21 @@ class MusicAssistant:
     ) -> None:
         """Handle MDNS service state callback."""
 
+        async def process_mdns_state_change(prov: ProviderInstanceType):
+            if state_change == ServiceStateChange.Removed:
+                info = None
+            else:
+                info = AsyncServiceInfo(service_type, name)
+                await info.async_request(zeroconf, 3000)
+            await prov.on_mdns_service_state_change(name, state_change, info)
+
+        LOGGER.debug(f"Service {name} of type {service_type} state changed: {state_change}")
+        for prov in self._providers.values():
+            if not prov.manifest.mdns_discovery:
+                continue
+            if service_type in prov.manifest.mdns_discovery:
+                self.create_task(process_mdns_state_change(prov))
+
     async def __aenter__(self) -> Self:
         """Return Context manager."""
         await self.start()