Fixes for the squeezelite provider after refactor (#2338)
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 18 Aug 2025 09:30:57 +0000 (11:30 +0200)
committerGitHub <noreply@github.com>
Mon, 18 Aug 2025 09:30:57 +0000 (11:30 +0200)
music_assistant/providers/squeezelite/constants.py
music_assistant/providers/squeezelite/player.py
music_assistant/providers/squeezelite/provider.py

index 1c337898ab9c0a5676af67618d3aab6a91084543..8faae126e5338ceef564399c1b23c9e90353724b 100644 (file)
@@ -2,7 +2,12 @@
 
 from __future__ import annotations
 
+from dataclasses import dataclass
+
+from aioslimproto.client import PlayerState as SlimPlayerState
 from aioslimproto.models import VisualisationType as SlimVisualisationType
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
+from music_assistant_models.enums import ConfigEntryType, PlaybackState, RepeatMode
 
 CONF_CLI_TELNET_PORT = "cli_telnet_port"
 CONF_CLI_JSON_PORT = "cli_json_port"
@@ -12,5 +17,57 @@ DEFAULT_SLIMPROTO_PORT = 3483
 CONF_DISPLAY = "display"
 CONF_VISUALIZATION = "visualization"
 
+CACHE_KEY_PREV_STATE = "slimproto_prev_state"
 
+DEFAULT_PLAYER_VOLUME = 20
 DEFAULT_VISUALIZATION = SlimVisualisationType.NONE
+
+# sync constants
+MIN_DEVIATION_ADJUST = 8  # 5 milliseconds
+MIN_REQ_PLAYPOINTS = 8  # we need at least 8 measurements
+DEVIATION_JUMP_IGNORE = 500  # ignore a sudden unrealistic jump
+MAX_SKIP_AHEAD_MS = 800  # 0.8 seconds
+
+STATE_MAP = {
+    SlimPlayerState.BUFFERING: PlaybackState.PLAYING,
+    SlimPlayerState.BUFFER_READY: PlaybackState.PLAYING,
+    SlimPlayerState.PAUSED: PlaybackState.PAUSED,
+    SlimPlayerState.PLAYING: PlaybackState.PLAYING,
+    SlimPlayerState.STOPPED: PlaybackState.IDLE,
+}
+
+REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2}
+
+CONF_ENTRY_DISPLAY = ConfigEntry(
+    key=CONF_DISPLAY,
+    type=ConfigEntryType.BOOLEAN,
+    default_value=False,
+    required=False,
+    label="Enable display support",
+    description="Enable/disable native display support on squeezebox or squeezelite32 hardware.",
+    category="advanced",
+)
+CONF_ENTRY_VISUALIZATION = ConfigEntry(
+    key=CONF_VISUALIZATION,
+    type=ConfigEntryType.STRING,
+    default_value=DEFAULT_VISUALIZATION,
+    options=[
+        ConfigValueOption(title=x.name.replace("_", " ").title(), value=x.value)
+        for x in SlimVisualisationType
+    ],
+    required=False,
+    label="Visualization type",
+    description="The type of visualization to show on the display "
+    "during playback if the device supports this.",
+    category="advanced",
+    depends_on=CONF_DISPLAY,
+)
+
+
+@dataclass
+class SyncPlayPoint:
+    """Simple structure to describe a Sync Playpoint."""
+
+    timestamp: float
+    sync_master: str
+    diff: int
index 67a39a4a98cdb7150f24d0122d98fe45c59592e9..33bde07f900c2b803a25bec37e43666200c52125 100644 (file)
@@ -2,13 +2,28 @@
 
 from __future__ import annotations
 
+import asyncio
+import statistics
+import time
+from collections import deque
 from collections.abc import Iterator
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
 
-from aioslimproto.client import PlayerState as SlimPlayerState
-from aioslimproto.client import SlimClient
+from aioslimproto.models import EventType as SlimEventType
+from aioslimproto.models import PlayerState as SlimPlayerState
+from aioslimproto.models import Preset as SlimPreset
+from aioslimproto.models import SlimEvent
+from aioslimproto.models import VisualisationType as SlimVisualisationType
 from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, PlayerConfig
-from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType
+from music_assistant_models.enums import (
+    ConfigEntryType,
+    ContentType,
+    MediaType,
+    PlayerFeature,
+    PlayerType,
+    RepeatMode,
+)
+from music_assistant_models.errors import MusicAssistantError
 from music_assistant_models.media_items import AudioFormat
 
 from music_assistant.constants import (
@@ -21,54 +36,31 @@ from music_assistant.constants import (
     DEFAULT_PCM_FORMAT,
     create_sample_rates_config_entry,
 )
+from music_assistant.helpers.ffmpeg import get_ffmpeg_stream
 from music_assistant.helpers.util import TaskManager
 from music_assistant.models.player import DeviceInfo, Player, PlayerMedia
 
 from .constants import (
-    CONF_DISPLAY,
-    CONF_VISUALIZATION,
-    DEFAULT_VISUALIZATION,
-    SlimVisualisationType,
+    CACHE_KEY_PREV_STATE,
+    CONF_ENTRY_DISPLAY,
+    CONF_ENTRY_VISUALIZATION,
+    DEFAULT_PLAYER_VOLUME,
+    DEVIATION_JUMP_IGNORE,
+    MAX_SKIP_AHEAD_MS,
+    MIN_DEVIATION_ADJUST,
+    MIN_REQ_PLAYPOINTS,
+    REPEATMODE_MAP,
+    STATE_MAP,
+    SyncPlayPoint,
 )
+from .multi_client_stream import MultiClientStream
 
 if TYPE_CHECKING:
-    from aioslimproto.models import EventType as SlimEventType
-
-    from .provider import SqueezelitePlayerProvider
+    from aioslimproto.client import SlimClient
 
+    from music_assistant.providers.universal_group import UniversalGroupPlayer
 
-STATE_MAP = {
-    SlimPlayerState.BUFFERING: PlaybackState.PLAYING,
-    SlimPlayerState.BUFFER_READY: PlaybackState.PLAYING,
-    SlimPlayerState.PAUSED: PlaybackState.PAUSED,
-    SlimPlayerState.PLAYING: PlaybackState.PLAYING,
-    SlimPlayerState.STOPPED: PlaybackState.IDLE,
-}
-
-CONF_ENTRY_DISPLAY = ConfigEntry(
-    key=CONF_DISPLAY,
-    type=ConfigEntryType.BOOLEAN,
-    default_value=False,
-    required=False,
-    label="Enable display support",
-    description="Enable/disable native display support on squeezebox or squeezelite32 hardware.",
-    category="advanced",
-)
-CONF_ENTRY_VISUALIZATION = ConfigEntry(
-    key=CONF_VISUALIZATION,
-    type=ConfigEntryType.STRING,
-    default_value=DEFAULT_VISUALIZATION,
-    options=[
-        ConfigValueOption(title=x.name.replace("_", " ").title(), value=x.value)
-        for x in SlimVisualisationType
-    ],
-    required=False,
-    label="Visualization type",
-    description="The type of visualization to show on the display "
-    "during playback if the device supports this.",
-    category="advanced",
-    depends_on=CONF_DISPLAY,
-)
+    from .provider import SqueezelitePlayerProvider
 
 
 class SqueezelitePlayer(Player):
@@ -85,8 +77,7 @@ class SqueezelitePlayer(Player):
         """Initialize the Squeezelite Player."""
         super().__init__(provider, player_id)
         self.client = client
-        self.provider: SqueezelitePlayerProvider = provider
-
+        self._provider: SqueezelitePlayerProvider = provider
         # Set static player attributes
         self._attr_supported_features = {
             PlayerFeature.POWER,
@@ -98,18 +89,30 @@ class SqueezelitePlayer(Player):
             PlayerFeature.ENQUEUE,
             PlayerFeature.GAPLESS_PLAYBACK,
         }
-        self._attr_name = client.name
-        self._attr_available = True
-        self._attr_powered = client.powered
-        self._attr_device_info = DeviceInfo(
-            model=client.device_model,
-            ip_address=client.device_address,
-            manufacturer=client.device_type,
-        )
         self._attr_can_group_with = {provider.lookup_key}
+        self._multi_client_stream: MultiClientStream | None = None
+        self._sync_playpoints: deque[SyncPlayPoint] = deque(maxlen=MIN_REQ_PLAYPOINTS)
+        self._do_not_resync_before: float = 0.0
 
     async def setup(self) -> None:
         """Set up the player."""
+        player_id = self.client.player_id
+        self.logger.info("Player %s connected", self.client.name or player_id)
+        # set presets and display
+        await self._set_preset_items()
+        await self._set_display()
+        # update all dynamic attributes
+        self.update_attributes()
+        # restore volume and power state
+        if last_state := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_STATE):
+            init_power = last_state[0]
+            init_volume = last_state[1]
+        else:
+            init_volume = DEFAULT_PLAYER_VOLUME
+            init_power = False
+        await self.client.power(init_power)
+        await self.client.stop()
+        await self.client.volume_set(init_volume)
         await self.mass.players.register_or_update(self)
 
     async def get_config_entries(self) -> list[ConfigEntry]:
@@ -117,14 +120,13 @@ class SqueezelitePlayer(Player):
         base_entries = await super().get_config_entries()
         max_sample_rate = int(self.client.max_sample_rate)
         # create preset entries (for players that support it)
-        preset_entries = ()
         presets = []
         async for playlist in self.mass.music.playlists.iter_library_items(True):
             presets.append(ConfigValueOption(playlist.name, playlist.uri))
         async for radio in self.mass.music.radio.iter_library_items(True):
             presets.append(ConfigValueOption(radio.name, radio.uri))
         preset_count = 10
-        preset_entries = tuple(
+        preset_entries = [
             ConfigEntry(
                 key=f"preset_{index}",
                 type=ConfigEntryType.STRING,
@@ -136,84 +138,59 @@ class SqueezelitePlayer(Player):
                 required=False,
             )
             for index in range(1, preset_count + 1)
-        )
-        return (
-            base_entries
-            + preset_entries
-            + (
-                CONF_ENTRY_DEPRECATED_EQ_BASS,
-                CONF_ENTRY_DEPRECATED_EQ_MID,
-                CONF_ENTRY_DEPRECATED_EQ_TREBLE,
-                CONF_ENTRY_OUTPUT_CODEC,
-                CONF_ENTRY_SYNC_ADJUST,
-                CONF_ENTRY_DISPLAY,
-                CONF_ENTRY_VISUALIZATION,
-                CONF_ENTRY_HTTP_PROFILE_FORCED_2,
-                create_sample_rates_config_entry(
-                    max_sample_rate=max_sample_rate, max_bit_depth=24, safe_max_bit_depth=24
-                ),
-            )
-        )
-
-    async def handle_slim_event(self, event: SlimEventType) -> None:
-        """Handle player update from slimproto server."""
-        # Update player state from slim player
-        self._attr_available = True
-        self._attr_name = self.client.name
-        self._attr_powered = self.client.powered
-        self._attr_playback_state = STATE_MAP[self.client.state]
-        self._attr_volume_level = self.client.volume_level
-        self._attr_volume_muted = self.client.muted
-        self._attr_active_source = self.player_id
-
-        # Update current media if available
-        if self.client.current_media and (metadata := self.client.current_media.metadata):
-            self._attr_current_media = PlayerMedia(
-                uri=metadata.get("item_id"),
-                title=metadata.get("title"),
-                album=metadata.get("album"),
-                artist=metadata.get("artist"),
-                image_url=metadata.get("image_url"),
-                duration=metadata.get("duration"),
-                queue_id=metadata.get("queue_id"),
-                queue_item_id=metadata.get("queue_item_id"),
-            )
-        else:
-            self._attr_current_media = None
-
-        self.update_state()
+        ]
+        return [
+            *base_entries,
+            *preset_entries,
+            CONF_ENTRY_DEPRECATED_EQ_BASS,
+            CONF_ENTRY_DEPRECATED_EQ_MID,
+            CONF_ENTRY_DEPRECATED_EQ_TREBLE,
+            CONF_ENTRY_OUTPUT_CODEC,
+            CONF_ENTRY_SYNC_ADJUST,
+            CONF_ENTRY_DISPLAY,
+            CONF_ENTRY_VISUALIZATION,
+            CONF_ENTRY_HTTP_PROFILE_FORCED_2,
+            create_sample_rates_config_entry(
+                max_sample_rate=max_sample_rate, max_bit_depth=24, safe_max_bit_depth=24
+            ),
+        ]
 
     async def power(self, powered: bool) -> None:
         """Handle POWER command on the player."""
-        if powered:
-            await self.client.power_on()
-        else:
-            await self.client.power_off()
+        await self.client.power(powered)
+        # store last state in cache
+        await self.mass.cache.set(
+            self.player_id, (powered, self.client.volume_level), base_key=CACHE_KEY_PREV_STATE
+        )
 
     async def volume_set(self, volume_level: int) -> None:
         """Handle VOLUME_SET command on the player."""
         await self.client.volume_set(volume_level)
+        # store last state in cache
+        await self.mass.cache.set(
+            self.player_id, (self.client.powered, volume_level), base_key=CACHE_KEY_PREV_STATE
+        )
 
     async def volume_mute(self, muted: bool) -> None:
         """Handle VOLUME MUTE command on the player."""
-        await self.client.volume_mute(muted)
+        await self.client.mute(muted)
 
     async def stop(self) -> None:
         """Handle STOP command on the player."""
         async with TaskManager(self.mass) as tg:
-            for client in self.provider._get_sync_clients(self.player_id):
+            for client in self._get_sync_clients():
                 tg.create_task(client.stop())
 
     async def play(self) -> None:
         """Handle PLAY command on the player."""
         async with TaskManager(self.mass) as tg:
-            for client in self.provider._get_sync_clients(self.player_id):
+            for client in self._get_sync_clients():
                 tg.create_task(client.play())
 
     async def pause(self) -> None:
         """Handle PAUSE command on the player."""
         async with TaskManager(self.mass) as tg:
-            for client in self.provider._get_sync_clients(self.player_id):
+            for client in self._get_sync_clients():
                 tg.create_task(client.pause())
 
     async def play_media(self, media: PlayerMedia) -> None:
@@ -225,7 +202,6 @@ class SqueezelitePlayer(Player):
         if not self.group_members:
             # Simple, single-player playback
             await self._handle_play_url(
-                self.client,
                 url=media.uri,
                 media=media,
                 send_flush=True,
@@ -233,35 +209,83 @@ class SqueezelitePlayer(Player):
             )
             return
 
-        # This is a syncgroup, we need to handle this with a multi client stream
+        # this is a syncgroup, we need to handle this with a multi client stream
         master_audio_format = AudioFormat(
             content_type=DEFAULT_PCM_FORMAT.content_type,
-            sample_rate=48000,  # Default for squeezelite
-            bit_depth=16,
-            channels=2,
+            sample_rate=DEFAULT_PCM_FORMAT.sample_rate,
+            bit_depth=DEFAULT_PCM_FORMAT.bit_depth,
+        )
+        if media.media_type == MediaType.ANNOUNCEMENT:
+            # special case: stream announcement
+            audio_source = self.mass.streams.get_announcement_stream(
+                media.custom_data["url"],
+                output_format=master_audio_format,
+                use_pre_announce=media.custom_data["use_pre_announce"],
+            )
+        elif media.media_type == MediaType.PLUGIN_SOURCE:
+            # special case: plugin source stream
+            audio_source = self.mass.streams.get_plugin_source_stream(
+                plugin_source_id=media.custom_data["source_id"],
+                output_format=master_audio_format,
+                # need to pass player_id from the PlayerMedia object
+                # because this could have been a group
+                player_id=media.custom_data["player_id"],
+            )
+        elif media.queue_id.startswith("ugp_"):
+            # special case: UGP stream
+            ugp_player: UniversalGroupPlayer = self.mass.players.get(media.queue_id)
+            ugp_stream = ugp_player.stream
+            # Filter is later applied in MultiClientStream
+            audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None)
+        elif media.queue_id and media.queue_item_id:
+            # regular queue stream request
+            audio_source = self.mass.streams.get_queue_flow_stream(
+                queue=self.mass.player_queues.get(media.queue_id),
+                start_queue_item=self.mass.player_queues.get_item(
+                    media.queue_id, media.queue_item_id
+                ),
+                pcm_format=master_audio_format,
+            )
+        else:
+            # assume url or some other direct path
+            # NOTE: this will fail if its an uri not playable by ffmpeg
+            audio_source = get_ffmpeg_stream(
+                audio_input=media.uri,
+                input_format=AudioFormat(ContentType.try_parse(media.uri)),
+                output_format=master_audio_format,
+            )
+        # start the stream task
+        self._multi_client_stream = stream = MultiClientStream(
+            audio_source=audio_source, audio_format=master_audio_format
+        )
+        base_url = (
+            f"{self.mass.streams.base_url}/slimproto/multi?player_id={self.player_id}&fmt=flac"
         )
 
-        # Start multi-client stream for sync group
-        await self._handle_multi_client_stream(media, master_audio_format)
+        # forward to downstream play_media commands
+        async with TaskManager(self.mass) as tg:
+            for slimplayer in self._get_sync_clients():
+                url = f"{base_url}&child_player_id={slimplayer.player_id}"
+                stream.expected_clients += 1
+                tg.create_task(
+                    self._handle_play_url(
+                        slimplayer,
+                        url=url,
+                        media=media,
+                        send_flush=True,
+                        auto_play=False,
+                    )
+                )
 
     async def enqueue_next_media(self, media: PlayerMedia) -> None:
         """Handle enqueuing next media item."""
-        if self.synced_to:
-            msg = "A synced player cannot receive enqueue commands directly"
-            raise RuntimeError(msg)
-
-        # Handle enqueue for single player or sync group
-        if not self.group_members:
-            await self._handle_play_url(
-                self.client,
-                url=media.uri,
-                media=media,
-                send_flush=False,
-                auto_play=True,
-            )
-        else:
-            # Handle multi-client enqueue
-            await self._handle_multi_client_enqueue(media)
+        await self._handle_play_url(
+            url=media.uri,
+            media=media,
+            enqueue=True,
+            send_flush=False,
+            auto_play=True,
+        )
 
     async def set_members(
         self,
@@ -276,25 +300,19 @@ class SqueezelitePlayer(Player):
             # nothing to do
             return
 
-        raop_session = self.raop_stream.session if self.raop_stream else None
         # handle removals first
         if player_ids_to_remove:
-            if self.player_id in player_ids_to_remove:
-                # dissolve the entire sync group
-                if self.raop_stream and self.raop_stream.running:
-                    # stop the stream session if it is running
-                    await self.raop_stream.session.stop()
-                self._attr_group_members = []
-                self.update_state()
-                return
-
-            for child_player in self._get_sync_clients():
-                if child_player.player_id in player_ids_to_remove:
-                    if raop_session:
-                        await raop_session.remove_client(child_player)
-                    self._attr_group_members.remove(child_player.player_id)
+            for sync_client in self._get_sync_clients():
+                if sync_client.player_id in player_ids_to_remove:
+                    if sync_client.player_id in self._attr_group_members:
+                        # remove child from the group
+                        self._attr_group_members.remove(sync_client.player_id)
+                        if sync_client.state != SlimPlayerState.STOPPED:
+                            # stop the player if it is playing
+                            await sync_client.stop()
 
         # handle additions
+        players_added = False
         for player_id in player_ids_to_add or []:
             if player_id == self.player_id or player_id in self.group_members:
                 # nothing to do: player is already part of the group
@@ -303,45 +321,87 @@ class SqueezelitePlayer(Player):
             if not child_player:
                 # should not happen, but guard against it
                 continue
-            if child_player.synced_to and child_player.synced_to != self.player_id:
-                raise RuntimeError("Player is already synced to another player")
-
-            # ensure the child does not have an existing stream session active
-            if child_player := self.mass.players.get(player_id):
-                if (
-                    child_player.raop_stream
-                    and child_player.raop_stream.running
-                    and child_player.raop_stream.session != raop_session
-                ):
-                    await child_player.raop_stream.session.remove_client(child_player)
-
-            # add new child to the existing raop session (if any)
+            if child_player.state != SlimPlayerState.STOPPED:
+                # stop the player if it is already playing something else
+                await child_player.stop()
             self._attr_group_members.append(player_id)
-            if raop_session:
-                await raop_session.add_client(child_player)
+            players_added = True
 
         # always update the state after modifying group members
         self.update_state()
 
+        stream_session = self._multi_client_stream
+        if players_added and stream_session and not stream_session.done:
+            # restart stream session if it was already playing
+            # for now, we dont support late joining into an existing stream
+            self.mass.create_task(self.play_media(self.current_media))
+
     def set_config(self, config: PlayerConfig) -> None:
         """Set/update the player config."""
         super().set_config(config)
+        # update preset and display when config changes
         self.mass.create_task(self._set_preset_items())
         self.mass.create_task(self._set_display())
 
+    def handle_slim_event(self, event: SlimEvent) -> None:
+        """Handle player event from slimproto server."""
+        if event.type == SlimEventType.PLAYER_BUFFER_READY:
+            self.mass.create_task(self._handle_buffer_ready())
+            return
+
+        if event.type == SlimEventType.PLAYER_HEARTBEAT:
+            self._handle_player_heartbeat()
+            return
+
+        if event.type in (SlimEventType.PLAYER_BTN_EVENT, SlimEventType.PLAYER_CLI_EVENT):
+            self.mass.create_task(self._handle_player_cli_event(event))
+            return
+
+        # all other: update attributes and update state
+        self.update_attributes()
+        self.update_state()
+
+    def update_attributes(self) -> None:
+        """Update player attributes from slim player."""
+        # Update player state from slim player
+        self._attr_available = self.client.connected
+        self._attr_name = self.client.name
+        self._attr_powered = self.client.powered
+        self._attr_playback_state = STATE_MAP[self.client.state]
+        self._attr_volume_level = self.client.volume_level
+        self._attr_volume_muted = self.client.muted
+        self._attr_active_source = self.player_id
+        self._attr_device_info = DeviceInfo(
+            model=self.client.device_model,
+            ip_address=self.client.device_address,
+            manufacturer=self.client.device_type,
+        )
+        self._attr_elapsed_time = self.client.elapsed_seconds
+        self._attr_elapsed_time_last_updated = time.time()
+        # Update current media if available
+        if self.client.current_media and (metadata := self.client.current_media.metadata):
+            self._attr_current_media = PlayerMedia(
+                uri=metadata.get("item_id"),
+                title=metadata.get("title"),
+                album=metadata.get("album"),
+                artist=metadata.get("artist"),
+                image_url=metadata.get("image_url"),
+                duration=metadata.get("duration"),
+                queue_id=metadata.get("queue_id"),
+                queue_item_id=metadata.get("queue_item_id"),
+            )
+        else:
+            self._attr_current_media = None
+
     async def _handle_play_url(
         self,
-        client: SlimClient,
         url: str,
         media: PlayerMedia,
+        enqueue: bool = False,
         send_flush: bool = True,
-        auto_play: bool = True,
+        auto_play: bool = False,
     ) -> None:
-        """Handle playing a URL on a client."""
-        if send_flush:
-            await client.flush()
-
-        # Send play command with metadata
+        """Handle playback of an url on slimproto player(s)."""
         metadata = {
             "item_id": media.uri,
             "title": media.title,
@@ -352,49 +412,245 @@ class SqueezelitePlayer(Player):
             "queue_id": media.queue_id,
             "queue_item_id": media.queue_item_id,
         }
+        if queue := self.mass.player_queues.get(media.queue_id):
+            self.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
+            self.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
+        await self.client.play_url(
+            url=url,
+            mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
+            metadata=metadata,
+            enqueue=enqueue,
+            send_flush=send_flush,
+            # if autoplay=False playback will not start automatically
+            # instead 'buffer ready' will be called when the buffer is full
+            # to coordinate a start of multiple synced players
+            autostart=auto_play,
+        )
+        # if queue is set to single track repeat,
+        # immediately set this track as the next
+        # this prevents race conditions with super short audio clips (on single repeat)
+        # https://github.com/music-assistant/hass-music-assistant/issues/2059
+        if queue and queue.repeat_mode == RepeatMode.ONE:
+            self.mass.call_later(
+                0.2,
+                self.client.play_url(
+                    url=url,
+                    mime_type=f"audio/{url.split('.')[-1].split('?')[0]}",
+                    metadata=metadata,
+                    enqueue=True,
+                    send_flush=False,
+                    autostart=True,
+                ),
+            )
 
-        await client.play_url(url, metadata=metadata, auto_play=auto_play)
+    def _handle_player_heartbeat(self) -> None:
+        """Process SlimClient elapsed_time update."""
+        if self.client.state == SlimPlayerState.STOPPED:
+            # ignore server heartbeats when stopped
+            return
+        # elapsed time change on the player will be auto picked up
+        # by the player manager.
+        self._attr_elapsed_time = self.client.elapsed_seconds
+        self._attr_elapsed_time_last_updated = time.time()
 
-    def _get_sync_clients(self) -> Iterator[SlimClient]:
-        """Get all sync clients for a player."""
-        yield self.client
-        for member_id in self.group_members:
-            yield self.provider.slimproto.get_player(member_id)
+        # handle sync
+        if self.synced_to:
+            self._handle_sync()
 
-    async def _handle_multi_client_stream(
-        self, media: PlayerMedia, master_audio_format: AudioFormat
-    ) -> None:
-        """Handle multi-client stream for sync groups."""
-        # This would need implementation of the multi-client streaming logic
-        # For now, simplified implementation
-        sync_clients = list(self.provider._get_sync_clients(self.player_id))
+    async def _handle_buffer_ready(self) -> None:
+        """
+        Handle buffer ready event, player has buffered a (new) track.
 
-        # Play on all sync clients
+        Only used when autoplay=0 for coordinated start of synced players.
+        """
+        if self.synced_to:
+            # unpause of sync child is handled by sync master
+            return
+        if not self.group_members:
+            # not a sync group, continue
+            await self.client.unpause_at(self.client.jiffies)
+            return
+        count = 0
+        while count < 40:
+            childs_total = 0
+            childs_ready = 0
+            await asyncio.sleep(0.2)
+            for sync_child in self._get_sync_clients():
+                childs_total += 1
+                if sync_child.state == SlimPlayerState.BUFFER_READY:
+                    childs_ready += 1
+            if childs_total == childs_ready:
+                break
+
+        # all child's ready (or timeout) - start play
         async with TaskManager(self.mass) as tg:
-            for slimclient in sync_clients:
-                tg.create_task(
-                    self._handle_play_url(
-                        slimclient,
-                        media.uri,
-                        media,
-                        send_flush=True,
-                        auto_play=False,
-                    )
-                )
+            for sync_client in self._get_sync_clients():
+                # NOTE: Officially you should do an unpause_at based on the player timestamp
+                # but I did not have any good results with that.
+                # Instead just start playback on all players and let the sync logic work out
+                # the delays etc.
+                tg.create_task(sync_client.pause_for(200))
+
+    async def _handle_player_cli_event(self, event: SlimEvent) -> None:
+        """Process CLI Event."""
+        if not event.data:
+            return
+        # event data is str, not dict
+        # TODO: fix this in the aioslimproto lib
+        event_data = cast("str", event.data)
+        queue = self.mass.player_queues.get_active_queue(self.player_id)
+        if event_data.startswith("button preset_") and event_data.endswith(".single"):
+            preset_id = event_data.split("preset_")[1].split(".")[0]
+            preset_index = int(preset_id) - 1
+            if len(self.client.presets) >= preset_index + 1:
+                preset = self.client.presets[preset_index]
+                await self.mass.player_queues.play_media(queue.queue_id, preset.uri)
+        elif event_data == "button repeat":
+            if queue.repeat_mode == RepeatMode.OFF:
+                repeat_mode = RepeatMode.ONE
+            elif queue.repeat_mode == RepeatMode.ONE:
+                repeat_mode = RepeatMode.ALL
+            else:
+                repeat_mode = RepeatMode.OFF
+            self.mass.player_queues.set_repeat(queue.queue_id, repeat_mode)
+            self.client.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode]
+            self.client.signal_update()
+        elif event.data == "button shuffle":
+            self.mass.player_queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled)
+            self.client.extra_data["playlist shuffle"] = int(queue.shuffle_enabled)
+            self.client.signal_update()
+        elif event_data in ("button jump_fwd", "button fwd"):
+            await self.mass.player_queues.next(queue.queue_id)
+        elif event_data in ("button jump_rew", "button rew"):
+            await self.mass.player_queues.previous(queue.queue_id)
+        elif event_data.startswith("time "):
+            # seek request
+            _, param = event_data.split(" ", 1)
+            if param.isnumeric():
+                await self.mass.player_queues.seek(queue.queue_id, int(param))
+        self.logger.debug("CLI Event: %s", event_data)
+
+    def _handle_sync(self) -> None:
+        """Synchronize audio of a sync slimplayer."""
+        sync_master_id = self.synced_to
+        if not sync_master_id:
+            # we only correct sync members, not the sync master itself
+            return
+        if not (sync_master := self.provider.slimproto.get_player(sync_master_id)):
+            return  # just here as a guard as bad things can happen
 
-    async def _handle_multi_client_enqueue(self, media: PlayerMedia) -> None:
-        """Handle multi-client enqueue for sync groups."""
-        sync_clients = list(self.provider._get_sync_clients(self.player_id))
+        if sync_master.state != SlimPlayerState.PLAYING:
+            return
+        if self.client.state != SlimPlayerState.PLAYING:
+            return
 
-        # Enqueue on all sync clients
-        async with TaskManager(self.mass) as tg:
-            for slimclient in sync_clients:
-                tg.create_task(
-                    self._handle_play_url(
-                        slimclient,
-                        media.uri,
-                        media,
-                        send_flush=False,
-                        auto_play=True,
+        # we collect a few playpoints of the player to determine
+        # average lag/drift so we can adjust accordingly
+        sync_playpoints = self._sync_playpoints
+
+        now = time.time()
+        if now < self._do_not_resync_before:
+            return
+
+        last_playpoint = sync_playpoints[-1] if sync_playpoints else None
+        if last_playpoint and (now - last_playpoint.timestamp) > 10:
+            # last playpoint is too old, invalidate
+            sync_playpoints.clear()
+        if last_playpoint and last_playpoint.sync_master != sync_master.player_id:
+            # this should not happen, but just in case
+            sync_playpoints.clear()
+
+        diff = int(
+            self.provider.get_corrected_elapsed_milliseconds(sync_master)
+            - self.provider.get_corrected_elapsed_milliseconds(self.client)
+        )
+
+        sync_playpoints.append(SyncPlayPoint(now, sync_master.player_id, diff))
+
+        # ignore unexpected spikes
+        if (
+            sync_playpoints
+            and abs(statistics.fmean(abs(x.diff) for x in sync_playpoints) - abs(diff))
+            > DEVIATION_JUMP_IGNORE
+        ):
+            return
+
+        min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS
+        if len(sync_playpoints) < min_req_playpoints:
+            return
+
+        # get the average diff
+        avg_diff = statistics.fmean(x.diff for x in sync_playpoints)
+        delta = int(abs(avg_diff))
+
+        if delta < MIN_DEVIATION_ADJUST:
+            return
+
+        # resync the player by skipping ahead or pause for x amount of (milli)seconds
+        sync_playpoints.clear()
+        self._do_not_resync_before = now + 5
+        if avg_diff > MAX_SKIP_AHEAD_MS:
+            # player lagging behind more than MAX_SKIP_AHEAD_MS,
+            # we need to correct the sync_master
+            self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta)
+            self.mass.create_task(sync_master.pause_for(delta))
+        elif avg_diff > 0:
+            # handle player lagging behind, fix with skip_ahead
+            self.logger.debug("%s resync: skipAhead %sms", self.display_name, delta)
+            self.mass.create_task(self.client.skip_over(delta))
+        else:
+            # handle player is drifting too far ahead, use pause_for to adjust
+            self.logger.debug("%s resync: pauseFor %sms", self.display_name, delta)
+            self.mass.create_task(self.client.pause_for(delta))
+
+    async def _set_preset_items(self) -> None:
+        """Set the presets for a player."""
+        preset_items: list[SlimPreset] = []
+        for preset_index in range(1, 11):
+            if preset_conf := self.mass.config.get_raw_player_config_value(
+                self.player_id, f"preset_{preset_index}"
+            ):
+                try:
+                    media_item = await self.mass.music.get_item_by_uri(preset_conf)
+                    preset_items.append(
+                        SlimPreset(
+                            uri=media_item.uri,
+                            text=media_item.name,
+                            icon=self.mass.metadata.get_image_url(media_item.image),
+                        )
                     )
-                )
+                except MusicAssistantError:
+                    # non-existing media item or some other edge case
+                    preset_items.append(
+                        SlimPreset(
+                            uri=f"preset_{preset_index}",
+                            text=f"ERROR <preset {preset_index}>",
+                            icon="",
+                        )
+                    )
+            else:
+                break
+        self.client.presets = preset_items
+
+    async def _set_display(self) -> None:
+        """Set the display config for a player."""
+        display_enabled = self.mass.config.get_raw_player_config_value(
+            self.player_id,
+            CONF_ENTRY_DISPLAY.key,
+            CONF_ENTRY_DISPLAY.default_value,
+        )
+        visualization = self.mass.config.get_raw_player_config_value(
+            self.player_id,
+            CONF_ENTRY_VISUALIZATION.key,
+            CONF_ENTRY_VISUALIZATION.default_value,
+        )
+        await self.client.configure_display(
+            visualisation=SlimVisualisationType(visualization), disabled=not display_enabled
+        )
+
+    def _get_sync_clients(self) -> Iterator[SlimClient]:
+        """Get all sync clients for a player."""
+        yield self.client
+        for member_id in self.group_members:
+            if slimplayer := self.provider.slimproto.get_player(member_id):
+                yield slimplayer
index f5afd6b8f28a023a1d678516792b39cdb29cb5ce..9293a970204a740be4f2607a53421c147b5c074b 100644 (file)
@@ -4,13 +4,15 @@ from __future__ import annotations
 
 import logging
 from dataclasses import dataclass
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, cast
 
+from aioslimproto.models import EventType as SlimEventType
+from aioslimproto.models import SlimEvent
 from aioslimproto.server import SlimServer
 from music_assistant_models.enums import ProviderFeature
 from music_assistant_models.errors import SetupFailedError
 
-from music_assistant.constants import CONF_PORT, VERBOSE_LOG_LEVEL
+from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL
 from music_assistant.helpers.util import is_port_in_use
 from music_assistant.models.player_provider import PlayerProvider
 
@@ -20,7 +22,6 @@ from .player import SqueezelitePlayer
 
 if TYPE_CHECKING:
     from aioslimproto.client import SlimClient
-    from aioslimproto.models import EventType as SlimEventType
 
 
 @dataclass
@@ -90,7 +91,7 @@ class SqueezelitePlayerProvider(PlayerProvider):
     async def loaded_in_mass(self) -> None:
         """Call after the provider has been loaded."""
         await super().loaded_in_mass()
-        self.slimproto.subscribe(self._client_callback)
+        self.slimproto.subscribe(self._handle_slimproto_event)
         self.mass.streams.register_dynamic_route(
             "/slimproto/multi", self._serve_multi_client_stream
         )
@@ -109,31 +110,37 @@ class SqueezelitePlayerProvider(PlayerProvider):
         self.mass.streams.unregister_dynamic_route("/slimproto/multi")
         self.mass.streams.unregister_dynamic_route("/jsonrpc.js")
 
-    async def _player_join(self, slimplayer: SlimClient) -> None:
-        """Handle player joining the slimproto server."""
-        player_id = slimplayer.player_id
-        if player_id in self._players:
-            return
-
-        self.logger.debug("Player %s joined the server", player_id)
+    def get_corrected_elapsed_milliseconds(self, slimplayer: SlimClient) -> int:
+        """Return corrected elapsed milliseconds for a slimplayer."""
+        sync_delay = self.mass.config.get_raw_player_config_value(
+            slimplayer.player_id, CONF_SYNC_ADJUST, 0
+        )
+        return slimplayer.elapsed_milliseconds - sync_delay
 
-        # Create SqueezelitePlayer instance
-        player = SqueezelitePlayer(self, player_id, slimplayer)
-        self._players[player_id] = player
+    def _handle_slimproto_event(
+        self,
+        event: SlimEvent,
+    ) -> None:
+        if self.mass.closing:
+            return
 
-        # Register with Music Assistant
-        await player.setup()
+        # handle new player connect (or reconnect of existing player)
+        if event.type == SlimEventType.PLAYER_CONNECTED:
+            if not (slimclient := self.slimproto.get_player(event.player_id)):
+                return  # should not happen, but guard anyways
+            player = SqueezelitePlayer(self, event.player_id, slimclient)
+            self.mass.create_task(player.setup())
+            return
 
-    async def _player_leave(self, player_id: str) -> None:
-        """Handle player leaving the slimproto server."""
-        self.logger.debug("Player %s left the server", player_id)
+        if not (player := self.mass.players.get(event.player_id)):
+            return  # guard for unknown player
+        if TYPE_CHECKING:
+            player = cast("SqueezelitePlayer", player)
 
-        if self._players.pop(player_id, None):
-            if mass_player := self.mass.players.get(player_id):
-                mass_player.available = False
-                self.mass.players.update(player_id)
+        # handle player disconnect
+        if event.type == SlimEventType.PLAYER_DISCONNECTED:
+            self.mass.create_task(self.mass.players.unregister(player.player_id))
+            return
 
-    async def _player_update(self, player_id: str, event: SlimEventType) -> None:
-        """Handle player update from slimproto server."""
-        if player := self._players.get(player_id):
-            await player.handle_slim_event(event)
+        # forward all other events to the player itself
+        player.handle_slim_event(event)