From: Marcel van der Veldt Date: Mon, 12 Jun 2023 22:08:27 +0000 (+0200) Subject: Fix several issues with Universal Player Group provider (#711) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=ba1472a6abdf388c72bfa8b6371d0d785354b63a;p=music-assistant-server.git Fix several issues with Universal Player Group provider (#711) * fix hidden_by field * optimize some code * Simplify and fix universal group players --- diff --git a/music_assistant/common/helpers/util.py b/music_assistant/common/helpers/util.py index 0c8bee14..de082794 100755 --- a/music_assistant/common/helpers/util.py +++ b/music_assistant/common/helpers/util.py @@ -231,21 +231,36 @@ def get_changed_keys( ignore_keys: list[str] | None = None, ) -> set[str]: """Compare 2 dicts and return set of changed keys.""" + return get_changed_values(dict1, dict2, ignore_keys).keys() + + +def get_changed_values( + dict1: dict[str, Any], + dict2: dict[str, Any], + ignore_keys: list[str] | None = None, +) -> dict[str, tuple[Any, Any]]: + """ + Compare 2 dicts and return dict of changed values. + + dict key is the changed key, value is tuple of old and new values. + """ + if not dict1 and not dict2: + return {} if not dict1: - return set(dict2.keys()) + return {key: (None, value) for key, value in dict2.items()} if not dict2: - return set(dict1.keys()) - changed_keys = set() + return {key: (None, value) for key, value in dict1.items()} + changed_values = {} for key, value in dict2.items(): if ignore_keys and key in ignore_keys: continue if key not in dict1: - changed_keys.add(key) + changed_values[key] = (None, value) elif isinstance(value, dict): - changed_keys.update(get_changed_keys(dict1[key], value, ignore_keys)) + changed_values.update(get_changed_values(dict1[key], value, ignore_keys)) elif dict1[key] != value: - changed_keys.add(key) - return changed_keys + changed_values[key] = (dict1[key], value) + return changed_values def empty_queue(q: asyncio.Queue) -> None: diff --git a/music_assistant/server/controllers/config.py b/music_assistant/server/controllers/config.py index c1aa09d7..34411034 100644 --- a/music_assistant/server/controllers/config.py +++ b/music_assistant/server/controllers/config.py @@ -360,6 +360,9 @@ class ConfigController: # signal update to the player manager with suppress(PlayerUnavailableError, AttributeError): player = self.mass.players.get(config.player_id) + if config.enabled: + player_prov = self.mass.players.get_player_provider(player_id) + await player_prov.poll_player(player_id) player.enabled = config.enabled self.mass.players.update(config.player_id, force_update=True) diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 84f1fed6..5af41f29 100755 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -5,7 +5,7 @@ import logging import random import time from collections.abc import AsyncGenerator -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from music_assistant.common.helpers.util import get_changed_keys from music_assistant.common.models.enums import ( @@ -517,7 +517,9 @@ class PlayerQueuesController: # always call update to calculate state etc self.on_player_update(player, {}) - def on_player_update(self, player: Player, changed_keys: set[str]) -> None: + def on_player_update( + self, player: Player, changed_values: dict[str, tuple[Any, Any]] # noqa: ARG002 + ) -> None: """Call when a PlayerQueue needs to be updated (e.g. when player updates).""" if player.player_id not in self._queues: # race condition diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 952b5905..aa1a870a 100755 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -6,7 +6,7 @@ import logging from collections.abc import Iterator from typing import TYPE_CHECKING, cast -from music_assistant.common.helpers.util import get_changed_keys +from music_assistant.common.helpers.util import get_changed_values from music_assistant.common.models.enums import ( EventType, PlayerFeature, @@ -186,25 +186,10 @@ class PlayerController: player.state = PlayerState.IDLE elif not player.powered: player.state = PlayerState.OFF - # handle automatic hiding of group child's feature - for group_player in self._get_player_groups(player_id): - try: - hide_group_childs = self.mass.config.get_raw_player_config_value( - group_player.player_id, CONF_HIDE_GROUP_CHILDS, "active" - ) - except KeyError: - continue - if hide_group_childs == "always": - player.hidden_by.add(group_player.player_id) - elif group_player.powered: - if hide_group_childs == "active": - player.hidden_by.add(group_player.player_id) - elif group_player.player_id in player.hidden_by: - player.hidden_by.remove(group_player.player_id) # basic throttle: do not send state changed events if player did not actually change prev_state = self._prev_states.get(player_id, {}) new_state = self._players[player_id].to_dict() - changed_keys = get_changed_keys( + changed_values = get_changed_values( prev_state, new_state, ignore_keys=["elapsed_time", "elapsed_time_last_updated", "seq_no"], @@ -216,9 +201,9 @@ class PlayerController: return # always signal update to the playerqueue - self.queues.on_player_update(player, changed_keys) + self.queues.on_player_update(player, changed_values) - if len(changed_keys) == 0 and not force_update: + if len(changed_values) == 0 and not force_update: return self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player) @@ -227,15 +212,25 @@ class PlayerController: return if player.type == PlayerType.GROUP: # update group player child's when parent updates - for child_player_id in player.group_childs: - if child_player_id == player_id: + hide_group_childs = self.mass.config.get_raw_player_config_value( + player.player_id, CONF_HIDE_GROUP_CHILDS, "active" + ) + for child_player in self._get_child_players(player): + if child_player.player_id == player.player_id: continue - self.update(child_player_id, skip_forward=True) + # handle 'hide group childs' feature here + if hide_group_childs == "always": # noqa: SIM114 + child_player.hidden_by.add(player.player_id) + elif player.powered and hide_group_childs == "active": + child_player.hidden_by.add(player.player_id) + elif not player.powered and player.player_id in child_player.hidden_by: + child_player.hidden_by.remove(player.player_id) + self.update(child_player.player_id, skip_forward=True) # update group player(s) when child updates for group_player in self._get_player_groups(player_id): player_prov = self.get_player_provider(group_player.player_id) - player_prov.on_child_state(group_player.player_id, player, changed_keys) + player_prov.on_child_state(group_player.player_id, player, changed_values) def get_player_provider(self, player_id: str) -> PlayerProvider: """Return PlayerProvider for given player.""" @@ -460,6 +455,7 @@ class PlayerController: if child_player.state == PlayerState.PLAYING: await self.cmd_stop(player_id) # all checks passed, forward command to the player provider + child_player.hidden_by.add(target_player) player_provider = self.get_player_provider(player_id) await player_provider.cmd_sync(player_id, target_player) @@ -484,6 +480,8 @@ class PlayerController: return # all checks passed, forward command to the player provider + if player.synced_to in player.hidden_by: + player.hidden_by.remove(player.synced_to) player_provider = self.get_player_provider(player_id) await player_provider.cmd_unsync(player_id) @@ -504,7 +502,7 @@ class PlayerController: def _get_player_groups(self, player_id: str) -> tuple[Player, ...]: """Return all (player_ids of) any groupplayers the given player belongs to.""" - return tuple(x for x in self if player_id in x.group_childs) + return tuple(x for x in self if x.type == PlayerType.GROUP and player_id in x.group_childs) def _get_active_source(self, player: Player) -> str: """Return the active_source id for given player.""" diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index e7f6b610..6127da97 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -2,7 +2,7 @@ from __future__ import annotations from abc import abstractmethod -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any from music_assistant.common.models.player import Player from music_assistant.common.models.queue_item import QueueItem @@ -141,7 +141,9 @@ class PlayerProvider(Provider): If the player does not need any polling, simply do not override this method. """ - def on_child_state(self, player_id: str, child_player: Player, changed_keys: set[str]) -> None: + def on_child_state( + self, player_id: str, child_player: Player, changed_values: dict[str, tuple[Any, Any]] + ) -> None: """Call when the state of a child player updates.""" # default implementation: simply update the state of the group player self.mass.players.update(player_id, skip_forward=True) diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index 654096a1..1107eeff 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -483,7 +483,8 @@ class SlimprotoProvider(PlayerProvider): # playback needs to be restarted to get all players in sync # TODO: If there is any need, we could make this smarter where the new # sync child waits for the next track. - await self.mass.players.queues.resume(parent_player.player_id) + active_queue = self.mass.players.queues.get_active_queue(parent_player.player_id) + await self.mass.players.queues.resume(active_queue.queue_id) async def cmd_unsync(self, player_id: str) -> None: """Handle UNSYNC command for given player.""" diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py new file mode 100644 index 00000000..46792a9f --- /dev/null +++ b/music_assistant/server/providers/ugp/__init__.py @@ -0,0 +1,399 @@ +""" +Universal Group Player provider. + +This is more like a "virtual" player provider, +allowing the user to create player groups from all players known in the system. +""" +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING, Any + +from music_assistant.common.models.config_entries import ( + CONF_ENTRY_FLOW_MODE, + CONF_ENTRY_GROUPED_POWER_ON, + CONF_ENTRY_HIDE_GROUP_MEMBERS, + CONF_ENTRY_OUTPUT_CHANNELS, + ConfigEntry, + ConfigValueOption, + ConfigValueType, +) +from music_assistant.common.models.enums import ( + ConfigEntryType, + PlayerFeature, + PlayerState, + PlayerType, +) +from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.queue_item import QueueItem +from music_assistant.constants import CONF_GROUPED_POWER_ON +from music_assistant.server.models.player_provider import PlayerProvider + +if TYPE_CHECKING: + from music_assistant.common.models.config_entries import ProviderConfig + from music_assistant.common.models.provider import ProviderManifest + from music_assistant.server import MusicAssistant + from music_assistant.server.models import ProviderInstanceType + + +CONF_GROUP_MEMBERS = "group_members" + +CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO = ConfigEntry.from_dict( + { + **CONF_ENTRY_OUTPUT_CHANNELS.to_dict(), + "hidden": True, + "default_value": "stereo", + "value": "stereo", + } +) +CONF_ENTRY_FORCED_FLOW_MODE = ConfigEntry.from_dict( + {**CONF_ENTRY_FLOW_MODE.to_dict(), "default_value": True, "value": True} +) +# ruff: noqa: ARG002 + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + prov = UniversalGroupProvider(mass, manifest, config) + await prov.handle_setup() + return prov + + +async def get_config_entries( + mass: MusicAssistant, + instance_id: str | None = None, + action: str | None = None, + values: dict[str, ConfigValueType] | None = None, +) -> tuple[ConfigEntry, ...]: + """ + Return Config entries to setup this provider. + + instance_id: id of an existing provider instance (None if new instance setup). + action: [optional] action key called from config entries UI. + values: the (intermediate) raw values for config entries sent with the action. + """ + # ruff: noqa: ARG001 + # dynamically extend the amount of entries when needed + if values.get("ugp_15"): + player_count = 20 + elif values.get("ugp_10"): + player_count = 15 + elif values.get("ugp_5"): + player_count = 10 + else: + player_count = 5 + player_entries = tuple( + ConfigEntry( + key=f"ugp_{index}", + type=ConfigEntryType.STRING, + label=f"Group player {index}: Group members", + default_value=[], + options=tuple( + ConfigValueOption(x.display_name, x.player_id) + for x in mass.players.all(True, True, False) + if x.player_id != f"ugp_{index}" + ), + description="Select all players you want to be part of this group", + multi_value=True, + required=False, + ) + for index in range(1, player_count + 1) + ) + return player_entries + + +class UniversalGroupProvider(PlayerProvider): + """Base/builtin provider for universally grouping players.""" + + prev_sync_leaders: dict[str, tuple[str]] | None = None + + async def handle_setup(self) -> None: + """Handle async initialization of the provider.""" + self.prev_sync_leaders = {} + + for index in range(1, 100): + conf_key = f"ugp_{index}" + try: + player_conf = self.config.get_value(conf_key) + except KeyError: + break + if player_conf == []: + # cleanup player config if player config is removed/reset + self.mass.players.remove(conf_key) + continue + elif not player_conf: + continue + + player = Player( + player_id=conf_key, + provider=self.domain, + type=PlayerType.GROUP, + name=f"{self.name}: {index}", + available=True, + powered=False, + device_info=DeviceInfo(model=self.manifest.name, manufacturer="Music Assistant"), + # TODO: derive playerfeatures from (all) underlying child players? + supported_features=( + PlayerFeature.POWER, + PlayerFeature.PAUSE, + PlayerFeature.VOLUME_SET, + PlayerFeature.VOLUME_MUTE, + PlayerFeature.SET_MEMBERS, + ), + active_source=conf_key, + group_childs=player_conf, + ) + player.extra_data["optimistic_state"] = PlayerState.IDLE + self.prev_sync_leaders[conf_key] = None + self.mass.players.register_or_update(player) + + async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: # noqa: ARG002 + """Return all (provider/player specific) Config Entries for the given player (if any).""" + return ( + CONF_ENTRY_HIDE_GROUP_MEMBERS, + CONF_ENTRY_GROUPED_POWER_ON, + CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO, + CONF_ENTRY_FORCED_FLOW_MODE, + ) + + async def cmd_stop(self, player_id: str) -> None: + """Send STOP command to given player.""" + group_player = self.mass.players.get(player_id) + group_player.extra_data["optimistic_state"] = PlayerState.IDLE + # forward command to player and any connected sync child's + async with asyncio.TaskGroup() as tg: + for member in self._get_active_members( + player_id, only_powered=True, skip_sync_childs=True + ): + if member.state == PlayerState.IDLE: + continue + tg.create_task(self.mass.players.cmd_stop(member.player_id)) + + async def cmd_play(self, player_id: str) -> None: + """Send PLAY command to given player.""" + group_player = self.mass.players.get(player_id) + group_player.extra_data["optimistic_state"] = PlayerState.PLAYING + async with asyncio.TaskGroup() as tg: + for member in self._get_active_members( + player_id, only_powered=True, skip_sync_childs=True + ): + tg.create_task(self.mass.players.cmd_play(member.player_id)) + + async def cmd_play_media( + self, + player_id: str, + queue_item: QueueItem, + seek_position: int = 0, + fade_in: bool = False, + flow_mode: bool = False, + ) -> None: + """Send PLAY MEDIA command to given player. + + This is called when the Queue wants the player to start playing a specific QueueItem. + The player implementation can decide how to process the request, such as playing + queue items one-by-one or enqueue all/some items. + + - player_id: player_id of the player to handle the command. + - queue_item: the QueueItem to start playing on the player. + - seek_position: start playing from this specific position. + - fade_in: fade in the music at start (e.g. at resume). + """ + # send stop first + await self.cmd_stop(player_id) + # power ON + await self.cmd_power(player_id, True) + group_player = self.mass.players.get(player_id) + group_player.extra_data["optimistic_state"] = PlayerState.PLAYING + # forward command to all (powered) group child's + async with asyncio.TaskGroup() as tg: + for member in self._get_active_members( + player_id, only_powered=True, skip_sync_childs=True + ): + player_prov = self.mass.players.get_player_provider(member.player_id) + tg.create_task( + player_prov.cmd_play_media( + member.player_id, + queue_item=queue_item, + seek_position=seek_position, + fade_in=fade_in, + flow_mode=flow_mode, + ) + ) + + async def cmd_pause(self, player_id: str) -> None: + """Send PAUSE command to given player.""" + group_player = self.mass.players.get(player_id) + group_player.extra_data["optimistic_state"] = PlayerState.PAUSED + async with asyncio.TaskGroup() as tg: + for member in self._get_active_members( + player_id, only_powered=True, skip_sync_childs=True + ): + tg.create_task(self.mass.players.cmd_pause(member.player_id)) + + async def cmd_power(self, player_id: str, powered: bool) -> None: + """Send POWER command to given player.""" + group_power_on = await self.mass.config.get_player_config_value( + player_id, CONF_GROUPED_POWER_ON + ) + group_player = self.mass.players.get(player_id) + + async def set_child_power(child_player: Player) -> None: + await self.mass.players.cmd_power(child_player.player_id, powered) + # set optimistic state on child player to prevent race conditions in other actions + child_player.powered = powered + + if not powered or group_power_on: + # turn on/off child players + async with asyncio.TaskGroup() as tg: + for member in self._get_active_members( + player_id, only_powered=not powered, skip_sync_childs=False + ): + if member.powered == member: + continue + tg.create_task(set_child_power(member)) + + group_player.powered = powered + group_player.extra_data["optimistic_state"] = PlayerState.IDLE + self.mass.players.update(player_id) + if powered: + # sync all players on power on + await self._sync_players(player_id) + else: + group_player.extra_data["optimistic_state"] = PlayerState.OFF + + async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: + """Send VOLUME_SET command to given player.""" + # group volume is already handled in the player manager + + async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: + """Send VOLUME MUTE command to given player.""" + + async def poll_player(self, player_id: str) -> None: + """Poll player for state updates.""" + self.update_attributes(player_id) + self.mass.players.update(player_id, skip_forward=True) + + def update_attributes(self, player_id: str) -> None: + """Update player attributes.""" + group_player = self.mass.players.get(player_id) + all_members = self._get_active_members( + player_id, only_powered=False, skip_sync_childs=False + ) + group_player.group_childs = list(x.player_id for x in all_members) + # read the state from the first powered child player + for member in all_members: + if member.synced_to: + continue + if not member.powered: + continue + if member.state not in (PlayerState.PLAYING, PlayerState.PAUSED): + continue + group_player.current_item_id = member.current_item_id + group_player.current_url = member.current_url + group_player.elapsed_time = member.elapsed_time + group_player.elapsed_time_last_updated = member.elapsed_time_last_updated + group_player.state = member.state + break + else: + group_player.state = PlayerState.IDLE + group_player.current_item_id = None + group_player.current_url = None + + def on_child_state( + self, player_id: str, child_player: Player, changed_values: dict[str, tuple[Any, Any]] + ) -> None: + """Call when the state of a child player updates.""" + self.update_attributes(player_id) + group_player = self.mass.players.get(player_id) + self.mass.players.update(player_id, skip_forward=True) + if "powered" in changed_values and (prev_power := changed_values["powered"][0]) != ( + new_power := changed_values["powered"][1] + ): + powered_players = self._get_active_members(player_id, True, False) + if group_player.powered and prev_power is True and len(powered_players) == 0: + # the last player of a group turned off, turn off the group + self.mass.create_task(self.cmd_power, player_id, False) + # ruff: noqa: SIM114 + elif ( + new_power is True + and group_player.extra_data["optimistic_state"] == PlayerState.PLAYING + ): + # a child player turned ON while the group player is already playing + # we need to resync/resume + if group_player.state == PlayerState.PLAYING and ( + sync_leader := next( + ( + x + for x in child_player.can_sync_with + if x in self.prev_sync_leaders[player_id] + ), + None, + ) + ): + # prevent resume when player platform supports sync + # and one of its players is already playing + self.mass.create_task( + self.mass.players.cmd_sync, child_player.player_id, sync_leader + ) + else: + self.mass.create_task(self.mass.players.queues.resume, player_id) + elif ( + not child_player.powered + and group_player.extra_data["optimistic_state"] == PlayerState.PLAYING + and child_player.player_id in self.prev_sync_leaders[player_id] + ): + # a sync master player turned OFF while the group player + # should still be playing - we need to resync/resume + self.mass.create_task(self.mass.players.queues.resume, player_id) + + def _get_active_members( + self, player_id: str, only_powered: bool = False, skip_sync_childs: bool = True + ) -> list[Player]: + """Get (child) players attached to a grouped player.""" + child_players: list[Player] = [] + conf_members: list[str] = self.config.get_value(player_id) + ignore_ids = set() + for child_id in conf_members: + if child_player := self.mass.players.get(child_id, False): + if not (not only_powered or child_player.powered): + continue + if child_player.synced_to and skip_sync_childs: + continue + allowed_sources = [child_player.player_id, player_id] + conf_members + if child_player.active_source not in allowed_sources: + # edge case: the child player has another group already active! + continue + if child_player.synced_to and child_player.synced_to not in allowed_sources: + # edge case: the child player is already synced to another player + continue + child_players.append(child_player) + # handle edge case where a group is in the group and both the group + # and (one of its) child's are added to this universal group. + if child_player.type == PlayerType.GROUP: + ignore_ids.update( + x for x in child_player.group_childs if x != child_player.player_id + ) + return [x for x in child_players if x.player_id not in ignore_ids] + + async def _sync_players(self, player_id: str) -> None: + """Sync all (possible) players.""" + sync_leaders = set() + # TODO: sort members on sync master priority attribute ? + for member in self._get_active_members(player_id, only_powered=True): + if member.synced_to is not None: + continue + if not member.can_sync_with: + continue + # check if we can join this player to an already chosen sync leader + if existing_leader := next( + (x for x in member.can_sync_with if x in sync_leaders), None + ): + await self.mass.players.cmd_sync(member.player_id, existing_leader) + # set optimistic state to prevent race condition in play media + member.synced_to = existing_leader + continue + # pick this member as new sync leader + sync_leaders.add(member.player_id) + self.prev_sync_leaders[player_id] = tuple(sync_leaders) diff --git a/music_assistant/server/providers/ugp/manifest.json b/music_assistant/server/providers/ugp/manifest.json new file mode 100644 index 00000000..c8b38de3 --- /dev/null +++ b/music_assistant/server/providers/ugp/manifest.json @@ -0,0 +1,13 @@ +{ + "type": "player", + "domain": "ugp", + "name": "Universal Group Player", + "description": "Create Player Groups with your favorite players, regardless of type and model.", + "codeowners": ["@music-assistant"], + "requirements": [], + "documentation": "", + "multi_instance": false, + "builtin": false, + "load_by_default": false, + "icon": "mdi:mdi-speaker-multiple" +} diff --git a/music_assistant/server/providers/universal_group/__init__.py b/music_assistant/server/providers/universal_group/__init__.py deleted file mode 100644 index 736c8b1d..00000000 --- a/music_assistant/server/providers/universal_group/__init__.py +++ /dev/null @@ -1,353 +0,0 @@ -""" -Universal Group Player provider. - -This is more like a "virtual" player provider, -allowing the user to create player groups from all players known in the system. -""" -from __future__ import annotations - -import asyncio -from typing import TYPE_CHECKING - -from music_assistant.common.models.config_entries import ( - CONF_ENTRY_FLOW_MODE, - CONF_ENTRY_GROUPED_POWER_ON, - CONF_ENTRY_HIDE_GROUP_MEMBERS, - CONF_ENTRY_OUTPUT_CHANNELS, - ConfigEntry, - ConfigValueOption, - ConfigValueType, -) -from music_assistant.common.models.enums import ( - ConfigEntryType, - PlayerFeature, - PlayerState, - PlayerType, -) -from music_assistant.common.models.player import DeviceInfo, Player -from music_assistant.common.models.queue_item import QueueItem -from music_assistant.constants import CONF_GROUPED_POWER_ON, CONF_PROVIDERS -from music_assistant.server.models.player_provider import PlayerProvider - -if TYPE_CHECKING: - from music_assistant.common.models.config_entries import ProviderConfig - from music_assistant.common.models.provider import ProviderManifest - from music_assistant.server import MusicAssistant - from music_assistant.server.models import ProviderInstanceType - - -CONF_GROUP_MEMBERS = "group_members" - -CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO = ConfigEntry.from_dict( - { - **CONF_ENTRY_OUTPUT_CHANNELS.to_dict(), - "hidden": True, - "default_value": "stereo", - "value": "stereo", - } -) -CONF_ENTRY_FORCED_FLOW_MODE = ConfigEntry.from_dict( - {**CONF_ENTRY_FLOW_MODE.to_dict(), "default_value": True, "value": True} -) -SUPPORTS_NATIVE_SYNC = ("sonos",) -# ruff: noqa: ARG002 - - -async def setup( - mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig -) -> ProviderInstanceType: - """Initialize provider(instance) with given configuration.""" - prov = UniversalGroupProvider(mass, manifest, config) - await prov.handle_setup() - return prov - - -async def get_config_entries( - mass: MusicAssistant, - instance_id: str | None = None, - action: str | None = None, - values: dict[str, ConfigValueType] | None = None, -) -> tuple[ConfigEntry, ...]: - """ - Return Config entries to setup this provider. - - instance_id: id of an existing provider instance (None if new instance setup). - action: [optional] action key called from config entries UI. - values: the (intermediate) raw values for config entries sent with the action. - """ - # ruff: noqa: ARG001 - all_players = tuple( - ConfigValueOption(x.display_name, x.player_id) - for x in mass.players.all(True, True, False) - if x.player_id != instance_id - ) - return ( - ConfigEntry( - key=CONF_GROUP_MEMBERS, - type=ConfigEntryType.STRING, - label="Group members", - default_value=[], - options=all_players, - description="Select all players you want to be part of this group", - multi_value=True, - ), - ) - - -class UniversalGroupProvider(PlayerProvider): - """Base/builtin provider for universally grouping players.""" - - prev_sync_leaders: tuple[str] | None = None - optimistic_state: PlayerState | None = None - - async def handle_setup(self) -> None: - """Handle async initialization of the provider.""" - self.player = player = Player( - player_id=self.instance_id, - provider=self.domain, - type=PlayerType.GROUP, - name=self.name, - available=True, - powered=False, - device_info=DeviceInfo(model=self.manifest.name, manufacturer="Music Assistant"), - # TODO: derive playerfeatures from (all) underlying child players - supported_features=( - PlayerFeature.POWER, - PlayerFeature.PAUSE, - PlayerFeature.VOLUME_SET, - PlayerFeature.VOLUME_MUTE, - PlayerFeature.SET_MEMBERS, - ), - active_source=self.instance_id, - group_childs=self.config.get_value(CONF_GROUP_MEMBERS), - ) - self.mass.players.register_or_update(player) - - async def unload(self) -> None: - """Handle close/cleanup of the provider.""" - # cleanup player config if provider is removed - if self.mass.config.get(f"{CONF_PROVIDERS}/{self.instance_id}") is not None: - return - self.mass.players.remove(self.instance_id) - - async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: # noqa: ARG002 - """Return all (provider/player specific) Config Entries for the given player (if any).""" - return ( - CONF_ENTRY_HIDE_GROUP_MEMBERS, - CONF_ENTRY_GROUPED_POWER_ON, - CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO, - CONF_ENTRY_FORCED_FLOW_MODE, - ) - - async def cmd_stop(self, player_id: str) -> None: - """Send STOP command to given player.""" - self.optimistic_state = PlayerState.IDLE - # forward command to player and any connected sync child's - async with asyncio.TaskGroup() as tg: - for member in self._get_active_members(only_powered=True, skip_sync_childs=True): - if member.state == PlayerState.IDLE: - continue - tg.create_task(self.mass.players.cmd_stop(member.player_id)) - - async def cmd_play(self, player_id: str) -> None: - """Send PLAY command to given player.""" - self.optimistic_state = PlayerState.PLAYING - async with asyncio.TaskGroup() as tg: - for member in self._get_active_members(only_powered=True, skip_sync_childs=True): - tg.create_task(self.mass.players.cmd_play(member.player_id)) - - async def cmd_play_media( - self, - player_id: str, - queue_item: QueueItem, - seek_position: int = 0, - fade_in: bool = False, - flow_mode: bool = False, - ) -> None: - """Send PLAY MEDIA command to given player. - - This is called when the Queue wants the player to start playing a specific QueueItem. - The player implementation can decide how to process the request, such as playing - queue items one-by-one or enqueue all/some items. - - - player_id: player_id of the player to handle the command. - - queue_item: the QueueItem to start playing on the player. - - seek_position: start playing from this specific position. - - fade_in: fade in the music at start (e.g. at resume). - """ - # send stop first - await self.cmd_stop(player_id) - # power ON - await self.cmd_power(player_id, True) - self.optimistic_state = PlayerState.PLAYING - # forward command to all (powered) group child's - async with asyncio.TaskGroup() as tg: - for member in self._get_active_members(only_powered=True, skip_sync_childs=True): - player_prov = self.mass.players.get_player_provider(member.player_id) - tg.create_task( - player_prov.cmd_play_media( - member.player_id, - queue_item=queue_item, - seek_position=seek_position, - fade_in=fade_in, - flow_mode=flow_mode, - ) - ) - - async def cmd_pause(self, player_id: str) -> None: - """Send PAUSE command to given player.""" - self.optimistic_state = PlayerState.PAUSED - async with asyncio.TaskGroup() as tg: - for member in self._get_active_members(only_powered=True, skip_sync_childs=True): - tg.create_task(self.mass.players.cmd_pause(member.player_id)) - - async def cmd_power(self, player_id: str, powered: bool) -> None: - """Send POWER command to given player.""" - group_power_on = await self.mass.config.get_player_config_value( - player_id, CONF_GROUPED_POWER_ON - ) - - async def set_child_power(child_player: Player) -> None: - await self.mass.players.cmd_power(child_player.player_id, powered) - # set optimistic state on child player to prevent race conditions in other actions - child_player.powered = powered - - if not powered or group_power_on: - # turn on/off child players - async with asyncio.TaskGroup() as tg: - for member in self._get_active_members( - only_powered=not powered, skip_sync_childs=False - ): - if member.powered == member: - continue - tg.create_task(set_child_power(member)) - - self.player.powered = powered - self.mass.players.update(self.instance_id) - if powered: - # sync all players on power on - await self._sync_players() - else: - self.optimistic_state = PlayerState.OFF - - async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: - """Send VOLUME_SET command to given player.""" - - async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: - """Send VOLUME MUTE command to given player.""" - - async def poll_player(self, player_id: str) -> None: - """Poll player for state updates.""" - self.update_attributes() - self.mass.players.update(player_id, skip_forward=True) - - def update_attributes(self) -> None: - """Update player attributes.""" - all_members = self._get_active_members(only_powered=False, skip_sync_childs=False) - self.player.group_childs = list(x.player_id for x in all_members) - # read the state from the first powered child player - for member in all_members: - if member.synced_to: - continue - if not member.powered: - continue - if member.state not in (PlayerState.PLAYING, PlayerState.PAUSED): - continue - self.player.current_item_id = member.current_item_id - self.player.current_url = member.current_url - self.player.elapsed_time = member.elapsed_time - self.player.elapsed_time_last_updated = member.elapsed_time_last_updated - self.player.state = member.state - break - else: - self.player.state = PlayerState.IDLE - self.player.current_item_id = None - self.player.current_url = None - - def on_child_state(self, player_id: str, child_player: Player, changed_keys: set[str]) -> None: - """Call when the state of a child player updates.""" - powered_players = self._get_active_members(True, False) - if "powered" in changed_keys: - if not child_player.powered and len(powered_players) == 0: - # the last player of a group turned off - # turn off the group - self.mass.create_task(self.cmd_power, player_id, False) - # ruff: noqa: SIM114 - elif child_player.powered and self.optimistic_state == PlayerState.PLAYING: - # a child player turned ON while the group player is already playing - # we need to resync/resume - if ( - child_player.provider in SUPPORTS_NATIVE_SYNC - and self.player.state == PlayerState.PLAYING - and ( - sync_leader := next( - (x for x in child_player.can_sync_with if x in self.prev_sync_leaders), - None, - ) - ) - ): - # prevent resume when ecosystem supports native sync - # and one of its players is already playing - self.mass.create_task(self.mass.players.cmd_sync, player_id, sync_leader) - else: - self.mass.create_task(self.mass.players.queues.resume, player_id) - elif ( - not child_player.powered - and self.optimistic_state == PlayerState.PLAYING - and child_player.player_id in self.prev_sync_leaders - ): - # a sync master player turned OFF while the group player - # should still be playing - we need to resync/resume - self.mass.create_task(self.mass.players.queues.resume, player_id) - self.update_attributes() - self.mass.players.update(player_id, skip_forward=True) - - def _get_active_members( - self, only_powered: bool = False, skip_sync_childs: bool = True - ) -> list[Player]: - """Get (child) players attached to a grouped player.""" - child_players: list[Player] = [] - conf_members: list[str] = self.config.get_value(CONF_GROUP_MEMBERS) - ignore_ids = set() - for child_id in conf_members: - if child_player := self.mass.players.get(child_id, False): - if not (not only_powered or child_player.powered): - continue - if child_player.synced_to and skip_sync_childs: - continue - allowed_sources = [child_player.player_id, self.instance_id] + conf_members - if child_player.active_source not in allowed_sources: - # edge case: the child player has another group already active! - continue - if child_player.synced_to and child_player.synced_to not in conf_members: - # edge case: the child player is already synced to another player - continue - child_players.append(child_player) - # handle edge case where a group is in the group and both the group - # and (one of its) child's are added to this universal group. - if child_player.type == PlayerType.GROUP: - ignore_ids.update( - x for x in child_player.group_childs if x != child_player.player_id - ) - return [x for x in child_players if x.player_id not in ignore_ids] - - async def _sync_players(self) -> None: - """Sync all (possible) players.""" - sync_leaders = set() - # TODO: sort members on sync master priority attribute ? - for member in self._get_active_members(only_powered=True): - if member.synced_to is not None: - continue - if not member.can_sync_with: - continue - # check if we can join this player to an already chosen sync leader - if existing_leader := next( - (x for x in member.can_sync_with if x in sync_leaders), None - ): - await self.mass.players.cmd_sync(member.player_id, existing_leader) - # set optimistic state to prevent race condition in play media - member.synced_to = existing_leader - continue - # pick this member as new sync leader - sync_leaders.add(member.player_id) - self.prev_sync_leaders = tuple(sync_leaders) diff --git a/music_assistant/server/providers/universal_group/manifest.json b/music_assistant/server/providers/universal_group/manifest.json deleted file mode 100644 index 3d613128..00000000 --- a/music_assistant/server/providers/universal_group/manifest.json +++ /dev/null @@ -1,13 +0,0 @@ -{ - "type": "player", - "domain": "universal_group", - "name": "Universal Group Player", - "description": "Create a Player Group with your favorite players, regardless of type and model.", - "codeowners": ["@music-assistant"], - "requirements": [], - "documentation": "", - "multi_instance": true, - "builtin": false, - "load_by_default": false, - "icon": "mdi:mdi-speaker-multiple" -}