ignore_keys: list[str] | None = None,
) -> set[str]:
"""Compare 2 dicts and return set of changed keys."""
+ return get_changed_values(dict1, dict2, ignore_keys).keys()
+
+
+def get_changed_values(
+ dict1: dict[str, Any],
+ dict2: dict[str, Any],
+ ignore_keys: list[str] | None = None,
+) -> dict[str, tuple[Any, Any]]:
+ """
+ Compare 2 dicts and return dict of changed values.
+
+ dict key is the changed key, value is tuple of old and new values.
+ """
+ if not dict1 and not dict2:
+ return {}
if not dict1:
- return set(dict2.keys())
+ return {key: (None, value) for key, value in dict2.items()}
if not dict2:
- return set(dict1.keys())
- changed_keys = set()
+ return {key: (None, value) for key, value in dict1.items()}
+ changed_values = {}
for key, value in dict2.items():
if ignore_keys and key in ignore_keys:
continue
if key not in dict1:
- changed_keys.add(key)
+ changed_values[key] = (None, value)
elif isinstance(value, dict):
- changed_keys.update(get_changed_keys(dict1[key], value, ignore_keys))
+ changed_values.update(get_changed_values(dict1[key], value, ignore_keys))
elif dict1[key] != value:
- changed_keys.add(key)
- return changed_keys
+ changed_values[key] = (dict1[key], value)
+ return changed_values
def empty_queue(q: asyncio.Queue) -> None:
# signal update to the player manager
with suppress(PlayerUnavailableError, AttributeError):
player = self.mass.players.get(config.player_id)
+ if config.enabled:
+ player_prov = self.mass.players.get_player_provider(player_id)
+ await player_prov.poll_player(player_id)
player.enabled = config.enabled
self.mass.players.update(config.player_id, force_update=True)
import random
import time
from collections.abc import AsyncGenerator
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from music_assistant.common.helpers.util import get_changed_keys
from music_assistant.common.models.enums import (
# always call update to calculate state etc
self.on_player_update(player, {})
- def on_player_update(self, player: Player, changed_keys: set[str]) -> None:
+ def on_player_update(
+ self, player: Player, changed_values: dict[str, tuple[Any, Any]] # noqa: ARG002
+ ) -> None:
"""Call when a PlayerQueue needs to be updated (e.g. when player updates)."""
if player.player_id not in self._queues:
# race condition
from collections.abc import Iterator
from typing import TYPE_CHECKING, cast
-from music_assistant.common.helpers.util import get_changed_keys
+from music_assistant.common.helpers.util import get_changed_values
from music_assistant.common.models.enums import (
EventType,
PlayerFeature,
player.state = PlayerState.IDLE
elif not player.powered:
player.state = PlayerState.OFF
- # handle automatic hiding of group child's feature
- for group_player in self._get_player_groups(player_id):
- try:
- hide_group_childs = self.mass.config.get_raw_player_config_value(
- group_player.player_id, CONF_HIDE_GROUP_CHILDS, "active"
- )
- except KeyError:
- continue
- if hide_group_childs == "always":
- player.hidden_by.add(group_player.player_id)
- elif group_player.powered:
- if hide_group_childs == "active":
- player.hidden_by.add(group_player.player_id)
- elif group_player.player_id in player.hidden_by:
- player.hidden_by.remove(group_player.player_id)
# basic throttle: do not send state changed events if player did not actually change
prev_state = self._prev_states.get(player_id, {})
new_state = self._players[player_id].to_dict()
- changed_keys = get_changed_keys(
+ changed_values = get_changed_values(
prev_state,
new_state,
ignore_keys=["elapsed_time", "elapsed_time_last_updated", "seq_no"],
return
# always signal update to the playerqueue
- self.queues.on_player_update(player, changed_keys)
+ self.queues.on_player_update(player, changed_values)
- if len(changed_keys) == 0 and not force_update:
+ if len(changed_values) == 0 and not force_update:
return
self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
return
if player.type == PlayerType.GROUP:
# update group player child's when parent updates
- for child_player_id in player.group_childs:
- if child_player_id == player_id:
+ hide_group_childs = self.mass.config.get_raw_player_config_value(
+ player.player_id, CONF_HIDE_GROUP_CHILDS, "active"
+ )
+ for child_player in self._get_child_players(player):
+ if child_player.player_id == player.player_id:
continue
- self.update(child_player_id, skip_forward=True)
+ # handle 'hide group childs' feature here
+ if hide_group_childs == "always": # noqa: SIM114
+ child_player.hidden_by.add(player.player_id)
+ elif player.powered and hide_group_childs == "active":
+ child_player.hidden_by.add(player.player_id)
+ elif not player.powered and player.player_id in child_player.hidden_by:
+ child_player.hidden_by.remove(player.player_id)
+ self.update(child_player.player_id, skip_forward=True)
# update group player(s) when child updates
for group_player in self._get_player_groups(player_id):
player_prov = self.get_player_provider(group_player.player_id)
- player_prov.on_child_state(group_player.player_id, player, changed_keys)
+ player_prov.on_child_state(group_player.player_id, player, changed_values)
def get_player_provider(self, player_id: str) -> PlayerProvider:
"""Return PlayerProvider for given player."""
if child_player.state == PlayerState.PLAYING:
await self.cmd_stop(player_id)
# all checks passed, forward command to the player provider
+ child_player.hidden_by.add(target_player)
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_sync(player_id, target_player)
return
# all checks passed, forward command to the player provider
+ if player.synced_to in player.hidden_by:
+ player.hidden_by.remove(player.synced_to)
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_unsync(player_id)
def _get_player_groups(self, player_id: str) -> tuple[Player, ...]:
"""Return all (player_ids of) any groupplayers the given player belongs to."""
- return tuple(x for x in self if player_id in x.group_childs)
+ return tuple(x for x in self if x.type == PlayerType.GROUP and player_id in x.group_childs)
def _get_active_source(self, player: Player) -> str:
"""Return the active_source id for given player."""
from __future__ import annotations
from abc import abstractmethod
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from music_assistant.common.models.player import Player
from music_assistant.common.models.queue_item import QueueItem
If the player does not need any polling, simply do not override this method.
"""
- def on_child_state(self, player_id: str, child_player: Player, changed_keys: set[str]) -> None:
+ def on_child_state(
+ self, player_id: str, child_player: Player, changed_values: dict[str, tuple[Any, Any]]
+ ) -> None:
"""Call when the state of a child player updates."""
# default implementation: simply update the state of the group player
self.mass.players.update(player_id, skip_forward=True)
# playback needs to be restarted to get all players in sync
# TODO: If there is any need, we could make this smarter where the new
# sync child waits for the next track.
- await self.mass.players.queues.resume(parent_player.player_id)
+ active_queue = self.mass.players.queues.get_active_queue(parent_player.player_id)
+ await self.mass.players.queues.resume(active_queue.queue_id)
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player."""
--- /dev/null
+"""
+Universal Group Player provider.
+
+This is more like a "virtual" player provider,
+allowing the user to create player groups from all players known in the system.
+"""
+from __future__ import annotations
+
+import asyncio
+from typing import TYPE_CHECKING, Any
+
+from music_assistant.common.models.config_entries import (
+ CONF_ENTRY_FLOW_MODE,
+ CONF_ENTRY_GROUPED_POWER_ON,
+ CONF_ENTRY_HIDE_GROUP_MEMBERS,
+ CONF_ENTRY_OUTPUT_CHANNELS,
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+)
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+)
+from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.queue_item import QueueItem
+from music_assistant.constants import CONF_GROUPED_POWER_ON
+from music_assistant.server.models.player_provider import PlayerProvider
+
+if TYPE_CHECKING:
+ from music_assistant.common.models.config_entries import ProviderConfig
+ from music_assistant.common.models.provider import ProviderManifest
+ from music_assistant.server import MusicAssistant
+ from music_assistant.server.models import ProviderInstanceType
+
+
+CONF_GROUP_MEMBERS = "group_members"
+
+CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO = ConfigEntry.from_dict(
+ {
+ **CONF_ENTRY_OUTPUT_CHANNELS.to_dict(),
+ "hidden": True,
+ "default_value": "stereo",
+ "value": "stereo",
+ }
+)
+CONF_ENTRY_FORCED_FLOW_MODE = ConfigEntry.from_dict(
+ {**CONF_ENTRY_FLOW_MODE.to_dict(), "default_value": True, "value": True}
+)
+# ruff: noqa: ARG002
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ prov = UniversalGroupProvider(mass, manifest, config)
+ await prov.handle_setup()
+ return prov
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+ # dynamically extend the amount of entries when needed
+ if values.get("ugp_15"):
+ player_count = 20
+ elif values.get("ugp_10"):
+ player_count = 15
+ elif values.get("ugp_5"):
+ player_count = 10
+ else:
+ player_count = 5
+ player_entries = tuple(
+ ConfigEntry(
+ key=f"ugp_{index}",
+ type=ConfigEntryType.STRING,
+ label=f"Group player {index}: Group members",
+ default_value=[],
+ options=tuple(
+ ConfigValueOption(x.display_name, x.player_id)
+ for x in mass.players.all(True, True, False)
+ if x.player_id != f"ugp_{index}"
+ ),
+ description="Select all players you want to be part of this group",
+ multi_value=True,
+ required=False,
+ )
+ for index in range(1, player_count + 1)
+ )
+ return player_entries
+
+
+class UniversalGroupProvider(PlayerProvider):
+ """Base/builtin provider for universally grouping players."""
+
+ prev_sync_leaders: dict[str, tuple[str]] | None = None
+
+ async def handle_setup(self) -> None:
+ """Handle async initialization of the provider."""
+ self.prev_sync_leaders = {}
+
+ for index in range(1, 100):
+ conf_key = f"ugp_{index}"
+ try:
+ player_conf = self.config.get_value(conf_key)
+ except KeyError:
+ break
+ if player_conf == []:
+ # cleanup player config if player config is removed/reset
+ self.mass.players.remove(conf_key)
+ continue
+ elif not player_conf:
+ continue
+
+ player = Player(
+ player_id=conf_key,
+ provider=self.domain,
+ type=PlayerType.GROUP,
+ name=f"{self.name}: {index}",
+ available=True,
+ powered=False,
+ device_info=DeviceInfo(model=self.manifest.name, manufacturer="Music Assistant"),
+ # TODO: derive playerfeatures from (all) underlying child players?
+ supported_features=(
+ PlayerFeature.POWER,
+ PlayerFeature.PAUSE,
+ PlayerFeature.VOLUME_SET,
+ PlayerFeature.VOLUME_MUTE,
+ PlayerFeature.SET_MEMBERS,
+ ),
+ active_source=conf_key,
+ group_childs=player_conf,
+ )
+ player.extra_data["optimistic_state"] = PlayerState.IDLE
+ self.prev_sync_leaders[conf_key] = None
+ self.mass.players.register_or_update(player)
+
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: # noqa: ARG002
+ """Return all (provider/player specific) Config Entries for the given player (if any)."""
+ return (
+ CONF_ENTRY_HIDE_GROUP_MEMBERS,
+ CONF_ENTRY_GROUPED_POWER_ON,
+ CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO,
+ CONF_ENTRY_FORCED_FLOW_MODE,
+ )
+
+ async def cmd_stop(self, player_id: str) -> None:
+ """Send STOP command to given player."""
+ group_player = self.mass.players.get(player_id)
+ group_player.extra_data["optimistic_state"] = PlayerState.IDLE
+ # forward command to player and any connected sync child's
+ async with asyncio.TaskGroup() as tg:
+ for member in self._get_active_members(
+ player_id, only_powered=True, skip_sync_childs=True
+ ):
+ if member.state == PlayerState.IDLE:
+ continue
+ tg.create_task(self.mass.players.cmd_stop(member.player_id))
+
+ async def cmd_play(self, player_id: str) -> None:
+ """Send PLAY command to given player."""
+ group_player = self.mass.players.get(player_id)
+ group_player.extra_data["optimistic_state"] = PlayerState.PLAYING
+ async with asyncio.TaskGroup() as tg:
+ for member in self._get_active_members(
+ player_id, only_powered=True, skip_sync_childs=True
+ ):
+ tg.create_task(self.mass.players.cmd_play(member.player_id))
+
+ async def cmd_play_media(
+ self,
+ player_id: str,
+ queue_item: QueueItem,
+ seek_position: int = 0,
+ fade_in: bool = False,
+ flow_mode: bool = False,
+ ) -> None:
+ """Send PLAY MEDIA command to given player.
+
+ This is called when the Queue wants the player to start playing a specific QueueItem.
+ The player implementation can decide how to process the request, such as playing
+ queue items one-by-one or enqueue all/some items.
+
+ - player_id: player_id of the player to handle the command.
+ - queue_item: the QueueItem to start playing on the player.
+ - seek_position: start playing from this specific position.
+ - fade_in: fade in the music at start (e.g. at resume).
+ """
+ # send stop first
+ await self.cmd_stop(player_id)
+ # power ON
+ await self.cmd_power(player_id, True)
+ group_player = self.mass.players.get(player_id)
+ group_player.extra_data["optimistic_state"] = PlayerState.PLAYING
+ # forward command to all (powered) group child's
+ async with asyncio.TaskGroup() as tg:
+ for member in self._get_active_members(
+ player_id, only_powered=True, skip_sync_childs=True
+ ):
+ player_prov = self.mass.players.get_player_provider(member.player_id)
+ tg.create_task(
+ player_prov.cmd_play_media(
+ member.player_id,
+ queue_item=queue_item,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ flow_mode=flow_mode,
+ )
+ )
+
+ async def cmd_pause(self, player_id: str) -> None:
+ """Send PAUSE command to given player."""
+ group_player = self.mass.players.get(player_id)
+ group_player.extra_data["optimistic_state"] = PlayerState.PAUSED
+ async with asyncio.TaskGroup() as tg:
+ for member in self._get_active_members(
+ player_id, only_powered=True, skip_sync_childs=True
+ ):
+ tg.create_task(self.mass.players.cmd_pause(member.player_id))
+
+ async def cmd_power(self, player_id: str, powered: bool) -> None:
+ """Send POWER command to given player."""
+ group_power_on = await self.mass.config.get_player_config_value(
+ player_id, CONF_GROUPED_POWER_ON
+ )
+ group_player = self.mass.players.get(player_id)
+
+ async def set_child_power(child_player: Player) -> None:
+ await self.mass.players.cmd_power(child_player.player_id, powered)
+ # set optimistic state on child player to prevent race conditions in other actions
+ child_player.powered = powered
+
+ if not powered or group_power_on:
+ # turn on/off child players
+ async with asyncio.TaskGroup() as tg:
+ for member in self._get_active_members(
+ player_id, only_powered=not powered, skip_sync_childs=False
+ ):
+ if member.powered == member:
+ continue
+ tg.create_task(set_child_power(member))
+
+ group_player.powered = powered
+ group_player.extra_data["optimistic_state"] = PlayerState.IDLE
+ self.mass.players.update(player_id)
+ if powered:
+ # sync all players on power on
+ await self._sync_players(player_id)
+ else:
+ group_player.extra_data["optimistic_state"] = PlayerState.OFF
+
+ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
+ """Send VOLUME_SET command to given player."""
+ # group volume is already handled in the player manager
+
+ async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
+ """Send VOLUME MUTE command to given player."""
+
+ async def poll_player(self, player_id: str) -> None:
+ """Poll player for state updates."""
+ self.update_attributes(player_id)
+ self.mass.players.update(player_id, skip_forward=True)
+
+ def update_attributes(self, player_id: str) -> None:
+ """Update player attributes."""
+ group_player = self.mass.players.get(player_id)
+ all_members = self._get_active_members(
+ player_id, only_powered=False, skip_sync_childs=False
+ )
+ group_player.group_childs = list(x.player_id for x in all_members)
+ # read the state from the first powered child player
+ for member in all_members:
+ if member.synced_to:
+ continue
+ if not member.powered:
+ continue
+ if member.state not in (PlayerState.PLAYING, PlayerState.PAUSED):
+ continue
+ group_player.current_item_id = member.current_item_id
+ group_player.current_url = member.current_url
+ group_player.elapsed_time = member.elapsed_time
+ group_player.elapsed_time_last_updated = member.elapsed_time_last_updated
+ group_player.state = member.state
+ break
+ else:
+ group_player.state = PlayerState.IDLE
+ group_player.current_item_id = None
+ group_player.current_url = None
+
+ def on_child_state(
+ self, player_id: str, child_player: Player, changed_values: dict[str, tuple[Any, Any]]
+ ) -> None:
+ """Call when the state of a child player updates."""
+ self.update_attributes(player_id)
+ group_player = self.mass.players.get(player_id)
+ self.mass.players.update(player_id, skip_forward=True)
+ if "powered" in changed_values and (prev_power := changed_values["powered"][0]) != (
+ new_power := changed_values["powered"][1]
+ ):
+ powered_players = self._get_active_members(player_id, True, False)
+ if group_player.powered and prev_power is True and len(powered_players) == 0:
+ # the last player of a group turned off, turn off the group
+ self.mass.create_task(self.cmd_power, player_id, False)
+ # ruff: noqa: SIM114
+ elif (
+ new_power is True
+ and group_player.extra_data["optimistic_state"] == PlayerState.PLAYING
+ ):
+ # a child player turned ON while the group player is already playing
+ # we need to resync/resume
+ if group_player.state == PlayerState.PLAYING and (
+ sync_leader := next(
+ (
+ x
+ for x in child_player.can_sync_with
+ if x in self.prev_sync_leaders[player_id]
+ ),
+ None,
+ )
+ ):
+ # prevent resume when player platform supports sync
+ # and one of its players is already playing
+ self.mass.create_task(
+ self.mass.players.cmd_sync, child_player.player_id, sync_leader
+ )
+ else:
+ self.mass.create_task(self.mass.players.queues.resume, player_id)
+ elif (
+ not child_player.powered
+ and group_player.extra_data["optimistic_state"] == PlayerState.PLAYING
+ and child_player.player_id in self.prev_sync_leaders[player_id]
+ ):
+ # a sync master player turned OFF while the group player
+ # should still be playing - we need to resync/resume
+ self.mass.create_task(self.mass.players.queues.resume, player_id)
+
+ def _get_active_members(
+ self, player_id: str, only_powered: bool = False, skip_sync_childs: bool = True
+ ) -> list[Player]:
+ """Get (child) players attached to a grouped player."""
+ child_players: list[Player] = []
+ conf_members: list[str] = self.config.get_value(player_id)
+ ignore_ids = set()
+ for child_id in conf_members:
+ if child_player := self.mass.players.get(child_id, False):
+ if not (not only_powered or child_player.powered):
+ continue
+ if child_player.synced_to and skip_sync_childs:
+ continue
+ allowed_sources = [child_player.player_id, player_id] + conf_members
+ if child_player.active_source not in allowed_sources:
+ # edge case: the child player has another group already active!
+ continue
+ if child_player.synced_to and child_player.synced_to not in allowed_sources:
+ # edge case: the child player is already synced to another player
+ continue
+ child_players.append(child_player)
+ # handle edge case where a group is in the group and both the group
+ # and (one of its) child's are added to this universal group.
+ if child_player.type == PlayerType.GROUP:
+ ignore_ids.update(
+ x for x in child_player.group_childs if x != child_player.player_id
+ )
+ return [x for x in child_players if x.player_id not in ignore_ids]
+
+ async def _sync_players(self, player_id: str) -> None:
+ """Sync all (possible) players."""
+ sync_leaders = set()
+ # TODO: sort members on sync master priority attribute ?
+ for member in self._get_active_members(player_id, only_powered=True):
+ if member.synced_to is not None:
+ continue
+ if not member.can_sync_with:
+ continue
+ # check if we can join this player to an already chosen sync leader
+ if existing_leader := next(
+ (x for x in member.can_sync_with if x in sync_leaders), None
+ ):
+ await self.mass.players.cmd_sync(member.player_id, existing_leader)
+ # set optimistic state to prevent race condition in play media
+ member.synced_to = existing_leader
+ continue
+ # pick this member as new sync leader
+ sync_leaders.add(member.player_id)
+ self.prev_sync_leaders[player_id] = tuple(sync_leaders)
--- /dev/null
+{
+ "type": "player",
+ "domain": "ugp",
+ "name": "Universal Group Player",
+ "description": "Create Player Groups with your favorite players, regardless of type and model.",
+ "codeowners": ["@music-assistant"],
+ "requirements": [],
+ "documentation": "",
+ "multi_instance": false,
+ "builtin": false,
+ "load_by_default": false,
+ "icon": "mdi:mdi-speaker-multiple"
+}
+++ /dev/null
-"""
-Universal Group Player provider.
-
-This is more like a "virtual" player provider,
-allowing the user to create player groups from all players known in the system.
-"""
-from __future__ import annotations
-
-import asyncio
-from typing import TYPE_CHECKING
-
-from music_assistant.common.models.config_entries import (
- CONF_ENTRY_FLOW_MODE,
- CONF_ENTRY_GROUPED_POWER_ON,
- CONF_ENTRY_HIDE_GROUP_MEMBERS,
- CONF_ENTRY_OUTPUT_CHANNELS,
- ConfigEntry,
- ConfigValueOption,
- ConfigValueType,
-)
-from music_assistant.common.models.enums import (
- ConfigEntryType,
- PlayerFeature,
- PlayerState,
- PlayerType,
-)
-from music_assistant.common.models.player import DeviceInfo, Player
-from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_GROUPED_POWER_ON, CONF_PROVIDERS
-from music_assistant.server.models.player_provider import PlayerProvider
-
-if TYPE_CHECKING:
- from music_assistant.common.models.config_entries import ProviderConfig
- from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.server import MusicAssistant
- from music_assistant.server.models import ProviderInstanceType
-
-
-CONF_GROUP_MEMBERS = "group_members"
-
-CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO = ConfigEntry.from_dict(
- {
- **CONF_ENTRY_OUTPUT_CHANNELS.to_dict(),
- "hidden": True,
- "default_value": "stereo",
- "value": "stereo",
- }
-)
-CONF_ENTRY_FORCED_FLOW_MODE = ConfigEntry.from_dict(
- {**CONF_ENTRY_FLOW_MODE.to_dict(), "default_value": True, "value": True}
-)
-SUPPORTS_NATIVE_SYNC = ("sonos",)
-# ruff: noqa: ARG002
-
-
-async def setup(
- mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
-) -> ProviderInstanceType:
- """Initialize provider(instance) with given configuration."""
- prov = UniversalGroupProvider(mass, manifest, config)
- await prov.handle_setup()
- return prov
-
-
-async def get_config_entries(
- mass: MusicAssistant,
- instance_id: str | None = None,
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
-) -> tuple[ConfigEntry, ...]:
- """
- Return Config entries to setup this provider.
-
- instance_id: id of an existing provider instance (None if new instance setup).
- action: [optional] action key called from config entries UI.
- values: the (intermediate) raw values for config entries sent with the action.
- """
- # ruff: noqa: ARG001
- all_players = tuple(
- ConfigValueOption(x.display_name, x.player_id)
- for x in mass.players.all(True, True, False)
- if x.player_id != instance_id
- )
- return (
- ConfigEntry(
- key=CONF_GROUP_MEMBERS,
- type=ConfigEntryType.STRING,
- label="Group members",
- default_value=[],
- options=all_players,
- description="Select all players you want to be part of this group",
- multi_value=True,
- ),
- )
-
-
-class UniversalGroupProvider(PlayerProvider):
- """Base/builtin provider for universally grouping players."""
-
- prev_sync_leaders: tuple[str] | None = None
- optimistic_state: PlayerState | None = None
-
- async def handle_setup(self) -> None:
- """Handle async initialization of the provider."""
- self.player = player = Player(
- player_id=self.instance_id,
- provider=self.domain,
- type=PlayerType.GROUP,
- name=self.name,
- available=True,
- powered=False,
- device_info=DeviceInfo(model=self.manifest.name, manufacturer="Music Assistant"),
- # TODO: derive playerfeatures from (all) underlying child players
- supported_features=(
- PlayerFeature.POWER,
- PlayerFeature.PAUSE,
- PlayerFeature.VOLUME_SET,
- PlayerFeature.VOLUME_MUTE,
- PlayerFeature.SET_MEMBERS,
- ),
- active_source=self.instance_id,
- group_childs=self.config.get_value(CONF_GROUP_MEMBERS),
- )
- self.mass.players.register_or_update(player)
-
- async def unload(self) -> None:
- """Handle close/cleanup of the provider."""
- # cleanup player config if provider is removed
- if self.mass.config.get(f"{CONF_PROVIDERS}/{self.instance_id}") is not None:
- return
- self.mass.players.remove(self.instance_id)
-
- async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: # noqa: ARG002
- """Return all (provider/player specific) Config Entries for the given player (if any)."""
- return (
- CONF_ENTRY_HIDE_GROUP_MEMBERS,
- CONF_ENTRY_GROUPED_POWER_ON,
- CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO,
- CONF_ENTRY_FORCED_FLOW_MODE,
- )
-
- async def cmd_stop(self, player_id: str) -> None:
- """Send STOP command to given player."""
- self.optimistic_state = PlayerState.IDLE
- # forward command to player and any connected sync child's
- async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(only_powered=True, skip_sync_childs=True):
- if member.state == PlayerState.IDLE:
- continue
- tg.create_task(self.mass.players.cmd_stop(member.player_id))
-
- async def cmd_play(self, player_id: str) -> None:
- """Send PLAY command to given player."""
- self.optimistic_state = PlayerState.PLAYING
- async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(only_powered=True, skip_sync_childs=True):
- tg.create_task(self.mass.players.cmd_play(member.player_id))
-
- async def cmd_play_media(
- self,
- player_id: str,
- queue_item: QueueItem,
- seek_position: int = 0,
- fade_in: bool = False,
- flow_mode: bool = False,
- ) -> None:
- """Send PLAY MEDIA command to given player.
-
- This is called when the Queue wants the player to start playing a specific QueueItem.
- The player implementation can decide how to process the request, such as playing
- queue items one-by-one or enqueue all/some items.
-
- - player_id: player_id of the player to handle the command.
- - queue_item: the QueueItem to start playing on the player.
- - seek_position: start playing from this specific position.
- - fade_in: fade in the music at start (e.g. at resume).
- """
- # send stop first
- await self.cmd_stop(player_id)
- # power ON
- await self.cmd_power(player_id, True)
- self.optimistic_state = PlayerState.PLAYING
- # forward command to all (powered) group child's
- async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(only_powered=True, skip_sync_childs=True):
- player_prov = self.mass.players.get_player_provider(member.player_id)
- tg.create_task(
- player_prov.cmd_play_media(
- member.player_id,
- queue_item=queue_item,
- seek_position=seek_position,
- fade_in=fade_in,
- flow_mode=flow_mode,
- )
- )
-
- async def cmd_pause(self, player_id: str) -> None:
- """Send PAUSE command to given player."""
- self.optimistic_state = PlayerState.PAUSED
- async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(only_powered=True, skip_sync_childs=True):
- tg.create_task(self.mass.players.cmd_pause(member.player_id))
-
- async def cmd_power(self, player_id: str, powered: bool) -> None:
- """Send POWER command to given player."""
- group_power_on = await self.mass.config.get_player_config_value(
- player_id, CONF_GROUPED_POWER_ON
- )
-
- async def set_child_power(child_player: Player) -> None:
- await self.mass.players.cmd_power(child_player.player_id, powered)
- # set optimistic state on child player to prevent race conditions in other actions
- child_player.powered = powered
-
- if not powered or group_power_on:
- # turn on/off child players
- async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(
- only_powered=not powered, skip_sync_childs=False
- ):
- if member.powered == member:
- continue
- tg.create_task(set_child_power(member))
-
- self.player.powered = powered
- self.mass.players.update(self.instance_id)
- if powered:
- # sync all players on power on
- await self._sync_players()
- else:
- self.optimistic_state = PlayerState.OFF
-
- async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
- """Send VOLUME_SET command to given player."""
-
- async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
- """Send VOLUME MUTE command to given player."""
-
- async def poll_player(self, player_id: str) -> None:
- """Poll player for state updates."""
- self.update_attributes()
- self.mass.players.update(player_id, skip_forward=True)
-
- def update_attributes(self) -> None:
- """Update player attributes."""
- all_members = self._get_active_members(only_powered=False, skip_sync_childs=False)
- self.player.group_childs = list(x.player_id for x in all_members)
- # read the state from the first powered child player
- for member in all_members:
- if member.synced_to:
- continue
- if not member.powered:
- continue
- if member.state not in (PlayerState.PLAYING, PlayerState.PAUSED):
- continue
- self.player.current_item_id = member.current_item_id
- self.player.current_url = member.current_url
- self.player.elapsed_time = member.elapsed_time
- self.player.elapsed_time_last_updated = member.elapsed_time_last_updated
- self.player.state = member.state
- break
- else:
- self.player.state = PlayerState.IDLE
- self.player.current_item_id = None
- self.player.current_url = None
-
- def on_child_state(self, player_id: str, child_player: Player, changed_keys: set[str]) -> None:
- """Call when the state of a child player updates."""
- powered_players = self._get_active_members(True, False)
- if "powered" in changed_keys:
- if not child_player.powered and len(powered_players) == 0:
- # the last player of a group turned off
- # turn off the group
- self.mass.create_task(self.cmd_power, player_id, False)
- # ruff: noqa: SIM114
- elif child_player.powered and self.optimistic_state == PlayerState.PLAYING:
- # a child player turned ON while the group player is already playing
- # we need to resync/resume
- if (
- child_player.provider in SUPPORTS_NATIVE_SYNC
- and self.player.state == PlayerState.PLAYING
- and (
- sync_leader := next(
- (x for x in child_player.can_sync_with if x in self.prev_sync_leaders),
- None,
- )
- )
- ):
- # prevent resume when ecosystem supports native sync
- # and one of its players is already playing
- self.mass.create_task(self.mass.players.cmd_sync, player_id, sync_leader)
- else:
- self.mass.create_task(self.mass.players.queues.resume, player_id)
- elif (
- not child_player.powered
- and self.optimistic_state == PlayerState.PLAYING
- and child_player.player_id in self.prev_sync_leaders
- ):
- # a sync master player turned OFF while the group player
- # should still be playing - we need to resync/resume
- self.mass.create_task(self.mass.players.queues.resume, player_id)
- self.update_attributes()
- self.mass.players.update(player_id, skip_forward=True)
-
- def _get_active_members(
- self, only_powered: bool = False, skip_sync_childs: bool = True
- ) -> list[Player]:
- """Get (child) players attached to a grouped player."""
- child_players: list[Player] = []
- conf_members: list[str] = self.config.get_value(CONF_GROUP_MEMBERS)
- ignore_ids = set()
- for child_id in conf_members:
- if child_player := self.mass.players.get(child_id, False):
- if not (not only_powered or child_player.powered):
- continue
- if child_player.synced_to and skip_sync_childs:
- continue
- allowed_sources = [child_player.player_id, self.instance_id] + conf_members
- if child_player.active_source not in allowed_sources:
- # edge case: the child player has another group already active!
- continue
- if child_player.synced_to and child_player.synced_to not in conf_members:
- # edge case: the child player is already synced to another player
- continue
- child_players.append(child_player)
- # handle edge case where a group is in the group and both the group
- # and (one of its) child's are added to this universal group.
- if child_player.type == PlayerType.GROUP:
- ignore_ids.update(
- x for x in child_player.group_childs if x != child_player.player_id
- )
- return [x for x in child_players if x.player_id not in ignore_ids]
-
- async def _sync_players(self) -> None:
- """Sync all (possible) players."""
- sync_leaders = set()
- # TODO: sort members on sync master priority attribute ?
- for member in self._get_active_members(only_powered=True):
- if member.synced_to is not None:
- continue
- if not member.can_sync_with:
- continue
- # check if we can join this player to an already chosen sync leader
- if existing_leader := next(
- (x for x in member.can_sync_with if x in sync_leaders), None
- ):
- await self.mass.players.cmd_sync(member.player_id, existing_leader)
- # set optimistic state to prevent race condition in play media
- member.synced_to = existing_leader
- continue
- # pick this member as new sync leader
- sync_leaders.add(member.player_id)
- self.prev_sync_leaders = tuple(sync_leaders)
+++ /dev/null
-{
- "type": "player",
- "domain": "universal_group",
- "name": "Universal Group Player",
- "description": "Create a Player Group with your favorite players, regardless of type and model.",
- "codeowners": ["@music-assistant"],
- "requirements": [],
- "documentation": "",
- "multi_instance": true,
- "builtin": false,
- "load_by_default": false,
- "icon": "mdi:mdi-speaker-multiple"
-}