Bugfixes for Airplay and HLS streams (#1731)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Sun, 20 Oct 2024 19:50:13 +0000 (21:50 +0200)
committerGitHub <noreply@github.com>
Sun, 20 Oct 2024 19:50:13 +0000 (21:50 +0200)
13 files changed:
music_assistant/server/controllers/config.py
music_assistant/server/controllers/player_queues.py
music_assistant/server/controllers/players.py
music_assistant/server/controllers/streams.py
music_assistant/server/helpers/audio.py
music_assistant/server/helpers/ffmpeg.py
music_assistant/server/providers/airplay/__init__.py
music_assistant/server/providers/airplay/const.py [new file with mode: 0644]
music_assistant/server/providers/airplay/helpers.py [new file with mode: 0644]
music_assistant/server/providers/airplay/player.py [new file with mode: 0644]
music_assistant/server/providers/airplay/provider.py [new file with mode: 0644]
music_assistant/server/providers/airplay/raop.py [new file with mode: 0644]
music_assistant/server/providers/sonos/__init__.py

index 7bfa26cafc879bfc3a1560955526b3cd7ec84b86..d77462a6c3dacb1481e12b66e94b7d57b05c745d 100644 (file)
@@ -394,6 +394,7 @@ class ConfigController:
             object_id=config.player_id,
             data=config,
         )
+        self.mass.players.update(config.player_id, force_update=True)
         # return full player config (just in case)
         return await self.get_player_config(player_id)
 
index 9a39d27615a5e61fc4ee66f4e2691a7412f6f75a..0b08a31cce11eb29676c1c5069e2076bfadf19bd 100644 (file)
@@ -309,6 +309,7 @@ class PlayerQueuesController(CoreController):
             self.mass.create_task(self.resume(queue_id))
         else:
             task_id = f"enqueue_next_{queue_id}"
+            self.logger.info("Repeat mode detected, enqueue next item")
             self.mass.call_later(2, self._enqueue_next, queue, queue.current_index, task_id=task_id)
 
     @api_command("player_queues/play_media")
index ecac01266d85815a7528aec107d53cd93de4ba89..c0c2861267aef45d48ca7993ca0b175c47c01128 100644 (file)
@@ -275,7 +275,7 @@ class PlayerController(CoreController):
         if PlayerFeature.SEEK not in player.supported_features:
             msg = f"Player {player.display_name} does not support seeking"
             raise UnsupportedFeaturedException(msg)
-        player_prov = self.mass.players.get_player_provider(player.player_id)
+        player_prov = self.get_player_provider(player.player_id)
         await player_prov.cmd_seek(player.player_id, position)
 
     @api_command("players/cmd/next")
@@ -290,7 +290,7 @@ class PlayerController(CoreController):
         if PlayerFeature.NEXT_PREVIOUS not in player.supported_features:
             msg = f"Player {player.display_name} does not support skipping to the next track."
             raise UnsupportedFeaturedException(msg)
-        player_prov = self.mass.players.get_player_provider(player.player_id)
+        player_prov = self.get_player_provider(player.player_id)
         await player_prov.cmd_next(player.player_id)
 
     @api_command("players/cmd/previous")
@@ -305,7 +305,7 @@ class PlayerController(CoreController):
         if PlayerFeature.NEXT_PREVIOUS not in player.supported_features:
             msg = f"Player {player.display_name} does not support skipping to the previous track."
             raise UnsupportedFeaturedException(msg)
-        player_prov = self.mass.players.get_player_provider(player.player_id)
+        player_prov = self.get_player_provider(player.player_id)
         await player_prov.cmd_previous(player.player_id)
 
     @api_command("players/cmd/power")
@@ -565,7 +565,7 @@ class PlayerController(CoreController):
         # power on the player if needed
         if not player.powered:
             await self.cmd_power(player.player_id, True)
-        player_prov = self.mass.players.get_player_provider(player.player_id)
+        player_prov = self.get_player_provider(player.player_id)
         await player_prov.play_media(
             player_id=player.player_id,
             media=media,
@@ -573,7 +573,7 @@ class PlayerController(CoreController):
 
     async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
         """Handle enqueuing of a next media item on the player."""
-        player_prov = self.mass.players.get_player_provider(player_id)
+        player_prov = self.get_player_provider(player_id)
         async with self._player_throttlers[player_id]:
             await player_prov.enqueue_next_media(player_id=player_id, media=media)
 
@@ -1058,7 +1058,7 @@ class PlayerController(CoreController):
         if player_provider := self.mass.get_provider(config.provider):
             with suppress(PlayerUnavailableError):
                 await player_provider.on_player_config_change(config, changed_keys)
-        if not (player := self.mass.players.get(config.player_id)):
+        if not (player := self.get(config.player_id)):
             return
         if player_disabled:
             # edge case: ensure that the player is powered off if the player gets disabled
@@ -1070,14 +1070,13 @@ class PlayerController(CoreController):
         # check for group memberships that need to be updated
         if player_disabled and player.active_group and player_provider:
             # try to remove from the group
-            group_player = self.mass.players.get(player.active_group)
+            group_player = self.get(player.active_group)
             with suppress(UnsupportedFeaturedException, PlayerCommandFailed):
                 await player_provider.set_members(
                     player.active_group,
                     [x for x in group_player.group_childs if x != player.player_id],
                 )
         player.enabled = config.enabled
-        self.mass.players.update(config.player_id, force_update=True)
 
     async def _play_announcement(
         self,
index ccdd23247af3781e315cc830b0ea448babc597f1..e57940cb360544d7267e5cfc500f2aa39152ccc6 100644 (file)
@@ -58,7 +58,6 @@ from music_assistant.server.helpers.audio import (
     check_audio_support,
     crossfade_pcm_parts,
     get_chunksize,
-    get_hls_radio_stream,
     get_hls_substream,
     get_icy_radio_stream,
     get_media_stream,
@@ -836,21 +835,21 @@ class StreamsController(CoreController):
             )
         elif streamdetails.stream_type == StreamType.ICY:
             audio_source = get_icy_radio_stream(self.mass, streamdetails.path, streamdetails)
-        elif streamdetails.stream_type == StreamType.HLS:
+        elif streamdetails.stream_type in (StreamType.HLS, StreamType.ENCRYPTED_HLS):
+            substream = await get_hls_substream(self.mass, streamdetails.path)
+            audio_source = substream.path
             if streamdetails.media_type == MediaType.RADIO:
                 # Especially the BBC streams struggle when they're played directly
-                # with ffmpeg, so we use our own HLS stream parser/logic
-                audio_source = get_hls_radio_stream(self.mass, streamdetails.path, streamdetails)
-            else:
-                # normal tracks we just let ffmpeg deal with it
-                substream = await get_hls_substream(self.mass, streamdetails.path)
-                audio_source = substream.path
-        elif streamdetails.stream_type == StreamType.ENCRYPTED_HTTP:
-            audio_source = streamdetails.path
-            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
+                # with ffmpeg, where they just stop after some minutes,
+                # so we tell ffmpeg to loop around in this case.
+                extra_input_args += ["-stream_loop", "-1", "-re"]
         else:
             audio_source = streamdetails.path
 
+        # add support for decryption key provided in streamdetails
+        if streamdetails.decryption_key:
+            extra_input_args += ["-decryption_key", streamdetails.decryption_key]
+
         # handle seek support
         if (
             streamdetails.seek_position
index 23fa5876f3cc09dc729b32104d078a4ab89bf477..13f9964782ba884659070d12ed7d3332e3585329 100644 (file)
@@ -570,51 +570,9 @@ async def get_icy_radio_stream(
                 streamdetails.stream_title = cleaned_stream_title
 
 
-async def get_hls_radio_stream(
-    mass: MusicAssistant,
-    url: str,
-    streamdetails: StreamDetails,
-) -> AsyncGenerator[bytes, None]:
-    """Get radio audio stream from HTTP HLS playlist."""
-    logger = LOGGER.getChild("hls_stream")
-    logger.debug("Start streaming HLS stream for url %s", url)
-    # we simply select the best quality substream here
-    # if we ever want to support adaptive stream selection based on bandwidth
-    # we need to move the substream selection into the loop below and make it
-    # bandwidth aware. For now we just assume domestic high bandwidth where
-    # the user wants the best quality possible at all times.
-    playlist_item = await get_hls_substream(mass, url)
-    substream_url = playlist_item.path
-    loops = 50 if streamdetails.media_type != MediaType.RADIO else 1
-    while loops:
-        logger.log(VERBOSE_LOG_LEVEL, "start streaming chunks from substream %s", substream_url)
-        # We simply let ffmpeg deal with parsing the HLS playlist and stichting chunks together.
-        # However we do not feed the playlist URL to ffmpeg directly to give us the possibility
-        # to monitor the stream title and other metadata for radio streams in the future.
-        # Also, we've seen cases where ffmpeg sometimes chokes in a stream and aborts, which is not
-        # very useful for radio streams which you want to simply go on forever, so we need to loop
-        # and restart ffmpeg in case of an error.
-        input_format = AudioFormat(content_type=ContentType.UNKNOWN)
-        audio_format_detected = False
-        async for chunk in get_ffmpeg_stream(
-            audio_input=substream_url,
-            input_format=input_format,
-            output_format=AudioFormat(content_type=ContentType.WAV),
-        ):
-            yield chunk
-            if audio_format_detected:
-                continue
-            if input_format.content_type not in (ContentType.UNKNOWN, ContentType.WAV):
-                # we need to determine the audio format from the first chunk
-                streamdetails.audio_format = input_format
-                audio_format_detected = True
-        loops -= 1
-
-
 async def get_hls_substream(
     mass: MusicAssistant,
     url: str,
-    allow_encrypted: bool = False,
 ) -> PlaylistItem:
     """Select the (highest quality) HLS substream for given HLS playlist/URL."""
     timeout = ClientTimeout(total=0, connect=30, sock_read=5 * 60)
@@ -627,9 +585,6 @@ async def get_hls_substream(
         raw_data = await resp.read()
         encoding = resp.charset or await detect_charset(raw_data)
         master_m3u_data = raw_data.decode(encoding)
-    if not allow_encrypted and "EXT-X-KEY:METHOD=" in master_m3u_data:
-        # for now we do not (yet) support encrypted HLS streams
-        raise InvalidDataError("HLS stream is encrypted, not supported")
     substreams = parse_m3u(master_m3u_data)
     # There is a chance that we did not get a master playlist with subplaylists
     # but just a single master/sub playlist with the actual audio stream(s)
index 3e390f658259185188307cb47b90ae6a4cfa9201..d283b24dfdd643578f94a754b1fd0b0d2029b678 100644 (file)
@@ -218,16 +218,24 @@ def get_ffmpeg_args(
     if input_path.startswith("http"):
         # append reconnect options for direct stream from http
         input_args += [
-            "-reconnect",
+            # Reconnect automatically when disconnected before EOF is hit.
+            "reconnect",
             "1",
-            "-reconnect_streamed",
+            # Set the maximum delay in seconds after which to give up reconnecting.
+            "-reconnect_delay_max",
+            "30",
+            # If set then even streamed/non seekable streams will be reconnected on errors.
+            "reconnect_streamed",
             "1",
         ]
         if major_version > 4:
             # these options are only supported in ffmpeg > 5
             input_args += [
+                # Reconnect automatically in case of TCP/TLS errors during connect.
                 "-reconnect_on_network_error",
                 "1",
+                # A comma separated list of HTTP status codes to reconnect on.
+                # The list can include specific status codes (e.g. 503) or the strings 4xx / 5xx.
                 "-reconnect_on_http_error",
                 "5xx,4xx",
             ]
index 2ca3c8cce7ce53c1fcf5983bc8ff06a90ef98c76..1a29387ec022880848c2538e9c4ae959f8a4e5e7 100644 (file)
 
 from __future__ import annotations
 
-import asyncio
-import logging
-import os
-import platform
-import socket
-import time
-from contextlib import suppress
-from random import randint, randrange
 from typing import TYPE_CHECKING
 
-from zeroconf import IPVersion, ServiceStateChange
-from zeroconf.asyncio import AsyncServiceInfo
-
-from music_assistant.common.helpers.datetime import utc
-from music_assistant.common.helpers.util import get_ip_pton, select_free_port
 from music_assistant.common.models.config_entries import (
-    CONF_ENTRY_CROSSFADE,
-    CONF_ENTRY_CROSSFADE_DURATION,
-    CONF_ENTRY_EQ_BASS,
-    CONF_ENTRY_EQ_MID,
-    CONF_ENTRY_EQ_TREBLE,
-    CONF_ENTRY_FLOW_MODE_ENFORCED,
-    CONF_ENTRY_OUTPUT_CHANNELS,
-    CONF_ENTRY_SYNC_ADJUST,
     ConfigEntry,
     ConfigValueType,
     ProviderConfig,
-    create_sample_rates_config_entry,
-)
-from music_assistant.common.models.enums import (
-    ConfigEntryType,
-    ContentType,
-    MediaType,
-    PlayerFeature,
-    PlayerState,
-    PlayerType,
-    ProviderFeature,
 )
-from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
-from music_assistant.common.models.player_queue import PlayerQueue
+from music_assistant.common.models.enums import ConfigEntryType
 from music_assistant.common.models.provider import ProviderManifest
-from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
 from music_assistant.server import MusicAssistant
-from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
-from music_assistant.server.helpers.process import AsyncProcess, check_output
-from music_assistant.server.helpers.util import TaskManager, lock
-from music_assistant.server.models.player_provider import PlayerProvider
+
+from .const import CONF_BIND_INTERFACE
+from .provider import AirplayProvider
 
 if TYPE_CHECKING:
     from music_assistant.common.models.config_entries import ProviderConfig
     from music_assistant.common.models.provider import ProviderManifest
     from music_assistant.server import MusicAssistant
     from music_assistant.server.models import ProviderInstanceType
-    from music_assistant.server.providers.player_group import PlayerGroupProvider
-
-DOMAIN = "airplay"
-
-CONF_ENCRYPTION = "encryption"
-CONF_ALAC_ENCODE = "alac_encode"
-CONF_VOLUME_START = "volume_start"
-CONF_PASSWORD = "password"
-CONF_BIND_INTERFACE = "bind_interface"
-
-PLAYER_CONFIG_ENTRIES = (
-    CONF_ENTRY_FLOW_MODE_ENFORCED,
-    CONF_ENTRY_CROSSFADE,
-    CONF_ENTRY_CROSSFADE_DURATION,
-    CONF_ENTRY_EQ_BASS,
-    CONF_ENTRY_EQ_MID,
-    CONF_ENTRY_EQ_TREBLE,
-    CONF_ENTRY_OUTPUT_CHANNELS,
-    ConfigEntry(
-        key=CONF_ENCRYPTION,
-        type=ConfigEntryType.BOOLEAN,
-        default_value=False,
-        label="Enable encryption",
-        description="Enable encrypted communication with the player, "
-        "some (3rd party) players require this.",
-        category="airplay",
-    ),
-    ConfigEntry(
-        key=CONF_ALAC_ENCODE,
-        type=ConfigEntryType.BOOLEAN,
-        default_value=True,
-        label="Enable compression",
-        description="Save some network bandwidth by sending the audio as "
-        "(lossless) ALAC at the cost of a bit CPU.",
-        category="airplay",
-    ),
-    CONF_ENTRY_SYNC_ADJUST,
-    ConfigEntry(
-        key=CONF_PASSWORD,
-        type=ConfigEntryType.SECURE_STRING,
-        default_value=None,
-        required=False,
-        label="Device password",
-        description="Some devices require a password to connect/play.",
-        category="airplay",
-    ),
-)
-BACKOFF_TIME_LOWER_LIMIT = 15  # seconds
-BACKOFF_TIME_UPPER_LIMIT = 300  # Five minutes
-
-CONF_CREDENTIALS = "credentials"
-CACHE_KEY_PREV_VOLUME = "airplay_prev_volume"
-FALLBACK_VOLUME = 20
-
-AIRPLAY_FLOW_PCM_FORMAT = AudioFormat(
-    content_type=ContentType.PCM_F32LE,
-    sample_rate=44100,
-    bit_depth=32,
-)
-AIRPLAY_PCM_FORMAT = AudioFormat(
-    content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16
-)
-
-# airplay has fixed sample rate/bit depth so make this config entry static and hidden
-CONF_ENTRY_SAMPLE_RATES_AIRPLAY = create_sample_rates_config_entry(44100, 16, 44100, 16, True)
-
-
-# TODO: Airplay provider
-# - split up and cleanup the code into more digestable parts
-# - Implement authentication for Apple TV
-# - Implement volume control for Apple devices using pyatv
-# - Implement metadata for Apple Apple devices using pyatv
-# - Use pyatv for communicating with original Apple devices
-# and use cliraop for actual streaming
-
-
-async def setup(
-    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
-) -> ProviderInstanceType:
-    """Initialize provider(instance) with given configuration."""
-    return AirplayProvider(mass, manifest, config)
 
 
 async def get_config_entries(
@@ -165,874 +49,8 @@ async def get_config_entries(
     )
 
 
-def convert_airplay_volume(value: float) -> int:
-    """Remap Airplay Volume to 0..100 scale."""
-    airplay_min = -30
-    airplay_max = 0
-    normal_min = 0
-    normal_max = 100
-    portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min)
-    return int(portion + normal_min)
-
-
-def get_model_from_am(am_property: str | None) -> tuple[str, str]:
-    """Return Manufacturer and Model name from mdns AM property."""
-    manufacturer = "Unknown"
-    model = "Generic Airplay device"
-    if not am_property:
-        return (manufacturer, model)
-    if isinstance(am_property, bytes):
-        am_property = am_property.decode("utf-8")
-    if am_property == "AudioAccessory5,1":
-        model = "HomePod"
-        manufacturer = "Apple"
-    elif "AppleTV" in am_property:
-        model = "Apple TV"
-        manufacturer = "Apple"
-    else:
-        model = am_property
-    return (manufacturer, model)
-
-
-def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None:
-    """Get primary IP address from zeroconf discovery info."""
-    for address in discovery_info.parsed_addresses(IPVersion.V4Only):
-        if address.startswith("127"):
-            # filter out loopback address
-            continue
-        if address.startswith("169.254"):
-            # filter out APIPA address
-            continue
-        return address
-    return None
-
-
-class RaopStream:
-    """Object that holds the details of a (RAOP) stream job."""
-
-    def __init__(
-        self, prov: AirplayProvider, airplay_player: AirPlayPlayer, input_format: AudioFormat
-    ) -> None:
-        """Initialize RaopStream."""
-        self.prov = prov
-        self.mass = prov.mass
-        self.airplay_player = airplay_player
-        self.input_format = input_format
-        # always generate a new active remote id to prevent race conditions
-        # with the named pipe used to send audio
-        self.active_remote_id: str = str(randint(1000, 8000))
-        self.prevent_playback: bool = False
-        # audio_source_task will only exist for the main player in a sync group
-        self.audio_source_task: asyncio.Task | None = None
-        self._log_reader_task: asyncio.Task | None = None
-        self._cliraop_proc: AsyncProcess | None = None
-        self._ffmpeg_proc: AsyncProcess | None = None
-        self._started = asyncio.Event()
-        self._stopped = False
-
-    @property
-    def running(self) -> bool:
-        """Return boolean if this stream is running."""
-        return not self._stopped and self._started.is_set()
-
-    async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
-        """Initialize CLIRaop process for a player."""
-        extra_args = []
-        player_id = self.airplay_player.player_id
-        mass_player = self.mass.players.get(player_id)
-        bind_ip = await self.mass.config.get_provider_config_value(
-            self.prov.instance_id, CONF_BIND_INTERFACE
-        )
-        extra_args += ["-if", bind_ip]
-        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
-            extra_args += ["-encrypt"]
-        if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
-            extra_args += ["-alac"]
-        for prop in ("et", "md", "am", "pk", "pw"):
-            if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop):
-                extra_args += [f"-{prop}", prop_value]
-        sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
-        if device_password := self.mass.config.get_raw_player_config_value(
-            player_id, CONF_PASSWORD, None
-        ):
-            extra_args += ["-password", device_password]
-        if self.prov.logger.isEnabledFor(logging.DEBUG):
-            extra_args += ["-debug", "5"]
-        elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
-            extra_args += ["-debug", "10"]
-
-        # create os pipes to pipe ffmpeg to cliraop
-        read, write = await asyncio.to_thread(os.pipe)
-
-        # ffmpeg handles the player specific stream + filters and pipes
-        # audio to the cliraop process
-        self._ffmpeg_proc = FFMpeg(
-            audio_input="-",
-            input_format=self.input_format,
-            output_format=AIRPLAY_PCM_FORMAT,
-            filter_params=get_player_filter_params(self.mass, player_id),
-            audio_output=write,
-        )
-        await self._ffmpeg_proc.start()
-        await asyncio.to_thread(os.close, write)
-
-        # cliraop is the binary that handles the actual raop streaming to the player
-        cliraop_args = [
-            self.prov.cliraop_bin,
-            "-ntpstart",
-            str(start_ntp),
-            "-port",
-            str(self.airplay_player.discovery_info.port),
-            "-wait",
-            str(wait_start - sync_adjust),
-            "-volume",
-            str(mass_player.volume_level),
-            *extra_args,
-            "-dacp",
-            self.prov.dacp_id,
-            "-activeremote",
-            self.active_remote_id,
-            "-udn",
-            self.airplay_player.discovery_info.name,
-            self.airplay_player.address,
-            "-",
-        ]
-        self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop")
-        if platform.system() == "Darwin":
-            os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
-        await self._cliraop_proc.start()
-        await asyncio.to_thread(os.close, read)
-        self._started.set()
-        self._log_reader_task = self.mass.create_task(self._log_watcher())
-
-    async def stop(self):
-        """Stop playback and cleanup."""
-        if self._stopped:
-            return
-        if self.audio_source_task and not self.audio_source_task.done():
-            self.audio_source_task.cancel()
-        if self._cliraop_proc.proc and not self._cliraop_proc.closed:
-            await self.send_cli_command("ACTION=STOP")
-        self._stopped = True  # set after send_cli command!
-        if self._cliraop_proc.proc and not self._cliraop_proc.closed:
-            await self._cliraop_proc.close(True)
-        if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
-            await self._ffmpeg_proc.close(True)
-        self._cliraop_proc = None
-        self._ffmpeg_proc = None
-
-    async def write_chunk(self, chunk: bytes) -> None:
-        """Write a (pcm) audio chunk."""
-        if self._stopped:
-            return
-        await self._started.wait()
-        await self._ffmpeg_proc.write(chunk)
-
-    async def write_eof(self) -> None:
-        """Write EOF."""
-        if self._stopped:
-            return
-        await self._started.wait()
-        await self._ffmpeg_proc.write_eof()
-
-    async def send_cli_command(self, command: str) -> None:
-        """Send an interactive command to the running CLIRaop binary."""
-        if self._stopped:
-            return
-        await self._started.wait()
-
-        if not command.endswith("\n"):
-            command += "\n"
-
-        def send_data():
-            with suppress(BrokenPipeError), open(named_pipe, "w") as f:
-                f.write(command)
-
-        named_pipe = f"/tmp/raop-{self.active_remote_id}"  # noqa: S108
-        self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
-        await asyncio.to_thread(send_data)
-
-    async def _log_watcher(self) -> None:
-        """Monitor stderr for the running CLIRaop process."""
-        airplay_player = self.airplay_player
-        mass_player = self.mass.players.get(airplay_player.player_id)
-        queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
-        logger = airplay_player.logger
-        lost_packets = 0
-        prev_metadata_checksum: str = ""
-        prev_progress_report: float = 0
-        async for line in self._cliraop_proc.iter_stderr():
-            if "elapsed milliseconds:" in line:
-                # this is received more or less every second while playing
-                millis = int(line.split("elapsed milliseconds: ")[1])
-                mass_player.elapsed_time = millis / 1000
-                mass_player.elapsed_time_last_updated = time.time()
-                # send metadata to player(s) if needed
-                # NOTE: this must all be done in separate tasks to not disturb audio
-                now = time.time()
-                if (
-                    mass_player.elapsed_time > 2
-                    and queue
-                    and queue.current_item
-                    and queue.current_item.streamdetails
-                ):
-                    metadata_checksum = (
-                        queue.current_item.streamdetails.stream_title
-                        or queue.current_item.queue_item_id
-                    )
-                    if prev_metadata_checksum != metadata_checksum:
-                        prev_metadata_checksum = metadata_checksum
-                        prev_progress_report = now
-                        self.mass.create_task(self._send_metadata(queue))
-                    # send the progress report every 5 seconds
-                    elif now - prev_progress_report >= 5:
-                        prev_progress_report = now
-                        self.mass.create_task(self._send_progress(queue))
-            if "set pause" in line or "Pause at" in line:
-                mass_player.state = PlayerState.PAUSED
-                self.mass.players.update(airplay_player.player_id)
-            if "Restarted at" in line or "restarting w/ pause" in line:
-                mass_player.state = PlayerState.PLAYING
-                self.mass.players.update(airplay_player.player_id)
-            if "restarting w/o pause" in line:
-                # streaming has started
-                mass_player.state = PlayerState.PLAYING
-                mass_player.elapsed_time = 0
-                mass_player.elapsed_time_last_updated = time.time()
-                self.mass.players.update(airplay_player.player_id)
-            if "lost packet out of backlog" in line:
-                lost_packets += 1
-                if lost_packets == 100:
-                    logger.error("High packet loss detected, restarting playback...")
-                    self.mass.create_task(self.mass.player_queues.resume(queue.queue_id))
-                else:
-                    logger.warning("Packet loss detected!")
-            if "end of stream reached" in line:
-                logger.debug("End of stream reached")
-                break
-
-            logger.log(VERBOSE_LOG_LEVEL, line)
-
-        # if we reach this point, the process exited
-        if airplay_player.raop_stream == self:
-            mass_player.state = PlayerState.IDLE
-            self.mass.players.update(airplay_player.player_id)
-        # ensure we're cleaned up afterwards (this also logs the returncode)
-        await self.stop()
-
-    async def _send_metadata(self, queue: PlayerQueue) -> None:
-        """Send metadata to player (and connected sync childs)."""
-        if not queue or not queue.current_item:
-            return
-        duration = min(queue.current_item.duration or 0, 3600)
-        title = queue.current_item.name
-        artist = ""
-        album = ""
-        if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
-            # stream title from radio station
-            stream_title = queue.current_item.streamdetails.stream_title
-            if " - " in stream_title:
-                artist, title = stream_title.split(" - ", 1)
-            else:
-                title = stream_title
-            # set album to radio station name
-            album = queue.current_item.name
-        elif media_item := queue.current_item.media_item:
-            title = media_item.name
-            if artist_str := getattr(media_item, "artist_str", None):
-                artist = artist_str
-            if _album := getattr(media_item, "album", None):
-                album = _album.name
-
-        cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
-        cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
-
-        await self.send_cli_command(cmd)
-
-        # get image
-        if not queue.current_item.image:
-            return
-
-        # the image format needs to be 500x500 jpeg for maximum compatibility with players
-        image_url = self.mass.metadata.get_image_url(
-            queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
-        )
-        await self.send_cli_command(f"ARTWORK={image_url}\n")
-
-    async def _send_progress(self, queue: PlayerQueue) -> None:
-        """Send progress report to player (and connected sync childs)."""
-        if not queue or not queue.current_item:
-            return
-        progress = int(queue.corrected_elapsed_time)
-        await self.send_cli_command(f"PROGRESS={progress}\n")
-
-
-class AirPlayPlayer:
-    """Holds the details of the (discovered) Airplay (RAOP) player."""
-
-    def __init__(
-        self, prov: AirplayProvider, player_id: str, discovery_info: AsyncServiceInfo, address: str
-    ) -> None:
-        """Initialize AirPlayPlayer."""
-        self.prov = prov
-        self.mass = prov.mass
-        self.player_id = player_id
-        self.discovery_info = discovery_info
-        self.address = address
-        self.logger = prov.logger.getChild(player_id)
-        self.raop_stream: RaopStream | None = None
-
-    async def cmd_stop(self, update_state: bool = True) -> None:
-        """Send STOP command to player."""
-        if self.raop_stream:
-            await self.raop_stream.stop()
-        if update_state and (mass_player := self.mass.players.get(self.player_id)):
-            mass_player.state = PlayerState.IDLE
-            self.mass.players.update(mass_player.player_id)
-
-    async def cmd_play(self) -> None:
-        """Send PLAY (unpause) command to player."""
-        if self.raop_stream and self.raop_stream.running:
-            await self.raop_stream.send_cli_command("ACTION=PLAY")
-
-    async def cmd_pause(self) -> None:
-        """Send PAUSE command to player."""
-        if not self.raop_stream or not self.raop_stream.running:
-            return
-        await self.raop_stream.send_cli_command("ACTION=PAUSE")
-
-
-class AirplayProvider(PlayerProvider):
-    """Player provider for Airplay based players."""
-
-    cliraop_bin: str | None = None
-    _players: dict[str, AirPlayPlayer]
-    _dacp_server: asyncio.Server = None
-    _dacp_info: AsyncServiceInfo = None
-    _play_media_lock: asyncio.Lock = asyncio.Lock()
-
-    @property
-    def supported_features(self) -> tuple[ProviderFeature, ...]:
-        """Return the features supported by this Provider."""
-        return (ProviderFeature.SYNC_PLAYERS,)
-
-    async def handle_async_init(self) -> None:
-        """Handle async initialization of the provider."""
-        self._players = {}
-        self.cliraop_bin = await self._getcliraop_binary()
-        dacp_port = await select_free_port(39831, 49831)
-        self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}"
-        self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
-        self._dacp_server = await asyncio.start_server(
-            self._handle_dacp_request, "0.0.0.0", dacp_port
-        )
-        zeroconf_type = "_dacp._tcp.local."
-        server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}"
-        self._dacp_info = AsyncServiceInfo(
-            zeroconf_type,
-            name=server_id,
-            addresses=[await get_ip_pton(self.mass.streams.publish_ip)],
-            port=dacp_port,
-            properties={
-                "txtvers": "1",
-                "Ver": "63B5E5C0C201542E",
-                "DbId": "63B5E5C0C201542E",
-                "OSsi": "0x1F5",
-            },
-            server=f"{socket.gethostname()}.local",
-        )
-        await self.mass.aiozc.async_register_service(self._dacp_info)
-
-    async def on_mdns_service_state_change(
-        self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
-    ) -> None:
-        """Handle MDNS service state callback."""
-        raw_id, display_name = name.split(".")[0].split("@", 1)
-        player_id = f"ap{raw_id.lower()}"
-        # handle removed player
-        if state_change == ServiceStateChange.Removed:
-            if mass_player := self.mass.players.get(player_id):
-                if not mass_player.available:
-                    return
-                # the player has become unavailable
-                self.logger.debug("Player offline: %s", display_name)
-                mass_player.available = False
-                self.mass.players.update(player_id)
-            return
-        # handle update for existing device
-        if airplay_player := self._players.get(player_id):
-            if mass_player := self.mass.players.get(player_id):
-                cur_address = get_primary_ip_address(info)
-                if cur_address and cur_address != airplay_player.address:
-                    airplay_player.logger.debug(
-                        "Address updated from %s to %s", airplay_player.address, cur_address
-                    )
-                    airplay_player.address = cur_address
-                    mass_player.device_info = DeviceInfo(
-                        model=mass_player.device_info.model,
-                        manufacturer=mass_player.device_info.manufacturer,
-                        address=str(cur_address),
-                    )
-                if not mass_player.available:
-                    self.logger.debug("Player back online: %s", display_name)
-                    mass_player.available = True
-            # always update the latest discovery info
-            airplay_player.discovery_info = info
-            self.mass.players.update(player_id)
-            return
-        # handle new player
-        await self._setup_player(player_id, display_name, info)
-
-    async def unload(self) -> None:
-        """Handle close/cleanup of the provider."""
-        # power off all players (will disconnect and close cliraop)
-        for player_id in self._players:
-            await self.cmd_power(player_id, False)
-        # shutdown DACP server
-        if self._dacp_server:
-            self._dacp_server.close()
-        # shutdown DACP zeroconf service
-        if self._dacp_info:
-            await self.mass.aiozc.async_unregister_service(self._dacp_info)
-
-    async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
-        """Return all (provider/player specific) Config Entries for the given player (if any)."""
-        base_entries = await super().get_player_config_entries(player_id)
-        if player_id not in self._players:
-            # most probably a syncgroup
-            return (
-                *base_entries,
-                CONF_ENTRY_CROSSFADE,
-                CONF_ENTRY_CROSSFADE_DURATION,
-                CONF_ENTRY_SAMPLE_RATES_AIRPLAY,
-                CONF_ENTRY_FLOW_MODE_ENFORCED,
-            )
-        return (*base_entries, *PLAYER_CONFIG_ENTRIES, CONF_ENTRY_SAMPLE_RATES_AIRPLAY)
-
-    async def cmd_stop(self, player_id: str) -> None:
-        """Send STOP command to given player.
-
-        - player_id: player_id of the player to handle the command.
-        """
-        # forward command to player and any connected sync members
-        async with TaskManager(self.mass) as tg:
-            for airplay_player in self._get_sync_clients(player_id):
-                tg.create_task(airplay_player.cmd_stop())
-
-    async def cmd_play(self, player_id: str) -> None:
-        """Send PLAY (unpause) command to given player.
-
-        - player_id: player_id of the player to handle the command.
-        """
-        # forward command to player and any connected sync members
-        async with TaskManager(self.mass) as tg:
-            for airplay_player in self._get_sync_clients(player_id):
-                tg.create_task(airplay_player.cmd_play())
-
-    async def cmd_pause(self, player_id: str) -> None:
-        """Send PAUSE command to given player.
-
-        - player_id: player_id of the player to handle the command.
-        """
-        player = self.mass.players.get(player_id)
-        if player.synced_to:
-            # should not happen, but just in case
-            raise RuntimeError("Player is synced")
-        if player.group_childs:
-            # pause is not supported while synced, use stop instead
-            self.logger.debug("Player is synced, using STOP instead of PAUSE")
-            await self.cmd_stop(player_id)
-            return
-        airplay_player = self._players[player_id]
-        await airplay_player.cmd_pause()
-
-    @lock
-    async def play_media(
-        self,
-        player_id: str,
-        media: PlayerMedia,
-    ) -> None:
-        """Handle PLAY MEDIA on given player."""
-        async with self._play_media_lock:
-            player = self.mass.players.get(player_id)
-            # set the active source for the player to the media queue
-            # this accounts for syncgroups and linked players (e.g. sonos)
-            player.active_source = media.queue_id
-            if player.synced_to:
-                # should not happen, but just in case
-                raise RuntimeError("Player is synced")
-            # always stop existing stream first
-            async with TaskManager(self.mass) as tg:
-                for airplay_player in self._get_sync_clients(player_id):
-                    tg.create_task(airplay_player.cmd_stop(update_state=False))
-            # select audio source
-            if media.media_type == MediaType.ANNOUNCEMENT:
-                # special case: stream announcement
-                input_format = AIRPLAY_PCM_FORMAT
-                audio_source = self.mass.streams.get_announcement_stream(
-                    media.custom_data["url"],
-                    output_format=AIRPLAY_PCM_FORMAT,
-                    use_pre_announce=media.custom_data["use_pre_announce"],
-                )
-            elif media.queue_id.startswith("ugp_"):
-                # special case: UGP stream
-                ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
-                ugp_stream = ugp_provider.ugp_streams[media.queue_id]
-                input_format = ugp_stream.output_format
-                audio_source = ugp_stream.subscribe()
-            elif media.queue_id and media.queue_item_id:
-                # regular queue (flow) stream request
-                input_format = AIRPLAY_FLOW_PCM_FORMAT
-                audio_source = self.mass.streams.get_flow_stream(
-                    queue=self.mass.player_queues.get(media.queue_id),
-                    start_queue_item=self.mass.player_queues.get_item(
-                        media.queue_id, media.queue_item_id
-                    ),
-                    pcm_format=input_format,
-                )
-            else:
-                # assume url or some other direct path
-                # NOTE: this will fail if its an uri not playable by ffmpeg
-                input_format = AIRPLAY_PCM_FORMAT
-                audio_source = get_ffmpeg_stream(
-                    audio_input=media.uri,
-                    input_format=AudioFormat(ContentType.try_parse(media.uri)),
-                    output_format=AIRPLAY_PCM_FORMAT,
-                )
-
-            # Python is not suitable for realtime audio streaming so we do the actual streaming
-            # of (RAOP) audio using a small executable written in C based on libraop to do
-            # the actual timestamped playback, which reads pcm audio from stdin
-            # and we can send some interactive commands using a named pipe.
-
-            # setup RaopStream for player and its sync childs
-            sync_clients = self._get_sync_clients(player_id)
-            for airplay_player in sync_clients:
-                airplay_player.raop_stream = RaopStream(
-                    self, airplay_player, input_format=input_format
-                )
-
-            async def audio_streamer() -> None:
-                async for chunk in audio_source:
-                    await asyncio.gather(
-                        *[x.raop_stream.write_chunk(chunk) for x in sync_clients],
-                        return_exceptions=True,
-                    )
-                # entire stream consumed: send EOF
-                await asyncio.gather(
-                    *[x.raop_stream.write_eof() for x in sync_clients],
-                    return_exceptions=True,
-                )
-
-            # get current ntp and start cliraop
-            _, stdout = await check_output(self.cliraop_bin, "-ntp")
-            start_ntp = int(stdout.strip())
-            wait_start = 1250 + (250 * len(sync_clients))
-            await asyncio.gather(
-                *[x.raop_stream.start(start_ntp, wait_start) for x in sync_clients],
-                return_exceptions=True,
-            )
-            self._players[player_id].raop_stream.audio_source_task = asyncio.create_task(
-                audio_streamer()
-            )
-
-    async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
-        """Send VOLUME_SET command to given player.
-
-        - player_id: player_id of the player to handle the command.
-        - volume_level: volume level (0..100) to set on the player.
-        """
-        airplay_player = self._players[player_id]
-        if airplay_player.raop_stream and airplay_player.raop_stream.running:
-            await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
-        mass_player = self.mass.players.get(player_id)
-        mass_player.volume_level = volume_level
-        mass_player.volume_muted = volume_level == 0
-        self.mass.players.update(player_id)
-        # store last state in cache
-        await self.mass.cache.set(player_id, volume_level, base_key=CACHE_KEY_PREV_VOLUME)
-
-    @lock
-    async def cmd_sync(self, player_id: str, target_player: str) -> None:
-        """Handle SYNC command for given player.
-
-        Join/add the given player(id) to the given (master) player/sync group.
-
-            - player_id: player_id of the player to handle the command.
-            - target_player: player_id of the syncgroup master or group player.
-        """
-        if player_id == target_player:
-            return
-        child_player = self.mass.players.get(player_id)
-        assert child_player  # guard
-        parent_player = self.mass.players.get(target_player)
-        assert parent_player  # guard
-        if parent_player.synced_to:
-            raise RuntimeError("Player is already synced")
-        if child_player.synced_to and child_player.synced_to != target_player:
-            raise RuntimeError("Player is already synced to another player")
-        # always make sure that the parent player is part of the sync group
-        parent_player.group_childs.add(parent_player.player_id)
-        parent_player.group_childs.add(child_player.player_id)
-        child_player.synced_to = parent_player.player_id
-        # mark players as powered
-        parent_player.powered = True
-        child_player.powered = True
-        # check if we should (re)start or join a stream session
-        active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
-        if active_queue.state == PlayerState.PLAYING:
-            # playback needs to be restarted to form a new multi client stream session
-            # this could potentially be called by multiple players at the exact same time
-            # so we debounce the resync a bit here with a timer
-            self.mass.call_later(
-                1,
-                self.mass.player_queues.resume,
-                active_queue.queue_id,
-                fade_in=False,
-                task_id=f"resume_{active_queue.queue_id}",
-            )
-        else:
-            # make sure that the player manager gets an update
-            self.mass.players.update(child_player.player_id, skip_forward=True)
-            self.mass.players.update(parent_player.player_id, skip_forward=True)
-
-    @lock
-    async def cmd_unsync(self, player_id: str) -> None:
-        """Handle UNSYNC command for given player.
-
-        Remove the given player from any syncgroups it currently is synced to.
-
-            - player_id: player_id of the player to handle the command.
-        """
-        player = self.mass.players.get(player_id, raise_unavailable=True)
-        if player.synced_to:
-            group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
-            if player_id in group_leader.group_childs:
-                group_leader.group_childs.remove(player_id)
-            player.synced_to = None
-            airplay_player = self._players.get(player_id)
-            await airplay_player.cmd_stop()
-            # make sure that the player manager gets an update
-            self.mass.players.update(player.player_id, skip_forward=True)
-            self.mass.players.update(group_leader.player_id, skip_forward=True)
-
-    async def _getcliraop_binary(self):
-        """Find the correct raop/airplay binary belonging to the platform."""
-        # ruff: noqa: SIM102
-        if self.cliraop_bin is not None:
-            return self.cliraop_bin
-
-        async def check_binary(cliraop_path: str) -> str | None:
-            try:
-                returncode, output = await check_output(
-                    cliraop_path,
-                    "-check",
-                )
-                if returncode == 0 and output.strip().decode() == "cliraop check":
-                    self.cliraop_bin = cliraop_path
-                    return cliraop_path
-            except OSError:
-                return None
-
-        base_path = os.path.join(os.path.dirname(__file__), "bin")
-        system = platform.system().lower().replace("darwin", "macos")
-        architecture = platform.machine().lower()
-
-        if bridge_binary := await check_binary(
-            os.path.join(base_path, f"cliraop-{system}-{architecture}")
-        ):
-            return bridge_binary
-
-        msg = f"Unable to locate RAOP Play binary for {system}/{architecture}"
-        raise RuntimeError(msg)
-
-    def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]:
-        """Get all sync clients for a player."""
-        mass_player = self.mass.players.get(player_id, True)
-        sync_clients: list[AirPlayPlayer] = []
-        # we need to return the player itself too
-        group_child_ids = {player_id}
-        group_child_ids.update(mass_player.group_childs)
-        for child_id in group_child_ids:
-            if client := self._players.get(child_id):
-                sync_clients.append(client)
-        return sync_clients
-
-    async def _setup_player(
-        self, player_id: str, display_name: str, info: AsyncServiceInfo
-    ) -> None:
-        """Handle setup of a new player that is discovered using mdns."""
-        address = get_primary_ip_address(info)
-        if address is None:
-            return
-        self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
-        manufacturer, model = get_model_from_am(info.decoded_properties.get("am"))
-        if "apple tv" in model.lower():
-            # For now, we ignore the Apple TV until we implement the authentication.
-            # maybe we can simply use pyatv only for this part?
-            # the cliraop application has already been prepared to accept the secret.
-            self.logger.debug(
-                "Ignoring %s in discovery due to authentication requirement.", display_name
-            )
-            return
-        if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
-            self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
-            return
-        self._players[player_id] = AirPlayPlayer(self, player_id, info, address)
-        if not (volume := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_VOLUME)):
-            volume = FALLBACK_VOLUME
-        mass_player = Player(
-            player_id=player_id,
-            provider=self.instance_id,
-            type=PlayerType.PLAYER,
-            name=display_name,
-            available=True,
-            powered=False,
-            device_info=DeviceInfo(
-                model=model,
-                manufacturer=manufacturer,
-                address=address,
-            ),
-            supported_features=(
-                PlayerFeature.PAUSE,
-                PlayerFeature.SYNC,
-                PlayerFeature.VOLUME_SET,
-            ),
-            volume_level=volume,
-        )
-        await self.mass.players.register_or_update(mass_player)
-
-    async def _handle_dacp_request(  # noqa: PLR0915
-        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
-    ) -> None:
-        """Handle new connection on the socket."""
-        try:
-            raw_request = b""
-            while recv := await reader.read(1024):
-                raw_request += recv
-                if len(recv) < 1024:
-                    break
-
-            request = raw_request.decode("UTF-8")
-            if "\r\n\r\n" in request:
-                headers_raw, body = request.split("\r\n\r\n", 1)
-            else:
-                headers_raw = request
-                body = ""
-            headers_raw = headers_raw.split("\r\n")
-            headers = {}
-            for line in headers_raw[1:]:
-                if ":" not in line:
-                    continue
-                x, y = line.split(":", 1)
-                headers[x.strip()] = y.strip()
-            active_remote = headers.get("Active-Remote")
-            _, path, _ = headers_raw[0].split(" ")
-            airplay_player = next(
-                (
-                    x
-                    for x in self._players.values()
-                    if x.raop_stream and x.raop_stream.active_remote_id == active_remote
-                ),
-                None,
-            )
-            self.logger.debug(
-                "DACP request for %s (%s): %s -- %s",
-                airplay_player.discovery_info.name if airplay_player else "UNKNOWN PLAYER",
-                active_remote,
-                path,
-                body,
-            )
-            if not airplay_player:
-                return
-
-            player_id = airplay_player.player_id
-            mass_player = self.mass.players.get(player_id)
-            active_queue = self.mass.player_queues.get_active_queue(player_id)
-            if path == "/ctrl-int/1/nextitem":
-                self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id))
-            elif path == "/ctrl-int/1/previtem":
-                self.mass.create_task(self.mass.player_queues.previous(active_queue.queue_id))
-            elif path == "/ctrl-int/1/play":
-                self.mass.create_task(self.mass.player_queues.play(active_queue.queue_id))
-            elif path == "/ctrl-int/1/playpause":
-                self.mass.create_task(self.mass.player_queues.play_pause(active_queue.queue_id))
-            elif path == "/ctrl-int/1/stop":
-                self.mass.create_task(self.mass.player_queues.stop(active_queue.queue_id))
-            elif path == "/ctrl-int/1/volumeup":
-                self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
-            elif path == "/ctrl-int/1/volumedown":
-                self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
-            elif path == "/ctrl-int/1/shuffle_songs":
-                queue = self.mass.player_queues.get(player_id)
-                self.mass.loop.call_soon(
-                    self.mass.player_queues.set_shuffle(
-                        active_queue.queue_id, not queue.shuffle_enabled
-                    )
-                )
-            elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
-                self.mass.create_task(self.mass.player_queues.pause(active_queue.queue_id))
-            elif "dmcp.device-volume=" in path:
-                if mass_player.device_info.manufacturer.lower() == "apple":
-                    # Apple devices only report their (new) volume level, they dont request it
-                    return
-                raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
-                volume = convert_airplay_volume(raop_volume)
-                if volume != mass_player.volume_level:
-                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
-                    # optimistically set the new volume to prevent bouncing around
-                    mass_player.volume_level = volume
-            elif "dmcp.volume=" in path:
-                volume = int(path.split("dmcp.volume=", 1)[-1])
-                if volume != mass_player.volume_level:
-                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
-                    # optimistically set the new volume to prevent bouncing around
-                    mass_player.volume_level = volume
-            elif "device-prevent-playback=1" in path:
-                # device switched to another source (or is powered off)
-                if raop_stream := airplay_player.raop_stream:
-                    # ignore this if we just started playing to prevent false positives
-                    if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
-                        raop_stream.prevent_playback = True
-                        self.mass.create_task(self.monitor_prevent_playback(player_id))
-            elif "device-prevent-playback=0" in path:
-                # device reports that its ready for playback again
-                if raop_stream := airplay_player.raop_stream:
-                    raop_stream.prevent_playback = False
-
-            # send response
-            date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
-            response = (
-                f"HTTP/1.0 204 No Content\r\nDate: {date_str} "
-                "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: "
-                "application/x-dmap-tagged\r\nContent-Length: 0\r\n"
-                "Connection: close\r\n\r\n"
-            )
-            writer.write(response.encode())
-            await writer.drain()
-        finally:
-            writer.close()
-
-    async def monitor_prevent_playback(self, player_id: str):
-        """Monitor the prevent playback state of an airplay player."""
-        count = 0
-        if not (airplay_player := self._players.get(player_id)):
-            return
-        prev_active_remote_id = airplay_player.raop_stream.active_remote_id
-        while count < 40:
-            count += 1
-            if not (airplay_player := self._players.get(player_id)):
-                return
-            if not (raop_stream := airplay_player.raop_stream):
-                return
-            if raop_stream.active_remote_id != prev_active_remote_id:
-                # checksum
-                return
-            if not raop_stream.prevent_playback:
-                return
-            await asyncio.sleep(0.5)
-
-        airplay_player.logger.info(
-            "Player has been in prevent playback mode for too long, powering off.",
-        )
-        await self.mass.players.cmd_power(airplay_player.player_id, False)
+async def setup(
+    mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+    """Initialize provider(instance) with given configuration."""
+    return AirplayProvider(mass, manifest, config)
diff --git a/music_assistant/server/providers/airplay/const.py b/music_assistant/server/providers/airplay/const.py
new file mode 100644 (file)
index 0000000..3d9ecd8
--- /dev/null
@@ -0,0 +1,31 @@
+"""Constants for the AirPlay provider."""
+
+from __future__ import annotations
+
+from music_assistant.common.models.enums import ContentType
+from music_assistant.common.models.media_items import AudioFormat
+
+DOMAIN = "airplay"
+
+CONF_ENCRYPTION = "encryption"
+CONF_ALAC_ENCODE = "alac_encode"
+CONF_VOLUME_START = "volume_start"
+CONF_PASSWORD = "password"
+CONF_BIND_INTERFACE = "bind_interface"
+CONF_READ_AHEAD_BUFFER = "read_ahead_buffer"
+
+BACKOFF_TIME_LOWER_LIMIT = 15  # seconds
+BACKOFF_TIME_UPPER_LIMIT = 300  # Five minutes
+
+CONF_CREDENTIALS = "credentials"
+CACHE_KEY_PREV_VOLUME = "airplay_prev_volume"
+FALLBACK_VOLUME = 20
+
+AIRPLAY_FLOW_PCM_FORMAT = AudioFormat(
+    content_type=ContentType.PCM_F32LE,
+    sample_rate=44100,
+    bit_depth=32,
+)
+AIRPLAY_PCM_FORMAT = AudioFormat(
+    content_type=ContentType.from_bit_depth(16), sample_rate=44100, bit_depth=16
+)
diff --git a/music_assistant/server/providers/airplay/helpers.py b/music_assistant/server/providers/airplay/helpers.py
new file mode 100644 (file)
index 0000000..fe8f518
--- /dev/null
@@ -0,0 +1,52 @@
+"""Various helpers/utilities for the Airplay provider."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from zeroconf import IPVersion
+
+if TYPE_CHECKING:
+    from zeroconf.asyncio import AsyncServiceInfo
+
+
+def convert_airplay_volume(value: float) -> int:
+    """Remap Airplay Volume to 0..100 scale."""
+    airplay_min = -30
+    airplay_max = 0
+    normal_min = 0
+    normal_max = 100
+    portion = (value - airplay_min) * (normal_max - normal_min) / (airplay_max - airplay_min)
+    return int(portion + normal_min)
+
+
+def get_model_from_am(am_property: str | None) -> tuple[str, str]:
+    """Return Manufacturer and Model name from mdns AM property."""
+    manufacturer = "Unknown"
+    model = "Generic Airplay device"
+    if not am_property:
+        return (manufacturer, model)
+    if isinstance(am_property, bytes):
+        am_property = am_property.decode("utf-8")
+    if am_property == "AudioAccessory5,1":
+        model = "HomePod"
+        manufacturer = "Apple"
+    elif "AppleTV" in am_property:
+        model = "Apple TV"
+        manufacturer = "Apple"
+    else:
+        model = am_property
+    return (manufacturer, model)
+
+
+def get_primary_ip_address(discovery_info: AsyncServiceInfo) -> str | None:
+    """Get primary IP address from zeroconf discovery info."""
+    for address in discovery_info.parsed_addresses(IPVersion.V4Only):
+        if address.startswith("127"):
+            # filter out loopback address
+            continue
+        if address.startswith("169.254"):
+            # filter out APIPA address
+            continue
+        return address
+    return None
diff --git a/music_assistant/server/providers/airplay/player.py b/music_assistant/server/providers/airplay/player.py
new file mode 100644 (file)
index 0000000..48e3a1b
--- /dev/null
@@ -0,0 +1,49 @@
+"""AirPlay Player definition."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+from music_assistant.common.models.enums import PlayerState
+
+if TYPE_CHECKING:
+    from zeroconf.asyncio import AsyncServiceInfo
+
+    from .provider import AirplayProvider
+    from .raop import RaopStream
+
+
+class AirPlayPlayer:
+    """Holds the details of the (discovered) Airplay (RAOP) player."""
+
+    def __init__(
+        self, prov: AirplayProvider, player_id: str, discovery_info: AsyncServiceInfo, address: str
+    ) -> None:
+        """Initialize AirPlayPlayer."""
+        self.prov = prov
+        self.mass = prov.mass
+        self.player_id = player_id
+        self.discovery_info = discovery_info
+        self.address = address
+        self.logger = prov.logger.getChild(player_id)
+        self.raop_stream: RaopStream | None = None
+
+    async def cmd_stop(self, update_state: bool = True) -> None:
+        """Send STOP command to player."""
+        if self.raop_stream:
+            # forward stop to the entire stream session
+            await self.raop_stream.session.stop()
+        if update_state and (mass_player := self.mass.players.get(self.player_id)):
+            mass_player.state = PlayerState.IDLE
+            self.mass.players.update(mass_player.player_id)
+
+    async def cmd_play(self) -> None:
+        """Send PLAY (unpause) command to player."""
+        if self.raop_stream and self.raop_stream.running:
+            await self.raop_stream.send_cli_command("ACTION=PLAY")
+
+    async def cmd_pause(self) -> None:
+        """Send PAUSE command to player."""
+        if not self.raop_stream or not self.raop_stream.running:
+            return
+        await self.raop_stream.send_cli_command("ACTION=PAUSE")
diff --git a/music_assistant/server/providers/airplay/provider.py b/music_assistant/server/providers/airplay/provider.py
new file mode 100644 (file)
index 0000000..51276ba
--- /dev/null
@@ -0,0 +1,621 @@
+"""Airplay Player provider for Music Assistant."""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import platform
+import socket
+from random import randrange
+from typing import TYPE_CHECKING
+
+from zeroconf import ServiceStateChange
+from zeroconf.asyncio import AsyncServiceInfo
+
+from music_assistant.common.helpers.datetime import utc
+from music_assistant.common.helpers.util import get_ip_pton, select_free_port
+from music_assistant.common.models.config_entries import (
+    CONF_ENTRY_CROSSFADE,
+    CONF_ENTRY_CROSSFADE_DURATION,
+    CONF_ENTRY_EQ_BASS,
+    CONF_ENTRY_EQ_MID,
+    CONF_ENTRY_EQ_TREBLE,
+    CONF_ENTRY_FLOW_MODE_ENFORCED,
+    CONF_ENTRY_OUTPUT_CHANNELS,
+    CONF_ENTRY_SYNC_ADJUST,
+    ConfigEntry,
+    create_sample_rates_config_entry,
+)
+from music_assistant.common.models.enums import (
+    ConfigEntryType,
+    ContentType,
+    MediaType,
+    PlayerFeature,
+    PlayerState,
+    PlayerType,
+    ProviderFeature,
+)
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.server.helpers.audio import get_ffmpeg_stream
+from music_assistant.server.helpers.process import check_output
+from music_assistant.server.helpers.util import TaskManager, lock
+from music_assistant.server.models.player_provider import PlayerProvider
+from music_assistant.server.providers.airplay.raop import RaopStreamSession
+
+from .const import (
+    AIRPLAY_FLOW_PCM_FORMAT,
+    AIRPLAY_PCM_FORMAT,
+    CACHE_KEY_PREV_VOLUME,
+    CONF_ALAC_ENCODE,
+    CONF_ENCRYPTION,
+    CONF_PASSWORD,
+    CONF_READ_AHEAD_BUFFER,
+    FALLBACK_VOLUME,
+)
+from .helpers import convert_airplay_volume, get_model_from_am, get_primary_ip_address
+from .player import AirPlayPlayer
+
+if TYPE_CHECKING:
+    from music_assistant.server.providers.player_group import PlayerGroupProvider
+
+
+PLAYER_CONFIG_ENTRIES = (
+    CONF_ENTRY_FLOW_MODE_ENFORCED,
+    CONF_ENTRY_CROSSFADE,
+    CONF_ENTRY_CROSSFADE_DURATION,
+    CONF_ENTRY_EQ_BASS,
+    CONF_ENTRY_EQ_MID,
+    CONF_ENTRY_EQ_TREBLE,
+    CONF_ENTRY_OUTPUT_CHANNELS,
+    ConfigEntry(
+        key=CONF_ENCRYPTION,
+        type=ConfigEntryType.BOOLEAN,
+        default_value=False,
+        label="Enable encryption",
+        description="Enable encrypted communication with the player, "
+        "some (3rd party) players require this.",
+        category="airplay",
+    ),
+    ConfigEntry(
+        key=CONF_ALAC_ENCODE,
+        type=ConfigEntryType.BOOLEAN,
+        default_value=True,
+        label="Enable compression",
+        description="Save some network bandwidth by sending the audio as "
+        "(lossless) ALAC at the cost of a bit CPU.",
+        category="airplay",
+    ),
+    CONF_ENTRY_SYNC_ADJUST,
+    ConfigEntry(
+        key=CONF_PASSWORD,
+        type=ConfigEntryType.SECURE_STRING,
+        default_value=None,
+        required=False,
+        label="Device password",
+        description="Some devices require a password to connect/play.",
+        category="airplay",
+    ),
+    ConfigEntry(
+        key=CONF_READ_AHEAD_BUFFER,
+        type=ConfigEntryType.INTEGER,
+        default_value=1000,
+        required=False,
+        label="Audio buffer (ms)",
+        description="Amount of buffer (in milliseconds), "
+        "the player should keep to absorb network throughput jitter. "
+        "If you experience audio dropouts, try increasing this value.",
+        category="airplay",
+        range=(500, 3000),
+    ),
+    # airplay has fixed sample rate/bit depth so make this config entry static and hidden
+    create_sample_rates_config_entry(44100, 16, 44100, 16, True),
+)
+
+
+# TODO: Airplay provider
+# - Implement authentication for Apple TV
+# - Implement volume control for Apple devices using pyatv
+# - Implement metadata for Apple Apple devices using pyatv
+# - Use pyatv for communicating with original Apple devices (and use cliraop for actual streaming)
+# - Implement Airplay 2 support
+# - Implement late joining to existing stream (instead of restarting it)
+
+
+class AirplayProvider(PlayerProvider):
+    """Player provider for Airplay based players."""
+
+    cliraop_bin: str | None = None
+    _players: dict[str, AirPlayPlayer]
+    _dacp_server: asyncio.Server = None
+    _dacp_info: AsyncServiceInfo = None
+    _play_media_lock: asyncio.Lock = asyncio.Lock()
+
+    @property
+    def supported_features(self) -> tuple[ProviderFeature, ...]:
+        """Return the features supported by this Provider."""
+        return (ProviderFeature.SYNC_PLAYERS,)
+
+    async def handle_async_init(self) -> None:
+        """Handle async initialization of the provider."""
+        self._players = {}
+        self.cliraop_bin = await self._getcliraop_binary()
+        dacp_port = await select_free_port(39831, 49831)
+        self.dacp_id = dacp_id = f"{randrange(2 ** 64):X}"
+        self.logger.debug("Starting DACP ActiveRemote %s on port %s", dacp_id, dacp_port)
+        self._dacp_server = await asyncio.start_server(
+            self._handle_dacp_request, "0.0.0.0", dacp_port
+        )
+        zeroconf_type = "_dacp._tcp.local."
+        server_id = f"iTunes_Ctrl_{dacp_id}.{zeroconf_type}"
+        self._dacp_info = AsyncServiceInfo(
+            zeroconf_type,
+            name=server_id,
+            addresses=[await get_ip_pton(self.mass.streams.publish_ip)],
+            port=dacp_port,
+            properties={
+                "txtvers": "1",
+                "Ver": "63B5E5C0C201542E",
+                "DbId": "63B5E5C0C201542E",
+                "OSsi": "0x1F5",
+            },
+            server=f"{socket.gethostname()}.local",
+        )
+        await self.mass.aiozc.async_register_service(self._dacp_info)
+
+    async def on_mdns_service_state_change(
+        self, name: str, state_change: ServiceStateChange, info: AsyncServiceInfo | None
+    ) -> None:
+        """Handle MDNS service state callback."""
+        raw_id, display_name = name.split(".")[0].split("@", 1)
+        player_id = f"ap{raw_id.lower()}"
+        # handle removed player
+        if state_change == ServiceStateChange.Removed:
+            if mass_player := self.mass.players.get(player_id):
+                if not mass_player.available:
+                    return
+                # the player has become unavailable
+                self.logger.debug("Player offline: %s", display_name)
+                mass_player.available = False
+                self.mass.players.update(player_id)
+            return
+        # handle update for existing device
+        if airplay_player := self._players.get(player_id):
+            if mass_player := self.mass.players.get(player_id):
+                cur_address = get_primary_ip_address(info)
+                if cur_address and cur_address != airplay_player.address:
+                    airplay_player.logger.debug(
+                        "Address updated from %s to %s", airplay_player.address, cur_address
+                    )
+                    airplay_player.address = cur_address
+                    mass_player.device_info = DeviceInfo(
+                        model=mass_player.device_info.model,
+                        manufacturer=mass_player.device_info.manufacturer,
+                        address=str(cur_address),
+                    )
+                if not mass_player.available:
+                    self.logger.debug("Player back online: %s", display_name)
+                    mass_player.available = True
+            # always update the latest discovery info
+            airplay_player.discovery_info = info
+            self.mass.players.update(player_id)
+            return
+        # handle new player
+        await self._setup_player(player_id, display_name, info)
+
+    async def unload(self) -> None:
+        """Handle close/cleanup of the provider."""
+        # power off all players (will disconnect and close cliraop)
+        for player_id in self._players:
+            await self.cmd_power(player_id, False)
+        # shutdown DACP server
+        if self._dacp_server:
+            self._dacp_server.close()
+        # shutdown DACP zeroconf service
+        if self._dacp_info:
+            await self.mass.aiozc.async_unregister_service(self._dacp_info)
+
+    async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
+        """Return all (provider/player specific) Config Entries for the given player (if any)."""
+        base_entries = await super().get_player_config_entries(player_id)
+        return (*base_entries, *PLAYER_CONFIG_ENTRIES)
+
+    async def cmd_stop(self, player_id: str) -> None:
+        """Send STOP command to given player.
+
+        - player_id: player_id of the player to handle the command.
+        """
+        if airplay_player := self._players.get(player_id):
+            await airplay_player.cmd_stop()
+
+    async def cmd_play(self, player_id: str) -> None:
+        """Send PLAY (unpause) command to given player.
+
+        - player_id: player_id of the player to handle the command.
+        """
+        if airplay_player := self._players.get(player_id):
+            await airplay_player.cmd_play()
+
+    async def cmd_pause(self, player_id: str) -> None:
+        """Send PAUSE command to given player.
+
+        - player_id: player_id of the player to handle the command.
+        """
+        player = self.mass.players.get(player_id)
+        if player.group_childs:
+            # pause is not supported while synced, use stop instead
+            self.logger.debug("Player is synced, using STOP instead of PAUSE")
+            await self.cmd_stop(player_id)
+            return
+        airplay_player = self._players[player_id]
+        await airplay_player.cmd_pause()
+
+    @lock
+    async def play_media(
+        self,
+        player_id: str,
+        media: PlayerMedia,
+    ) -> None:
+        """Handle PLAY MEDIA on given player."""
+        async with self._play_media_lock:
+            player = self.mass.players.get(player_id)
+            # set the active source for the player to the media queue
+            # this accounts for syncgroups and linked players (e.g. sonos)
+            player.active_source = media.queue_id
+            if player.synced_to:
+                # should not happen, but just in case
+                raise RuntimeError("Player is synced")
+            # always stop existing stream first
+            async with TaskManager(self.mass) as tg:
+                for airplay_player in self._get_sync_clients(player_id):
+                    tg.create_task(airplay_player.cmd_stop(update_state=False))
+            # select audio source
+            if media.media_type == MediaType.ANNOUNCEMENT:
+                # special case: stream announcement
+                input_format = AIRPLAY_PCM_FORMAT
+                audio_source = self.mass.streams.get_announcement_stream(
+                    media.custom_data["url"],
+                    output_format=AIRPLAY_PCM_FORMAT,
+                    use_pre_announce=media.custom_data["use_pre_announce"],
+                )
+            elif media.queue_id.startswith("ugp_"):
+                # special case: UGP stream
+                ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
+                ugp_stream = ugp_provider.ugp_streams[media.queue_id]
+                input_format = ugp_stream.output_format
+                audio_source = ugp_stream.subscribe()
+            elif media.queue_id and media.queue_item_id:
+                # regular queue (flow) stream request
+                input_format = AIRPLAY_FLOW_PCM_FORMAT
+                audio_source = self.mass.streams.get_flow_stream(
+                    queue=self.mass.player_queues.get(media.queue_id),
+                    start_queue_item=self.mass.player_queues.get_item(
+                        media.queue_id, media.queue_item_id
+                    ),
+                    pcm_format=input_format,
+                )
+            else:
+                # assume url or some other direct path
+                # NOTE: this will fail if its an uri not playable by ffmpeg
+                input_format = AIRPLAY_PCM_FORMAT
+                audio_source = get_ffmpeg_stream(
+                    audio_input=media.uri,
+                    input_format=AudioFormat(ContentType.try_parse(media.uri)),
+                    output_format=AIRPLAY_PCM_FORMAT,
+                )
+            # setup RaopStreamSession for player (and its sync childs if any)
+            sync_clients = self._get_sync_clients(player_id)
+            raop_stream_session = RaopStreamSession(self, sync_clients, input_format, audio_source)
+            await raop_stream_session.start()
+
+    async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
+        """Send VOLUME_SET command to given player.
+
+        - player_id: player_id of the player to handle the command.
+        - volume_level: volume level (0..100) to set on the player.
+        """
+        airplay_player = self._players[player_id]
+        if airplay_player.raop_stream and airplay_player.raop_stream.running:
+            await airplay_player.raop_stream.send_cli_command(f"VOLUME={volume_level}\n")
+        mass_player = self.mass.players.get(player_id)
+        mass_player.volume_level = volume_level
+        mass_player.volume_muted = volume_level == 0
+        self.mass.players.update(player_id)
+        # store last state in cache
+        await self.mass.cache.set(player_id, volume_level, base_key=CACHE_KEY_PREV_VOLUME)
+
+    @lock
+    async def cmd_sync(self, player_id: str, target_player: str) -> None:
+        """Handle SYNC command for given player.
+
+        Join/add the given player(id) to the given (master) player/sync group.
+
+            - player_id: player_id of the player to handle the command.
+            - target_player: player_id of the syncgroup master or group player.
+        """
+        if player_id == target_player:
+            return
+        child_player = self.mass.players.get(player_id)
+        assert child_player  # guard
+        parent_player = self.mass.players.get(target_player)
+        assert parent_player  # guard
+        if parent_player.synced_to:
+            raise RuntimeError("Player is already synced")
+        if child_player.synced_to and child_player.synced_to != target_player:
+            raise RuntimeError("Player is already synced to another player")
+        if player_id in parent_player.group_childs:
+            # nothing to do: player is already part of the group
+            return
+        # ensure the child does not have an existing steam session active
+        if airplay_player := self._players.get(player_id):
+            if airplay_player.raop_stream and airplay_player.raop_stream.running:
+                await airplay_player.raop_stream.session.remove_client(airplay_player)
+        # always make sure that the parent player is part of the sync group
+        parent_player.group_childs.add(parent_player.player_id)
+        parent_player.group_childs.add(child_player.player_id)
+        child_player.synced_to = parent_player.player_id
+        # mark players as powered
+        parent_player.powered = True
+        child_player.powered = True
+        # check if we should (re)start or join a stream session
+        active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
+        if active_queue.state == PlayerState.PLAYING:
+            # playback needs to be restarted to form a new multi client stream session
+            # this could potentially be called by multiple players at the exact same time
+            # so we debounce the resync a bit here with a timer
+            self.mass.call_later(
+                1,
+                self.mass.player_queues.resume,
+                active_queue.queue_id,
+                fade_in=False,
+                task_id=f"resume_{active_queue.queue_id}",
+            )
+        else:
+            # make sure that the player manager gets an update
+            self.mass.players.update(child_player.player_id, skip_forward=True)
+            self.mass.players.update(parent_player.player_id, skip_forward=True)
+
+    @lock
+    async def cmd_unsync(self, player_id: str) -> None:
+        """Handle UNSYNC command for given player.
+
+        Remove the given player from any syncgroups it currently is synced to.
+
+            - player_id: player_id of the player to handle the command.
+        """
+        mass_player = self.mass.players.get(player_id, raise_unavailable=True)
+        if not mass_player.synced_to:
+            return
+        ap_player = self._players[player_id]
+        if ap_player.raop_stream and ap_player.raop_stream.running:
+            await ap_player.raop_stream.session.remove_client(ap_player)
+        group_leader = self.mass.players.get(mass_player.synced_to, raise_unavailable=True)
+        if player_id in group_leader.group_childs:
+            group_leader.group_childs.remove(player_id)
+        mass_player.synced_to = None
+        airplay_player = self._players.get(player_id)
+        await airplay_player.cmd_stop()
+        # make sure that the player manager gets an update
+        self.mass.players.update(mass_player.player_id, skip_forward=True)
+        self.mass.players.update(group_leader.player_id, skip_forward=True)
+
+    async def _getcliraop_binary(self):
+        """Find the correct raop/airplay binary belonging to the platform."""
+        # ruff: noqa: SIM102
+        if self.cliraop_bin is not None:
+            return self.cliraop_bin
+
+        async def check_binary(cliraop_path: str) -> str | None:
+            try:
+                returncode, output = await check_output(
+                    cliraop_path,
+                    "-check",
+                )
+                if returncode == 0 and output.strip().decode() == "cliraop check":
+                    self.cliraop_bin = cliraop_path
+                    return cliraop_path
+            except OSError:
+                return None
+
+        base_path = os.path.join(os.path.dirname(__file__), "bin")
+        system = platform.system().lower().replace("darwin", "macos")
+        architecture = platform.machine().lower()
+
+        if bridge_binary := await check_binary(
+            os.path.join(base_path, f"cliraop-{system}-{architecture}")
+        ):
+            return bridge_binary
+
+        msg = f"Unable to locate RAOP Play binary for {system}/{architecture}"
+        raise RuntimeError(msg)
+
+    def _get_sync_clients(self, player_id: str) -> list[AirPlayPlayer]:
+        """Get all sync clients for a player."""
+        mass_player = self.mass.players.get(player_id, True)
+        sync_clients: list[AirPlayPlayer] = []
+        # we need to return the player itself too
+        group_child_ids = {player_id}
+        group_child_ids.update(mass_player.group_childs)
+        for child_id in group_child_ids:
+            if client := self._players.get(child_id):
+                sync_clients.append(client)
+        return sync_clients
+
+    async def _setup_player(
+        self, player_id: str, display_name: str, info: AsyncServiceInfo
+    ) -> None:
+        """Handle setup of a new player that is discovered using mdns."""
+        address = get_primary_ip_address(info)
+        if address is None:
+            return
+        self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
+        manufacturer, model = get_model_from_am(info.decoded_properties.get("am"))
+        if "apple tv" in model.lower():
+            # For now, we ignore the Apple TV until we implement the authentication.
+            # maybe we can simply use pyatv only for this part?
+            # the cliraop application has already been prepared to accept the secret.
+            self.logger.debug(
+                "Ignoring %s in discovery due to authentication requirement.", display_name
+            )
+            return
+        if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
+            self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
+            return
+        self._players[player_id] = AirPlayPlayer(self, player_id, info, address)
+        if not (volume := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_VOLUME)):
+            volume = FALLBACK_VOLUME
+        mass_player = Player(
+            player_id=player_id,
+            provider=self.instance_id,
+            type=PlayerType.PLAYER,
+            name=display_name,
+            available=True,
+            powered=False,
+            device_info=DeviceInfo(
+                model=model,
+                manufacturer=manufacturer,
+                address=address,
+            ),
+            supported_features=(
+                PlayerFeature.PAUSE,
+                PlayerFeature.SYNC,
+                PlayerFeature.VOLUME_SET,
+            ),
+            volume_level=volume,
+        )
+        await self.mass.players.register_or_update(mass_player)
+
+    async def _handle_dacp_request(  # noqa: PLR0915
+        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
+    ) -> None:
+        """Handle new connection on the socket."""
+        try:
+            raw_request = b""
+            while recv := await reader.read(1024):
+                raw_request += recv
+                if len(recv) < 1024:
+                    break
+
+            request = raw_request.decode("UTF-8")
+            if "\r\n\r\n" in request:
+                headers_raw, body = request.split("\r\n\r\n", 1)
+            else:
+                headers_raw = request
+                body = ""
+            headers_raw = headers_raw.split("\r\n")
+            headers = {}
+            for line in headers_raw[1:]:
+                if ":" not in line:
+                    continue
+                x, y = line.split(":", 1)
+                headers[x.strip()] = y.strip()
+            active_remote = headers.get("Active-Remote")
+            _, path, _ = headers_raw[0].split(" ")
+            airplay_player = next(
+                (
+                    x
+                    for x in self._players.values()
+                    if x.raop_stream and x.raop_stream.active_remote_id == active_remote
+                ),
+                None,
+            )
+            self.logger.debug(
+                "DACP request for %s (%s): %s -- %s",
+                airplay_player.discovery_info.name if airplay_player else "UNKNOWN PLAYER",
+                active_remote,
+                path,
+                body,
+            )
+            if not airplay_player:
+                return
+
+            player_id = airplay_player.player_id
+            mass_player = self.mass.players.get(player_id)
+            active_queue = self.mass.player_queues.get_active_queue(player_id)
+            if path == "/ctrl-int/1/nextitem":
+                self.mass.create_task(self.mass.player_queues.next(active_queue.queue_id))
+            elif path == "/ctrl-int/1/previtem":
+                self.mass.create_task(self.mass.player_queues.previous(active_queue.queue_id))
+            elif path == "/ctrl-int/1/play":
+                self.mass.create_task(self.mass.player_queues.play(active_queue.queue_id))
+            elif path == "/ctrl-int/1/playpause":
+                self.mass.create_task(self.mass.player_queues.play_pause(active_queue.queue_id))
+            elif path == "/ctrl-int/1/stop":
+                self.mass.create_task(self.mass.player_queues.stop(active_queue.queue_id))
+            elif path == "/ctrl-int/1/volumeup":
+                self.mass.create_task(self.mass.players.cmd_volume_up(player_id))
+            elif path == "/ctrl-int/1/volumedown":
+                self.mass.create_task(self.mass.players.cmd_volume_down(player_id))
+            elif path == "/ctrl-int/1/shuffle_songs":
+                queue = self.mass.player_queues.get(player_id)
+                self.mass.loop.call_soon(
+                    self.mass.player_queues.set_shuffle(
+                        active_queue.queue_id, not queue.shuffle_enabled
+                    )
+                )
+            elif path in ("/ctrl-int/1/pause", "/ctrl-int/1/discrete-pause"):
+                self.mass.create_task(self.mass.player_queues.pause(active_queue.queue_id))
+            elif "dmcp.device-volume=" in path:
+                if mass_player.device_info.manufacturer.lower() == "apple":
+                    # Apple devices only report their (new) volume level, they dont request it
+                    return
+                raop_volume = float(path.split("dmcp.device-volume=", 1)[-1])
+                volume = convert_airplay_volume(raop_volume)
+                if volume != mass_player.volume_level:
+                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
+                    # optimistically set the new volume to prevent bouncing around
+                    mass_player.volume_level = volume
+            elif "dmcp.volume=" in path:
+                volume = int(path.split("dmcp.volume=", 1)[-1])
+                if volume != mass_player.volume_level:
+                    self.mass.create_task(self.cmd_volume_set(player_id, volume))
+                    # optimistically set the new volume to prevent bouncing around
+                    mass_player.volume_level = volume
+            elif "device-prevent-playback=1" in path:
+                # device switched to another source (or is powered off)
+                if raop_stream := airplay_player.raop_stream:
+                    # ignore this if we just started playing to prevent false positives
+                    if mass_player.elapsed_time > 10 and mass_player.state == PlayerState.PLAYING:
+                        raop_stream.prevent_playback = True
+                        self.mass.create_task(self.monitor_prevent_playback(player_id))
+            elif "device-prevent-playback=0" in path:
+                # device reports that its ready for playback again
+                if raop_stream := airplay_player.raop_stream:
+                    raop_stream.prevent_playback = False
+
+            # send response
+            date_str = utc().strftime("%a, %-d %b %Y %H:%M:%S")
+            response = (
+                f"HTTP/1.0 204 No Content\r\nDate: {date_str} "
+                "GMT\r\nDAAP-Server: iTunes/7.6.2 (Windows; N;)\r\nContent-Type: "
+                "application/x-dmap-tagged\r\nContent-Length: 0\r\n"
+                "Connection: close\r\n\r\n"
+            )
+            writer.write(response.encode())
+            await writer.drain()
+        finally:
+            writer.close()
+
+    async def monitor_prevent_playback(self, player_id: str):
+        """Monitor the prevent playback state of an airplay player."""
+        count = 0
+        if not (airplay_player := self._players.get(player_id)):
+            return
+        prev_active_remote_id = airplay_player.raop_stream.active_remote_id
+        while count < 40:
+            count += 1
+            if not (airplay_player := self._players.get(player_id)):
+                return
+            if not (raop_stream := airplay_player.raop_stream):
+                return
+            if raop_stream.active_remote_id != prev_active_remote_id:
+                # checksum
+                return
+            if not raop_stream.prevent_playback:
+                return
+            await asyncio.sleep(0.5)
+
+        airplay_player.logger.info(
+            "Player has been in prevent playback mode for too long, powering off.",
+        )
+        await self.mass.players.cmd_power(airplay_player.player_id, False)
diff --git a/music_assistant/server/providers/airplay/raop.py b/music_assistant/server/providers/airplay/raop.py
new file mode 100644 (file)
index 0000000..3424750
--- /dev/null
@@ -0,0 +1,399 @@
+"""Logic for RAOP (AirPlay 1) audio streaming to Airplay devices."""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import os
+import platform
+import time
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+from random import randint
+from typing import TYPE_CHECKING
+
+from music_assistant.common.models.enums import PlayerState
+from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
+from music_assistant.server.helpers.audio import get_player_filter_params
+from music_assistant.server.helpers.ffmpeg import FFMpeg
+from music_assistant.server.helpers.process import AsyncProcess, check_output
+from music_assistant.server.helpers.util import close_async_generator
+
+from .const import (
+    AIRPLAY_PCM_FORMAT,
+    CONF_ALAC_ENCODE,
+    CONF_BIND_INTERFACE,
+    CONF_ENCRYPTION,
+    CONF_PASSWORD,
+    CONF_READ_AHEAD_BUFFER,
+)
+
+if TYPE_CHECKING:
+    from music_assistant.common.models.media_items import AudioFormat
+    from music_assistant.common.models.player_queue import PlayerQueue
+
+    from .player import AirPlayPlayer
+    from .provider import AirplayProvider
+
+
+class RaopStreamSession:
+    """Object that holds the details of a (RAOP) stream session to one or more players."""
+
+    def __init__(
+        self,
+        airplay_provider: AirplayProvider,
+        sync_clients: list[AirPlayPlayer],
+        input_format: AudioFormat,
+        audio_source: AsyncGenerator[bytes, None],
+    ) -> None:
+        """Initialize RaopStreamSession."""
+        assert sync_clients
+        self.prov = airplay_provider
+        self.mass = airplay_provider.mass
+        self.input_format = input_format
+        self._sync_clients = sync_clients
+        self._audio_source = audio_source
+        self._audio_source_task: asyncio.Task | None = None
+        self._stopped: bool = False
+        self._lock = asyncio.Lock()
+
+    async def start(self) -> None:
+        """Initialize RaopStreamSession."""
+        # initialize raop stream for all players
+        for airplay_player in self._sync_clients:
+            if airplay_player.raop_stream and airplay_player.raop_stream.running:
+                raise RuntimeError("Player already has an active stream")
+            airplay_player.raop_stream = RaopStream(self, airplay_player)
+
+        async def audio_streamer() -> None:
+            """Stream audio to all players."""
+            generator_exhausted = False
+            try:
+                async for chunk in self._audio_source:
+                    if not self._sync_clients:
+                        return
+                    async with self._lock:
+                        await asyncio.gather(
+                            *[x.raop_stream.write_chunk(chunk) for x in self._sync_clients],
+                            return_exceptions=True,
+                        )
+                # entire stream consumed: send EOF
+                generator_exhausted = True
+                async with self._lock:
+                    await asyncio.gather(
+                        *[x.raop_stream.write_eof() for x in self._sync_clients],
+                        return_exceptions=True,
+                    )
+            finally:
+                if not generator_exhausted:
+                    await close_async_generator(self._audio_source)
+
+        # get current ntp and start RaopStream per player
+        _, stdout = await check_output(self.prov.cliraop_bin, "-ntp")
+        start_ntp = int(stdout.strip())
+        wait_start = 1500 + (250 * len(self._sync_clients))
+        async with self._lock:
+            await asyncio.gather(
+                *[x.raop_stream.start(start_ntp, wait_start) for x in self._sync_clients],
+                return_exceptions=True,
+            )
+        self._audio_source_task = asyncio.create_task(audio_streamer())
+
+    async def stop(self) -> None:
+        """Stop playback and cleanup."""
+        if self._stopped:
+            return
+        self._stopped = True
+        if self._audio_source_task and not self._audio_source_task.done():
+            self._audio_source_task.cancel()
+        await asyncio.gather(
+            *[self.remove_client(x) for x in self._sync_clients],
+            return_exceptions=True,
+        )
+
+    async def remove_client(self, airplay_player: AirPlayPlayer) -> None:
+        """Remove a sync client from the session."""
+        if airplay_player not in self._sync_clients:
+            return
+        assert airplay_player.raop_stream.session == self
+        async with self._lock:
+            self._sync_clients.remove(airplay_player)
+        await airplay_player.raop_stream.stop()
+        airplay_player.raop_stream = None
+
+    async def add_client(self, airplay_player: AirPlayPlayer) -> None:
+        """Add a sync client to the session."""
+        # TODO: Add the ability to add a new client to an existing session
+        # e.g. by counting the number of frames sent etc.
+        raise NotImplementedError("Adding clients to a session is not yet supported")
+
+
+class RaopStream:
+    """
+    RAOP (Airplay 1) Audio Streamer.
+
+    Python is not suitable for realtime audio streaming so we do the actual streaming
+    of (RAOP) audio using a small executable written in C based on libraop to do
+    the actual timestamped playback, which reads pcm audio from stdin
+    and we can send some interactive commands using a named pipe.
+    """
+
+    def __init__(
+        self,
+        session: RaopStreamSession,
+        airplay_player: AirPlayPlayer,
+    ) -> None:
+        """Initialize RaopStream."""
+        self.session = session
+        self.prov = session.prov
+        self.mass = session.prov.mass
+        self.airplay_player = airplay_player
+
+        # always generate a new active remote id to prevent race conditions
+        # with the named pipe used to send audio
+        self.active_remote_id: str = str(randint(1000, 8000))
+        self.prevent_playback: bool = False
+        self._log_reader_task: asyncio.Task | None = None
+        self._cliraop_proc: AsyncProcess | None = None
+        self._ffmpeg_proc: AsyncProcess | None = None
+        self._started = asyncio.Event()
+        self._stopped = False
+
+    @property
+    def running(self) -> bool:
+        """Return boolean if this stream is running."""
+        return not self._stopped and self._started.is_set()
+
+    async def start(self, start_ntp: int, wait_start: int = 1000) -> None:
+        """Initialize CLIRaop process for a player."""
+        extra_args = []
+        player_id = self.airplay_player.player_id
+        mass_player = self.mass.players.get(player_id)
+        bind_ip = await self.mass.config.get_provider_config_value(
+            self.prov.instance_id, CONF_BIND_INTERFACE
+        )
+        extra_args += ["-if", bind_ip]
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ENCRYPTION, False):
+            extra_args += ["-encrypt"]
+        if self.mass.config.get_raw_player_config_value(player_id, CONF_ALAC_ENCODE, True):
+            extra_args += ["-alac"]
+        for prop in ("et", "md", "am", "pk", "pw"):
+            if prop_value := self.airplay_player.discovery_info.decoded_properties.get(prop):
+                extra_args += [f"-{prop}", prop_value]
+        sync_adjust = self.mass.config.get_raw_player_config_value(player_id, CONF_SYNC_ADJUST, 0)
+        if device_password := self.mass.config.get_raw_player_config_value(
+            player_id, CONF_PASSWORD, None
+        ):
+            extra_args += ["-password", device_password]
+        if self.prov.logger.isEnabledFor(logging.DEBUG):
+            extra_args += ["-debug", "5"]
+        elif self.prov.logger.isEnabledFor(VERBOSE_LOG_LEVEL):
+            extra_args += ["-debug", "10"]
+        read_ahead = await self.mass.config.get_player_config_value(
+            player_id, CONF_READ_AHEAD_BUFFER
+        )
+
+        # create os pipes to pipe ffmpeg to cliraop
+        read, write = await asyncio.to_thread(os.pipe)
+
+        # ffmpeg handles the player specific stream + filters and pipes
+        # audio to the cliraop process
+        self._ffmpeg_proc = FFMpeg(
+            audio_input="-",
+            input_format=self.session.input_format,
+            output_format=AIRPLAY_PCM_FORMAT,
+            filter_params=get_player_filter_params(self.mass, player_id),
+            audio_output=write,
+        )
+        await self._ffmpeg_proc.start()
+        await asyncio.to_thread(os.close, write)
+
+        # cliraop is the binary that handles the actual raop streaming to the player
+        cliraop_args = [
+            self.prov.cliraop_bin,
+            "-ntpstart",
+            str(start_ntp),
+            "-port",
+            str(self.airplay_player.discovery_info.port),
+            "-wait",
+            str(wait_start - sync_adjust),
+            "-latency",
+            str(read_ahead),
+            "-volume",
+            str(mass_player.volume_level),
+            *extra_args,
+            "-dacp",
+            self.prov.dacp_id,
+            "-activeremote",
+            self.active_remote_id,
+            "-udn",
+            self.airplay_player.discovery_info.name,
+            self.airplay_player.address,
+            "-",
+        ]
+        self._cliraop_proc = AsyncProcess(cliraop_args, stdin=read, stderr=True, name="cliraop")
+        if platform.system() == "Darwin":
+            os.environ["DYLD_LIBRARY_PATH"] = "/usr/local/lib"
+        await self._cliraop_proc.start()
+        await asyncio.to_thread(os.close, read)
+        self._started.set()
+        self._log_reader_task = self.mass.create_task(self._log_watcher())
+
+    async def stop(self):
+        """Stop playback and cleanup."""
+        if self._stopped:
+            return
+        if self._cliraop_proc.proc and not self._cliraop_proc.closed:
+            await self.send_cli_command("ACTION=STOP")
+        self._stopped = True  # set after send_cli command!
+        if self._cliraop_proc.proc and not self._cliraop_proc.closed:
+            await self._cliraop_proc.close(True)
+        if self._ffmpeg_proc and not self._ffmpeg_proc.closed:
+            await self._ffmpeg_proc.close(True)
+        self._cliraop_proc = None
+        self._ffmpeg_proc = None
+
+    async def write_chunk(self, chunk: bytes) -> None:
+        """Write a (pcm) audio chunk."""
+        if self._stopped:
+            return
+        await self._started.wait()
+        await self._ffmpeg_proc.write(chunk)
+
+    async def write_eof(self) -> None:
+        """Write EOF."""
+        if self._stopped:
+            return
+        await self._started.wait()
+        await self._ffmpeg_proc.write_eof()
+
+    async def send_cli_command(self, command: str) -> None:
+        """Send an interactive command to the running CLIRaop binary."""
+        if self._stopped:
+            return
+        await self._started.wait()
+
+        if not command.endswith("\n"):
+            command += "\n"
+
+        def send_data():
+            with suppress(BrokenPipeError), open(named_pipe, "w") as f:
+                f.write(command)
+
+        named_pipe = f"/tmp/raop-{self.active_remote_id}"  # noqa: S108
+        self.airplay_player.logger.log(VERBOSE_LOG_LEVEL, "sending command %s", command)
+        await asyncio.to_thread(send_data)
+
+    async def _log_watcher(self) -> None:
+        """Monitor stderr for the running CLIRaop process."""
+        airplay_player = self.airplay_player
+        mass_player = self.mass.players.get(airplay_player.player_id)
+        queue = self.mass.player_queues.get_active_queue(mass_player.active_source)
+        logger = airplay_player.logger
+        lost_packets = 0
+        prev_metadata_checksum: str = ""
+        prev_progress_report: float = 0
+        async for line in self._cliraop_proc.iter_stderr():
+            if "elapsed milliseconds:" in line:
+                # this is received more or less every second while playing
+                millis = int(line.split("elapsed milliseconds: ")[1])
+                mass_player.elapsed_time = millis / 1000
+                mass_player.elapsed_time_last_updated = time.time()
+                # send metadata to player(s) if needed
+                # NOTE: this must all be done in separate tasks to not disturb audio
+                now = time.time()
+                if (
+                    mass_player.elapsed_time > 2
+                    and queue
+                    and queue.current_item
+                    and queue.current_item.streamdetails
+                ):
+                    metadata_checksum = (
+                        queue.current_item.streamdetails.stream_title
+                        or queue.current_item.queue_item_id
+                    )
+                    if prev_metadata_checksum != metadata_checksum:
+                        prev_metadata_checksum = metadata_checksum
+                        prev_progress_report = now
+                        self.mass.create_task(self._send_metadata(queue))
+                    # send the progress report every 5 seconds
+                    elif now - prev_progress_report >= 5:
+                        prev_progress_report = now
+                        self.mass.create_task(self._send_progress(queue))
+            if "set pause" in line or "Pause at" in line:
+                mass_player.state = PlayerState.PAUSED
+                self.mass.players.update(airplay_player.player_id)
+            if "Restarted at" in line or "restarting w/ pause" in line:
+                mass_player.state = PlayerState.PLAYING
+                self.mass.players.update(airplay_player.player_id)
+            if "restarting w/o pause" in line:
+                # streaming has started
+                mass_player.state = PlayerState.PLAYING
+                mass_player.elapsed_time = 0
+                mass_player.elapsed_time_last_updated = time.time()
+                self.mass.players.update(airplay_player.player_id)
+            if "lost packet out of backlog" in line:
+                lost_packets += 1
+                if lost_packets == 100:
+                    logger.error("High packet loss detected, restarting playback...")
+                    self.mass.create_task(self.mass.player_queues.resume(queue.queue_id))
+                else:
+                    logger.warning("Packet loss detected!")
+            if "end of stream reached" in line:
+                logger.debug("End of stream reached")
+                break
+
+            logger.log(VERBOSE_LOG_LEVEL, line)
+
+        # if we reach this point, the process exited
+        if airplay_player.raop_stream == self:
+            mass_player.state = PlayerState.IDLE
+            self.mass.players.update(airplay_player.player_id)
+        # ensure we're cleaned up afterwards (this also logs the returncode)
+        await self.stop()
+
+    async def _send_metadata(self, queue: PlayerQueue) -> None:
+        """Send metadata to player (and connected sync childs)."""
+        if not queue or not queue.current_item:
+            return
+        duration = min(queue.current_item.duration or 0, 3600)
+        title = queue.current_item.name
+        artist = ""
+        album = ""
+        if queue.current_item.streamdetails and queue.current_item.streamdetails.stream_title:
+            # stream title from radio station
+            stream_title = queue.current_item.streamdetails.stream_title
+            if " - " in stream_title:
+                artist, title = stream_title.split(" - ", 1)
+            else:
+                title = stream_title
+            # set album to radio station name
+            album = queue.current_item.name
+        elif media_item := queue.current_item.media_item:
+            title = media_item.name
+            if artist_str := getattr(media_item, "artist_str", None):
+                artist = artist_str
+            if _album := getattr(media_item, "album", None):
+                album = _album.name
+
+        cmd = f"TITLE={title or 'Music Assistant'}\nARTIST={artist}\nALBUM={album}\n"
+        cmd += f"DURATION={duration}\nPROGRESS=0\nACTION=SENDMETA\n"
+
+        await self.send_cli_command(cmd)
+
+        # get image
+        if not queue.current_item.image:
+            return
+
+        # the image format needs to be 500x500 jpeg for maximum compatibility with players
+        image_url = self.mass.metadata.get_image_url(
+            queue.current_item.image, size=500, prefer_proxy=True, image_format="jpeg"
+        )
+        await self.send_cli_command(f"ARTWORK={image_url}\n")
+
+    async def _send_progress(self, queue: PlayerQueue) -> None:
+        """Send progress report to player (and connected sync childs)."""
+        if not queue or not queue.current_item:
+            return
+        progress = int(queue.corrected_elapsed_time)
+        await self.send_cli_command(f"PROGRESS={progress}\n")
index f1b62be140bc8fdb7773fab540591f93fcd6ecff..dc355bf7fe0841fe91c924ef3ee1d4fae5717b86 100644 (file)
@@ -22,6 +22,7 @@ from zeroconf import IPVersion, ServiceStateChange
 
 from music_assistant.common.models.config_entries import (
     CONF_ENTRY_CROSSFADE,
+    CONF_ENTRY_ENFORCE_MP3,
     CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED,
     ConfigEntry,
     ConfigValueType,
@@ -29,6 +30,7 @@ from music_assistant.common.models.config_entries import (
 )
 from music_assistant.common.models.enums import (
     ConfigEntryType,
+    ContentType,
     EventType,
     PlayerFeature,
     PlayerState,
@@ -533,6 +535,7 @@ class SonosPlayerProvider(PlayerProvider):
             *await super().get_player_config_entries(player_id),
             CONF_ENTRY_CROSSFADE,
             CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED,
+            CONF_ENTRY_ENFORCE_MP3,
             create_sample_rates_config_entry(48000, 24, 48000, 24, True),
         )
         if not (sonos_player := self.sonos_players.get(player_id)):
@@ -666,6 +669,10 @@ class SonosPlayerProvider(PlayerProvider):
             return
 
         # play a single uri/url
+        if self.mass.config.get_raw_player_config_value(
+            player_id, CONF_ENTRY_ENFORCE_MP3.key, CONF_ENTRY_ENFORCE_MP3.default_value
+        ):
+            media.uri = media.uri.replace(".flac", ".mp3")
         await sonos_player.client.player.group.play_stream_url(
             media.uri, {"name": media.title, "type": "track"}
         )
@@ -773,6 +780,9 @@ class SonosPlayerProvider(PlayerProvider):
             limit=upcoming_window_size + previous_window_size,
             offset=max(queue_index - previous_window_size, 0),
         )
+        enforce_mp3 = self.mass.config.get_raw_player_config_value(
+            sonos_player_id, CONF_ENTRY_ENFORCE_MP3.key, CONF_ENTRY_ENFORCE_MP3.default_value
+        )
         sonos_queue_items = [
             {
                 "id": item.queue_item_id,
@@ -780,7 +790,9 @@ class SonosPlayerProvider(PlayerProvider):
                 "policies": {},
                 "track": {
                     "type": "track",
-                    "mediaUrl": self.mass.streams.resolve_stream_url(item),
+                    "mediaUrl": self.mass.streams.resolve_stream_url(
+                        item, output_codec=ContentType.MP3 if enforce_mp3 else ContentType.FLAC
+                    ),
                     "contentType": "audio/flac",
                     "service": {
                         "name": "Music Assistant",