From dbdd7be75ce4b5d48b45cef74510153646d9122b Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 18 Aug 2025 11:30:57 +0200 Subject: [PATCH] Fixes for the squeezelite provider after refactor (#2338) --- .../providers/squeezelite/constants.py | 57 ++ .../providers/squeezelite/player.py | 670 ++++++++++++------ .../providers/squeezelite/provider.py | 61 +- 3 files changed, 554 insertions(+), 234 deletions(-) diff --git a/music_assistant/providers/squeezelite/constants.py b/music_assistant/providers/squeezelite/constants.py index 1c337898..8faae126 100644 --- a/music_assistant/providers/squeezelite/constants.py +++ b/music_assistant/providers/squeezelite/constants.py @@ -2,7 +2,12 @@ from __future__ import annotations +from dataclasses import dataclass + +from aioslimproto.client import PlayerState as SlimPlayerState from aioslimproto.models import VisualisationType as SlimVisualisationType +from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption +from music_assistant_models.enums import ConfigEntryType, PlaybackState, RepeatMode CONF_CLI_TELNET_PORT = "cli_telnet_port" CONF_CLI_JSON_PORT = "cli_json_port" @@ -12,5 +17,57 @@ DEFAULT_SLIMPROTO_PORT = 3483 CONF_DISPLAY = "display" CONF_VISUALIZATION = "visualization" +CACHE_KEY_PREV_STATE = "slimproto_prev_state" +DEFAULT_PLAYER_VOLUME = 20 DEFAULT_VISUALIZATION = SlimVisualisationType.NONE + +# sync constants +MIN_DEVIATION_ADJUST = 8 # 5 milliseconds +MIN_REQ_PLAYPOINTS = 8 # we need at least 8 measurements +DEVIATION_JUMP_IGNORE = 500 # ignore a sudden unrealistic jump +MAX_SKIP_AHEAD_MS = 800 # 0.8 seconds + +STATE_MAP = { + SlimPlayerState.BUFFERING: PlaybackState.PLAYING, + SlimPlayerState.BUFFER_READY: PlaybackState.PLAYING, + SlimPlayerState.PAUSED: PlaybackState.PAUSED, + SlimPlayerState.PLAYING: PlaybackState.PLAYING, + SlimPlayerState.STOPPED: PlaybackState.IDLE, +} + +REPEATMODE_MAP = {RepeatMode.OFF: 0, RepeatMode.ONE: 1, RepeatMode.ALL: 2} + +CONF_ENTRY_DISPLAY = ConfigEntry( + key=CONF_DISPLAY, + type=ConfigEntryType.BOOLEAN, + default_value=False, + required=False, + label="Enable display support", + description="Enable/disable native display support on squeezebox or squeezelite32 hardware.", + category="advanced", +) +CONF_ENTRY_VISUALIZATION = ConfigEntry( + key=CONF_VISUALIZATION, + type=ConfigEntryType.STRING, + default_value=DEFAULT_VISUALIZATION, + options=[ + ConfigValueOption(title=x.name.replace("_", " ").title(), value=x.value) + for x in SlimVisualisationType + ], + required=False, + label="Visualization type", + description="The type of visualization to show on the display " + "during playback if the device supports this.", + category="advanced", + depends_on=CONF_DISPLAY, +) + + +@dataclass +class SyncPlayPoint: + """Simple structure to describe a Sync Playpoint.""" + + timestamp: float + sync_master: str + diff: int diff --git a/music_assistant/providers/squeezelite/player.py b/music_assistant/providers/squeezelite/player.py index 67a39a4a..33bde07f 100644 --- a/music_assistant/providers/squeezelite/player.py +++ b/music_assistant/providers/squeezelite/player.py @@ -2,13 +2,28 @@ from __future__ import annotations +import asyncio +import statistics +import time +from collections import deque from collections.abc import Iterator -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast -from aioslimproto.client import PlayerState as SlimPlayerState -from aioslimproto.client import SlimClient +from aioslimproto.models import EventType as SlimEventType +from aioslimproto.models import PlayerState as SlimPlayerState +from aioslimproto.models import Preset as SlimPreset +from aioslimproto.models import SlimEvent +from aioslimproto.models import VisualisationType as SlimVisualisationType from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, PlayerConfig -from music_assistant_models.enums import ConfigEntryType, PlaybackState, PlayerFeature, PlayerType +from music_assistant_models.enums import ( + ConfigEntryType, + ContentType, + MediaType, + PlayerFeature, + PlayerType, + RepeatMode, +) +from music_assistant_models.errors import MusicAssistantError from music_assistant_models.media_items import AudioFormat from music_assistant.constants import ( @@ -21,54 +36,31 @@ from music_assistant.constants import ( DEFAULT_PCM_FORMAT, create_sample_rates_config_entry, ) +from music_assistant.helpers.ffmpeg import get_ffmpeg_stream from music_assistant.helpers.util import TaskManager from music_assistant.models.player import DeviceInfo, Player, PlayerMedia from .constants import ( - CONF_DISPLAY, - CONF_VISUALIZATION, - DEFAULT_VISUALIZATION, - SlimVisualisationType, + CACHE_KEY_PREV_STATE, + CONF_ENTRY_DISPLAY, + CONF_ENTRY_VISUALIZATION, + DEFAULT_PLAYER_VOLUME, + DEVIATION_JUMP_IGNORE, + MAX_SKIP_AHEAD_MS, + MIN_DEVIATION_ADJUST, + MIN_REQ_PLAYPOINTS, + REPEATMODE_MAP, + STATE_MAP, + SyncPlayPoint, ) +from .multi_client_stream import MultiClientStream if TYPE_CHECKING: - from aioslimproto.models import EventType as SlimEventType - - from .provider import SqueezelitePlayerProvider + from aioslimproto.client import SlimClient + from music_assistant.providers.universal_group import UniversalGroupPlayer -STATE_MAP = { - SlimPlayerState.BUFFERING: PlaybackState.PLAYING, - SlimPlayerState.BUFFER_READY: PlaybackState.PLAYING, - SlimPlayerState.PAUSED: PlaybackState.PAUSED, - SlimPlayerState.PLAYING: PlaybackState.PLAYING, - SlimPlayerState.STOPPED: PlaybackState.IDLE, -} - -CONF_ENTRY_DISPLAY = ConfigEntry( - key=CONF_DISPLAY, - type=ConfigEntryType.BOOLEAN, - default_value=False, - required=False, - label="Enable display support", - description="Enable/disable native display support on squeezebox or squeezelite32 hardware.", - category="advanced", -) -CONF_ENTRY_VISUALIZATION = ConfigEntry( - key=CONF_VISUALIZATION, - type=ConfigEntryType.STRING, - default_value=DEFAULT_VISUALIZATION, - options=[ - ConfigValueOption(title=x.name.replace("_", " ").title(), value=x.value) - for x in SlimVisualisationType - ], - required=False, - label="Visualization type", - description="The type of visualization to show on the display " - "during playback if the device supports this.", - category="advanced", - depends_on=CONF_DISPLAY, -) + from .provider import SqueezelitePlayerProvider class SqueezelitePlayer(Player): @@ -85,8 +77,7 @@ class SqueezelitePlayer(Player): """Initialize the Squeezelite Player.""" super().__init__(provider, player_id) self.client = client - self.provider: SqueezelitePlayerProvider = provider - + self._provider: SqueezelitePlayerProvider = provider # Set static player attributes self._attr_supported_features = { PlayerFeature.POWER, @@ -98,18 +89,30 @@ class SqueezelitePlayer(Player): PlayerFeature.ENQUEUE, PlayerFeature.GAPLESS_PLAYBACK, } - self._attr_name = client.name - self._attr_available = True - self._attr_powered = client.powered - self._attr_device_info = DeviceInfo( - model=client.device_model, - ip_address=client.device_address, - manufacturer=client.device_type, - ) self._attr_can_group_with = {provider.lookup_key} + self._multi_client_stream: MultiClientStream | None = None + self._sync_playpoints: deque[SyncPlayPoint] = deque(maxlen=MIN_REQ_PLAYPOINTS) + self._do_not_resync_before: float = 0.0 async def setup(self) -> None: """Set up the player.""" + player_id = self.client.player_id + self.logger.info("Player %s connected", self.client.name or player_id) + # set presets and display + await self._set_preset_items() + await self._set_display() + # update all dynamic attributes + self.update_attributes() + # restore volume and power state + if last_state := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_STATE): + init_power = last_state[0] + init_volume = last_state[1] + else: + init_volume = DEFAULT_PLAYER_VOLUME + init_power = False + await self.client.power(init_power) + await self.client.stop() + await self.client.volume_set(init_volume) await self.mass.players.register_or_update(self) async def get_config_entries(self) -> list[ConfigEntry]: @@ -117,14 +120,13 @@ class SqueezelitePlayer(Player): base_entries = await super().get_config_entries() max_sample_rate = int(self.client.max_sample_rate) # create preset entries (for players that support it) - preset_entries = () presets = [] async for playlist in self.mass.music.playlists.iter_library_items(True): presets.append(ConfigValueOption(playlist.name, playlist.uri)) async for radio in self.mass.music.radio.iter_library_items(True): presets.append(ConfigValueOption(radio.name, radio.uri)) preset_count = 10 - preset_entries = tuple( + preset_entries = [ ConfigEntry( key=f"preset_{index}", type=ConfigEntryType.STRING, @@ -136,84 +138,59 @@ class SqueezelitePlayer(Player): required=False, ) for index in range(1, preset_count + 1) - ) - return ( - base_entries - + preset_entries - + ( - CONF_ENTRY_DEPRECATED_EQ_BASS, - CONF_ENTRY_DEPRECATED_EQ_MID, - CONF_ENTRY_DEPRECATED_EQ_TREBLE, - CONF_ENTRY_OUTPUT_CODEC, - CONF_ENTRY_SYNC_ADJUST, - CONF_ENTRY_DISPLAY, - CONF_ENTRY_VISUALIZATION, - CONF_ENTRY_HTTP_PROFILE_FORCED_2, - create_sample_rates_config_entry( - max_sample_rate=max_sample_rate, max_bit_depth=24, safe_max_bit_depth=24 - ), - ) - ) - - async def handle_slim_event(self, event: SlimEventType) -> None: - """Handle player update from slimproto server.""" - # Update player state from slim player - self._attr_available = True - self._attr_name = self.client.name - self._attr_powered = self.client.powered - self._attr_playback_state = STATE_MAP[self.client.state] - self._attr_volume_level = self.client.volume_level - self._attr_volume_muted = self.client.muted - self._attr_active_source = self.player_id - - # Update current media if available - if self.client.current_media and (metadata := self.client.current_media.metadata): - self._attr_current_media = PlayerMedia( - uri=metadata.get("item_id"), - title=metadata.get("title"), - album=metadata.get("album"), - artist=metadata.get("artist"), - image_url=metadata.get("image_url"), - duration=metadata.get("duration"), - queue_id=metadata.get("queue_id"), - queue_item_id=metadata.get("queue_item_id"), - ) - else: - self._attr_current_media = None - - self.update_state() + ] + return [ + *base_entries, + *preset_entries, + CONF_ENTRY_DEPRECATED_EQ_BASS, + CONF_ENTRY_DEPRECATED_EQ_MID, + CONF_ENTRY_DEPRECATED_EQ_TREBLE, + CONF_ENTRY_OUTPUT_CODEC, + CONF_ENTRY_SYNC_ADJUST, + CONF_ENTRY_DISPLAY, + CONF_ENTRY_VISUALIZATION, + CONF_ENTRY_HTTP_PROFILE_FORCED_2, + create_sample_rates_config_entry( + max_sample_rate=max_sample_rate, max_bit_depth=24, safe_max_bit_depth=24 + ), + ] async def power(self, powered: bool) -> None: """Handle POWER command on the player.""" - if powered: - await self.client.power_on() - else: - await self.client.power_off() + await self.client.power(powered) + # store last state in cache + await self.mass.cache.set( + self.player_id, (powered, self.client.volume_level), base_key=CACHE_KEY_PREV_STATE + ) async def volume_set(self, volume_level: int) -> None: """Handle VOLUME_SET command on the player.""" await self.client.volume_set(volume_level) + # store last state in cache + await self.mass.cache.set( + self.player_id, (self.client.powered, volume_level), base_key=CACHE_KEY_PREV_STATE + ) async def volume_mute(self, muted: bool) -> None: """Handle VOLUME MUTE command on the player.""" - await self.client.volume_mute(muted) + await self.client.mute(muted) async def stop(self) -> None: """Handle STOP command on the player.""" async with TaskManager(self.mass) as tg: - for client in self.provider._get_sync_clients(self.player_id): + for client in self._get_sync_clients(): tg.create_task(client.stop()) async def play(self) -> None: """Handle PLAY command on the player.""" async with TaskManager(self.mass) as tg: - for client in self.provider._get_sync_clients(self.player_id): + for client in self._get_sync_clients(): tg.create_task(client.play()) async def pause(self) -> None: """Handle PAUSE command on the player.""" async with TaskManager(self.mass) as tg: - for client in self.provider._get_sync_clients(self.player_id): + for client in self._get_sync_clients(): tg.create_task(client.pause()) async def play_media(self, media: PlayerMedia) -> None: @@ -225,7 +202,6 @@ class SqueezelitePlayer(Player): if not self.group_members: # Simple, single-player playback await self._handle_play_url( - self.client, url=media.uri, media=media, send_flush=True, @@ -233,35 +209,83 @@ class SqueezelitePlayer(Player): ) return - # This is a syncgroup, we need to handle this with a multi client stream + # this is a syncgroup, we need to handle this with a multi client stream master_audio_format = AudioFormat( content_type=DEFAULT_PCM_FORMAT.content_type, - sample_rate=48000, # Default for squeezelite - bit_depth=16, - channels=2, + sample_rate=DEFAULT_PCM_FORMAT.sample_rate, + bit_depth=DEFAULT_PCM_FORMAT.bit_depth, + ) + if media.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + audio_source = self.mass.streams.get_announcement_stream( + media.custom_data["url"], + output_format=master_audio_format, + use_pre_announce=media.custom_data["use_pre_announce"], + ) + elif media.media_type == MediaType.PLUGIN_SOURCE: + # special case: plugin source stream + audio_source = self.mass.streams.get_plugin_source_stream( + plugin_source_id=media.custom_data["source_id"], + output_format=master_audio_format, + # need to pass player_id from the PlayerMedia object + # because this could have been a group + player_id=media.custom_data["player_id"], + ) + elif media.queue_id.startswith("ugp_"): + # special case: UGP stream + ugp_player: UniversalGroupPlayer = self.mass.players.get(media.queue_id) + ugp_stream = ugp_player.stream + # Filter is later applied in MultiClientStream + audio_source = ugp_stream.get_stream(master_audio_format, filter_params=None) + elif media.queue_id and media.queue_item_id: + # regular queue stream request + audio_source = self.mass.streams.get_queue_flow_stream( + queue=self.mass.player_queues.get(media.queue_id), + start_queue_item=self.mass.player_queues.get_item( + media.queue_id, media.queue_item_id + ), + pcm_format=master_audio_format, + ) + else: + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(ContentType.try_parse(media.uri)), + output_format=master_audio_format, + ) + # start the stream task + self._multi_client_stream = stream = MultiClientStream( + audio_source=audio_source, audio_format=master_audio_format + ) + base_url = ( + f"{self.mass.streams.base_url}/slimproto/multi?player_id={self.player_id}&fmt=flac" ) - # Start multi-client stream for sync group - await self._handle_multi_client_stream(media, master_audio_format) + # forward to downstream play_media commands + async with TaskManager(self.mass) as tg: + for slimplayer in self._get_sync_clients(): + url = f"{base_url}&child_player_id={slimplayer.player_id}" + stream.expected_clients += 1 + tg.create_task( + self._handle_play_url( + slimplayer, + url=url, + media=media, + send_flush=True, + auto_play=False, + ) + ) async def enqueue_next_media(self, media: PlayerMedia) -> None: """Handle enqueuing next media item.""" - if self.synced_to: - msg = "A synced player cannot receive enqueue commands directly" - raise RuntimeError(msg) - - # Handle enqueue for single player or sync group - if not self.group_members: - await self._handle_play_url( - self.client, - url=media.uri, - media=media, - send_flush=False, - auto_play=True, - ) - else: - # Handle multi-client enqueue - await self._handle_multi_client_enqueue(media) + await self._handle_play_url( + url=media.uri, + media=media, + enqueue=True, + send_flush=False, + auto_play=True, + ) async def set_members( self, @@ -276,25 +300,19 @@ class SqueezelitePlayer(Player): # nothing to do return - raop_session = self.raop_stream.session if self.raop_stream else None # handle removals first if player_ids_to_remove: - if self.player_id in player_ids_to_remove: - # dissolve the entire sync group - if self.raop_stream and self.raop_stream.running: - # stop the stream session if it is running - await self.raop_stream.session.stop() - self._attr_group_members = [] - self.update_state() - return - - for child_player in self._get_sync_clients(): - if child_player.player_id in player_ids_to_remove: - if raop_session: - await raop_session.remove_client(child_player) - self._attr_group_members.remove(child_player.player_id) + for sync_client in self._get_sync_clients(): + if sync_client.player_id in player_ids_to_remove: + if sync_client.player_id in self._attr_group_members: + # remove child from the group + self._attr_group_members.remove(sync_client.player_id) + if sync_client.state != SlimPlayerState.STOPPED: + # stop the player if it is playing + await sync_client.stop() # handle additions + players_added = False for player_id in player_ids_to_add or []: if player_id == self.player_id or player_id in self.group_members: # nothing to do: player is already part of the group @@ -303,45 +321,87 @@ class SqueezelitePlayer(Player): if not child_player: # should not happen, but guard against it continue - if child_player.synced_to and child_player.synced_to != self.player_id: - raise RuntimeError("Player is already synced to another player") - - # ensure the child does not have an existing stream session active - if child_player := self.mass.players.get(player_id): - if ( - child_player.raop_stream - and child_player.raop_stream.running - and child_player.raop_stream.session != raop_session - ): - await child_player.raop_stream.session.remove_client(child_player) - - # add new child to the existing raop session (if any) + if child_player.state != SlimPlayerState.STOPPED: + # stop the player if it is already playing something else + await child_player.stop() self._attr_group_members.append(player_id) - if raop_session: - await raop_session.add_client(child_player) + players_added = True # always update the state after modifying group members self.update_state() + stream_session = self._multi_client_stream + if players_added and stream_session and not stream_session.done: + # restart stream session if it was already playing + # for now, we dont support late joining into an existing stream + self.mass.create_task(self.play_media(self.current_media)) + def set_config(self, config: PlayerConfig) -> None: """Set/update the player config.""" super().set_config(config) + # update preset and display when config changes self.mass.create_task(self._set_preset_items()) self.mass.create_task(self._set_display()) + def handle_slim_event(self, event: SlimEvent) -> None: + """Handle player event from slimproto server.""" + if event.type == SlimEventType.PLAYER_BUFFER_READY: + self.mass.create_task(self._handle_buffer_ready()) + return + + if event.type == SlimEventType.PLAYER_HEARTBEAT: + self._handle_player_heartbeat() + return + + if event.type in (SlimEventType.PLAYER_BTN_EVENT, SlimEventType.PLAYER_CLI_EVENT): + self.mass.create_task(self._handle_player_cli_event(event)) + return + + # all other: update attributes and update state + self.update_attributes() + self.update_state() + + def update_attributes(self) -> None: + """Update player attributes from slim player.""" + # Update player state from slim player + self._attr_available = self.client.connected + self._attr_name = self.client.name + self._attr_powered = self.client.powered + self._attr_playback_state = STATE_MAP[self.client.state] + self._attr_volume_level = self.client.volume_level + self._attr_volume_muted = self.client.muted + self._attr_active_source = self.player_id + self._attr_device_info = DeviceInfo( + model=self.client.device_model, + ip_address=self.client.device_address, + manufacturer=self.client.device_type, + ) + self._attr_elapsed_time = self.client.elapsed_seconds + self._attr_elapsed_time_last_updated = time.time() + # Update current media if available + if self.client.current_media and (metadata := self.client.current_media.metadata): + self._attr_current_media = PlayerMedia( + uri=metadata.get("item_id"), + title=metadata.get("title"), + album=metadata.get("album"), + artist=metadata.get("artist"), + image_url=metadata.get("image_url"), + duration=metadata.get("duration"), + queue_id=metadata.get("queue_id"), + queue_item_id=metadata.get("queue_item_id"), + ) + else: + self._attr_current_media = None + async def _handle_play_url( self, - client: SlimClient, url: str, media: PlayerMedia, + enqueue: bool = False, send_flush: bool = True, - auto_play: bool = True, + auto_play: bool = False, ) -> None: - """Handle playing a URL on a client.""" - if send_flush: - await client.flush() - - # Send play command with metadata + """Handle playback of an url on slimproto player(s).""" metadata = { "item_id": media.uri, "title": media.title, @@ -352,49 +412,245 @@ class SqueezelitePlayer(Player): "queue_id": media.queue_id, "queue_item_id": media.queue_item_id, } + if queue := self.mass.player_queues.get(media.queue_id): + self.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode] + self.extra_data["playlist shuffle"] = int(queue.shuffle_enabled) + await self.client.play_url( + url=url, + mime_type=f"audio/{url.split('.')[-1].split('?')[0]}", + metadata=metadata, + enqueue=enqueue, + send_flush=send_flush, + # if autoplay=False playback will not start automatically + # instead 'buffer ready' will be called when the buffer is full + # to coordinate a start of multiple synced players + autostart=auto_play, + ) + # if queue is set to single track repeat, + # immediately set this track as the next + # this prevents race conditions with super short audio clips (on single repeat) + # https://github.com/music-assistant/hass-music-assistant/issues/2059 + if queue and queue.repeat_mode == RepeatMode.ONE: + self.mass.call_later( + 0.2, + self.client.play_url( + url=url, + mime_type=f"audio/{url.split('.')[-1].split('?')[0]}", + metadata=metadata, + enqueue=True, + send_flush=False, + autostart=True, + ), + ) - await client.play_url(url, metadata=metadata, auto_play=auto_play) + def _handle_player_heartbeat(self) -> None: + """Process SlimClient elapsed_time update.""" + if self.client.state == SlimPlayerState.STOPPED: + # ignore server heartbeats when stopped + return + # elapsed time change on the player will be auto picked up + # by the player manager. + self._attr_elapsed_time = self.client.elapsed_seconds + self._attr_elapsed_time_last_updated = time.time() - def _get_sync_clients(self) -> Iterator[SlimClient]: - """Get all sync clients for a player.""" - yield self.client - for member_id in self.group_members: - yield self.provider.slimproto.get_player(member_id) + # handle sync + if self.synced_to: + self._handle_sync() - async def _handle_multi_client_stream( - self, media: PlayerMedia, master_audio_format: AudioFormat - ) -> None: - """Handle multi-client stream for sync groups.""" - # This would need implementation of the multi-client streaming logic - # For now, simplified implementation - sync_clients = list(self.provider._get_sync_clients(self.player_id)) + async def _handle_buffer_ready(self) -> None: + """ + Handle buffer ready event, player has buffered a (new) track. - # Play on all sync clients + Only used when autoplay=0 for coordinated start of synced players. + """ + if self.synced_to: + # unpause of sync child is handled by sync master + return + if not self.group_members: + # not a sync group, continue + await self.client.unpause_at(self.client.jiffies) + return + count = 0 + while count < 40: + childs_total = 0 + childs_ready = 0 + await asyncio.sleep(0.2) + for sync_child in self._get_sync_clients(): + childs_total += 1 + if sync_child.state == SlimPlayerState.BUFFER_READY: + childs_ready += 1 + if childs_total == childs_ready: + break + + # all child's ready (or timeout) - start play async with TaskManager(self.mass) as tg: - for slimclient in sync_clients: - tg.create_task( - self._handle_play_url( - slimclient, - media.uri, - media, - send_flush=True, - auto_play=False, - ) - ) + for sync_client in self._get_sync_clients(): + # NOTE: Officially you should do an unpause_at based on the player timestamp + # but I did not have any good results with that. + # Instead just start playback on all players and let the sync logic work out + # the delays etc. + tg.create_task(sync_client.pause_for(200)) + + async def _handle_player_cli_event(self, event: SlimEvent) -> None: + """Process CLI Event.""" + if not event.data: + return + # event data is str, not dict + # TODO: fix this in the aioslimproto lib + event_data = cast("str", event.data) + queue = self.mass.player_queues.get_active_queue(self.player_id) + if event_data.startswith("button preset_") and event_data.endswith(".single"): + preset_id = event_data.split("preset_")[1].split(".")[0] + preset_index = int(preset_id) - 1 + if len(self.client.presets) >= preset_index + 1: + preset = self.client.presets[preset_index] + await self.mass.player_queues.play_media(queue.queue_id, preset.uri) + elif event_data == "button repeat": + if queue.repeat_mode == RepeatMode.OFF: + repeat_mode = RepeatMode.ONE + elif queue.repeat_mode == RepeatMode.ONE: + repeat_mode = RepeatMode.ALL + else: + repeat_mode = RepeatMode.OFF + self.mass.player_queues.set_repeat(queue.queue_id, repeat_mode) + self.client.extra_data["playlist repeat"] = REPEATMODE_MAP[queue.repeat_mode] + self.client.signal_update() + elif event.data == "button shuffle": + self.mass.player_queues.set_shuffle(queue.queue_id, not queue.shuffle_enabled) + self.client.extra_data["playlist shuffle"] = int(queue.shuffle_enabled) + self.client.signal_update() + elif event_data in ("button jump_fwd", "button fwd"): + await self.mass.player_queues.next(queue.queue_id) + elif event_data in ("button jump_rew", "button rew"): + await self.mass.player_queues.previous(queue.queue_id) + elif event_data.startswith("time "): + # seek request + _, param = event_data.split(" ", 1) + if param.isnumeric(): + await self.mass.player_queues.seek(queue.queue_id, int(param)) + self.logger.debug("CLI Event: %s", event_data) + + def _handle_sync(self) -> None: + """Synchronize audio of a sync slimplayer.""" + sync_master_id = self.synced_to + if not sync_master_id: + # we only correct sync members, not the sync master itself + return + if not (sync_master := self.provider.slimproto.get_player(sync_master_id)): + return # just here as a guard as bad things can happen - async def _handle_multi_client_enqueue(self, media: PlayerMedia) -> None: - """Handle multi-client enqueue for sync groups.""" - sync_clients = list(self.provider._get_sync_clients(self.player_id)) + if sync_master.state != SlimPlayerState.PLAYING: + return + if self.client.state != SlimPlayerState.PLAYING: + return - # Enqueue on all sync clients - async with TaskManager(self.mass) as tg: - for slimclient in sync_clients: - tg.create_task( - self._handle_play_url( - slimclient, - media.uri, - media, - send_flush=False, - auto_play=True, + # we collect a few playpoints of the player to determine + # average lag/drift so we can adjust accordingly + sync_playpoints = self._sync_playpoints + + now = time.time() + if now < self._do_not_resync_before: + return + + last_playpoint = sync_playpoints[-1] if sync_playpoints else None + if last_playpoint and (now - last_playpoint.timestamp) > 10: + # last playpoint is too old, invalidate + sync_playpoints.clear() + if last_playpoint and last_playpoint.sync_master != sync_master.player_id: + # this should not happen, but just in case + sync_playpoints.clear() + + diff = int( + self.provider.get_corrected_elapsed_milliseconds(sync_master) + - self.provider.get_corrected_elapsed_milliseconds(self.client) + ) + + sync_playpoints.append(SyncPlayPoint(now, sync_master.player_id, diff)) + + # ignore unexpected spikes + if ( + sync_playpoints + and abs(statistics.fmean(abs(x.diff) for x in sync_playpoints) - abs(diff)) + > DEVIATION_JUMP_IGNORE + ): + return + + min_req_playpoints = 2 if sync_master.elapsed_seconds < 2 else MIN_REQ_PLAYPOINTS + if len(sync_playpoints) < min_req_playpoints: + return + + # get the average diff + avg_diff = statistics.fmean(x.diff for x in sync_playpoints) + delta = int(abs(avg_diff)) + + if delta < MIN_DEVIATION_ADJUST: + return + + # resync the player by skipping ahead or pause for x amount of (milli)seconds + sync_playpoints.clear() + self._do_not_resync_before = now + 5 + if avg_diff > MAX_SKIP_AHEAD_MS: + # player lagging behind more than MAX_SKIP_AHEAD_MS, + # we need to correct the sync_master + self.logger.debug("%s resync: pauseFor %sms", sync_master.name, delta) + self.mass.create_task(sync_master.pause_for(delta)) + elif avg_diff > 0: + # handle player lagging behind, fix with skip_ahead + self.logger.debug("%s resync: skipAhead %sms", self.display_name, delta) + self.mass.create_task(self.client.skip_over(delta)) + else: + # handle player is drifting too far ahead, use pause_for to adjust + self.logger.debug("%s resync: pauseFor %sms", self.display_name, delta) + self.mass.create_task(self.client.pause_for(delta)) + + async def _set_preset_items(self) -> None: + """Set the presets for a player.""" + preset_items: list[SlimPreset] = [] + for preset_index in range(1, 11): + if preset_conf := self.mass.config.get_raw_player_config_value( + self.player_id, f"preset_{preset_index}" + ): + try: + media_item = await self.mass.music.get_item_by_uri(preset_conf) + preset_items.append( + SlimPreset( + uri=media_item.uri, + text=media_item.name, + icon=self.mass.metadata.get_image_url(media_item.image), + ) ) - ) + except MusicAssistantError: + # non-existing media item or some other edge case + preset_items.append( + SlimPreset( + uri=f"preset_{preset_index}", + text=f"ERROR ", + icon="", + ) + ) + else: + break + self.client.presets = preset_items + + async def _set_display(self) -> None: + """Set the display config for a player.""" + display_enabled = self.mass.config.get_raw_player_config_value( + self.player_id, + CONF_ENTRY_DISPLAY.key, + CONF_ENTRY_DISPLAY.default_value, + ) + visualization = self.mass.config.get_raw_player_config_value( + self.player_id, + CONF_ENTRY_VISUALIZATION.key, + CONF_ENTRY_VISUALIZATION.default_value, + ) + await self.client.configure_display( + visualisation=SlimVisualisationType(visualization), disabled=not display_enabled + ) + + def _get_sync_clients(self) -> Iterator[SlimClient]: + """Get all sync clients for a player.""" + yield self.client + for member_id in self.group_members: + if slimplayer := self.provider.slimproto.get_player(member_id): + yield slimplayer diff --git a/music_assistant/providers/squeezelite/provider.py b/music_assistant/providers/squeezelite/provider.py index f5afd6b8..9293a970 100644 --- a/music_assistant/providers/squeezelite/provider.py +++ b/music_assistant/providers/squeezelite/provider.py @@ -4,13 +4,15 @@ from __future__ import annotations import logging from dataclasses import dataclass -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast +from aioslimproto.models import EventType as SlimEventType +from aioslimproto.models import SlimEvent from aioslimproto.server import SlimServer from music_assistant_models.enums import ProviderFeature from music_assistant_models.errors import SetupFailedError -from music_assistant.constants import CONF_PORT, VERBOSE_LOG_LEVEL +from music_assistant.constants import CONF_PORT, CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL from music_assistant.helpers.util import is_port_in_use from music_assistant.models.player_provider import PlayerProvider @@ -20,7 +22,6 @@ from .player import SqueezelitePlayer if TYPE_CHECKING: from aioslimproto.client import SlimClient - from aioslimproto.models import EventType as SlimEventType @dataclass @@ -90,7 +91,7 @@ class SqueezelitePlayerProvider(PlayerProvider): async def loaded_in_mass(self) -> None: """Call after the provider has been loaded.""" await super().loaded_in_mass() - self.slimproto.subscribe(self._client_callback) + self.slimproto.subscribe(self._handle_slimproto_event) self.mass.streams.register_dynamic_route( "/slimproto/multi", self._serve_multi_client_stream ) @@ -109,31 +110,37 @@ class SqueezelitePlayerProvider(PlayerProvider): self.mass.streams.unregister_dynamic_route("/slimproto/multi") self.mass.streams.unregister_dynamic_route("/jsonrpc.js") - async def _player_join(self, slimplayer: SlimClient) -> None: - """Handle player joining the slimproto server.""" - player_id = slimplayer.player_id - if player_id in self._players: - return - - self.logger.debug("Player %s joined the server", player_id) + def get_corrected_elapsed_milliseconds(self, slimplayer: SlimClient) -> int: + """Return corrected elapsed milliseconds for a slimplayer.""" + sync_delay = self.mass.config.get_raw_player_config_value( + slimplayer.player_id, CONF_SYNC_ADJUST, 0 + ) + return slimplayer.elapsed_milliseconds - sync_delay - # Create SqueezelitePlayer instance - player = SqueezelitePlayer(self, player_id, slimplayer) - self._players[player_id] = player + def _handle_slimproto_event( + self, + event: SlimEvent, + ) -> None: + if self.mass.closing: + return - # Register with Music Assistant - await player.setup() + # handle new player connect (or reconnect of existing player) + if event.type == SlimEventType.PLAYER_CONNECTED: + if not (slimclient := self.slimproto.get_player(event.player_id)): + return # should not happen, but guard anyways + player = SqueezelitePlayer(self, event.player_id, slimclient) + self.mass.create_task(player.setup()) + return - async def _player_leave(self, player_id: str) -> None: - """Handle player leaving the slimproto server.""" - self.logger.debug("Player %s left the server", player_id) + if not (player := self.mass.players.get(event.player_id)): + return # guard for unknown player + if TYPE_CHECKING: + player = cast("SqueezelitePlayer", player) - if self._players.pop(player_id, None): - if mass_player := self.mass.players.get(player_id): - mass_player.available = False - self.mass.players.update(player_id) + # handle player disconnect + if event.type == SlimEventType.PLAYER_DISCONNECTED: + self.mass.create_task(self.mass.players.unregister(player.player_id)) + return - async def _player_update(self, player_id: str, event: SlimEventType) -> None: - """Handle player update from slimproto server.""" - if player := self._players.get(player_id): - await player.handle_slim_event(event) + # forward all other events to the player itself + player.handle_slim_event(event) -- 2.34.1