if player.playback_state != PlaybackState.IDLE:
await self.cmd_stop(player_id)
await asyncio.sleep(0.5) # small delay to allow stop to process
- player._attr_active_source = None
- player._attr_current_media = None
+ player.active_source = None
+ player.current_media = None
# check if source is a pluginsource
# in that case the source id is the instance_id of the plugin provider
if plugin_prov := self.mass.get_provider(source):
):
continue # already synced to this target
+ # Check if player is already part of another group and try to automatically ungroup it
+ # first. If that fails, power off the group
+ if child_player.active_group and child_player.active_group != target_player:
+ if (
+ other_group := self.get(child_player.active_group)
+ ) and PlayerFeature.SET_MEMBERS in other_group.supported_features:
+ self.logger.warning(
+ "Player %s is already part of another group (%s), "
+ "removing from that group first",
+ child_player.name,
+ child_player.active_group,
+ )
+ try:
+ await other_group.set_members(player_ids_to_remove=[child_player.player_id])
+ except UnsupportedFeaturedException as err:
+ self.logger.warning(
+ "Failed to remove player %s from group %s: %s, powering it off instead",
+ child_player.name,
+ child_player.active_group,
+ err,
+ )
+ await self.cmd_power(child_player.active_group, False)
+ else:
+ self.logger.warning(
+ "Player %s is already part of another group (%s), powering it off first",
+ child_player.name,
+ child_player.active_group,
+ )
+ await self.cmd_power(child_player.active_group, False)
+ elif child_player.synced_to and child_player.synced_to != target_player:
+ self.logger.warning(
+ "Player %s is already synced to another player, ungrouping first",
+ child_player.name,
+ )
+ await self.cmd_ungroup(child_player.player_id)
+
# power on the player if needed
if not child_player.powered and child_player.power_control != PLAYER_CONTROL_NONE:
await self.cmd_power(child_player.player_id, True, skip_update=True)
# ensure we fetch and set the latest/full config for the player
player_config = await self.mass.config.get_player_config(player_id)
player.set_config(player_config)
+ # call on_registered hook after the player is registered and config is set
+ await player.on_registered()
# always call update to fix special attributes like display name, group volume etc.
player.update_state()
removed_members = set(prev_group_members) - set(new_group_members)
for player_id in removed_members:
if removed_player := self.get(player_id):
- self.mass.loop.call_soon(removed_player.update_state, True)
+ removed_player.update_state()
# signal player update on the eventbus
self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
# update/signal group player(s) child's when group updates
for child_player in self.iter_group_members(player, exclude_self=True):
- self.mass.loop.call_soon(child_player.update_state, True)
+ child_player.update_state()
# update/signal group player(s) when child updates
for group_player in self._get_player_groups(player, powered_only=False):
- self.mass.loop.call_soon(group_player.update_state, True)
+ group_player.update_state()
# update/signal manually synced to player when child updates
if (synced_to := player.synced_to) and (synced_to_player := self.get(synced_to)):
- self.mass.loop.call_soon(synced_to_player.update_state, True)
+ synced_to_player.update_state()
+ # update/signal active groups when a group member updates
+ if (active_group := player.active_group) and (
+ active_group_player := self.get(active_group)
+ ):
+ active_group_player.update_state()
async def register_player_control(self, player_control: PlayerControl) -> None:
"""Register a new PlayerControl on the controller."""
for volume_player_id, prev_volume in prev_volumes.items():
tg.create_task(self.cmd_volume_set(volume_player_id, prev_volume))
await asyncio.sleep(0.2)
- player._attr_current_media = prev_media
- player._attr_active_source = prev_source
+ player.current_media = prev_media
+ player.active_source = prev_source
# either power off the player or resume playing
if not prev_power and player.power_control != PLAYER_CONTROL_NONE:
await self.cmd_power(player.player_id, False)
from __future__ import annotations
-import asyncio
import time
from abc import ABC, abstractmethod
from collections.abc import Callable
"""
return self._attr_active_source
+ @active_source.setter
+ def active_source(self, value: str | None) -> None:
+ """Set the active source of the player."""
+ self._attr_active_source = value
+
@property
def source_list(self) -> list[PlayerSource]:
"""Return list of available (native) sources for this player."""
"""Return the current media being played by the player."""
return self._attr_current_media
+ @current_media.setter
+ def current_media(self, value: PlayerMedia | None) -> None:
+ """Set the current media being played by the player."""
+ self._attr_current_media = value
+
@property
def needs_poll(self) -> bool:
"""Return if the player needs to be polled for state updates."""
),
]
+ async def on_registered(self) -> None:
+ """
+ Handle logic when the player is registered and config is set.
+
+ Override this method in your player implementation if you need
+ to perform any additional setup logic after the player is registered and
+ the self.config was loaded.
+ """
+ return
+
async def on_unload(self) -> None:
"""Handle logic when the player is unloaded from the Player controller."""
for callback in self._on_unload_callbacks:
If this player is not synced to another player (or is the sync leader itself),
this should return None.
+ If it is part of a (permanent) group, this should also return None.
"""
# default implementation: feel free to override
for player in self.mass.players.all():
if player.player_id == self.player_id:
# skip self
continue
- if self.player_id in player.group_members:
- # this player is a member of the group of the other player
+ if player.type == PlayerType.PLAYER and self.player_id in player.group_members:
+ # this player is synced to another player, but not part of a (permanent) group
return player.player_id
return None
@property
@final
- def active_group(self) -> str | None:
+ def active_groups(self) -> list[str]:
"""
- Return the player id of the (first) playergroup that is currently active for this player.
+ Return the player ids of all playergroups that are currently active for this player.
- This will return the id of the groupplayer if a group is active.
- If no group is currently active, this will return None.
+ This will return the ids of the groupplayers if any groups are active.
+ If no groups are currently active, this will return an empty list.
"""
+ active_groups = []
for player in self.mass.players.all(return_unavailable=False, return_disabled=False):
if player.type != PlayerType.GROUP:
continue
if not (player.powered or player.playback_state == PlaybackState.PLAYING):
continue
if self.player_id in player.group_members:
- return player.player_id
- return None
+ active_groups.append(player.player_id)
+ return active_groups
+
+ @property
+ @final
+ def active_group(self) -> str | None:
+ """
+ Return the player id of the (first) playergroup that is currently active for this player.
+
+ This will return the id of the groupplayer if a group is active.
+ If no group is currently active, this will return None.
+ """
+ active_groups = self.active_groups
+ return active_groups[0] if active_groups else None
@cached_property
@final
_attr_type: PlayerType = PlayerType.GROUP
+ @cached_property
+ def synced_to(self) -> str | None:
+ """Return the id of the player this player is synced to (sync leader)."""
+ # default implementation: groups can't be synced
+ return None
+
async def get_config_entries(self) -> list[ConfigEntry]:
"""Return all (provider/player specific) Config Entries for the player."""
# Return all base config entries for a group player.
"""Helper class for a (provider specific) SyncGroup player."""
_attr_type: PlayerType = PlayerType.GROUP
+ sync_leader: Player | None = None
+ """The active sync leader player for this syncgroup."""
@cached_property
def is_dynamic(self) -> bool:
self._attr_available = True
self._attr_powered = False # group players are always powered off by default
self._attr_active_source = player_id
- self._attr_group_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
self._attr_device_info = DeviceInfo(model="Sync Group", manufacturer=provider.name)
- self._set_attributes()
-
- def _set_attributes(self) -> None:
- """Set player attributes."""
- player_features = {
+ self._attr_supported_features = {
PlayerFeature.POWER,
PlayerFeature.VOLUME_SET,
}
+
+ async def on_registered(self) -> None:
+ """Complete the initialization once the player was registered."""
+ # Config is only available after the player was registered
+ # Copy the list so not every added player becomes a static member
+ self._attr_group_members = list(
+ cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
+ )
+ # Uses self.config
if self.is_dynamic:
- player_features.add(PlayerFeature.SET_MEMBERS)
+ self._attr_supported_features.add(PlayerFeature.SET_MEMBERS)
+
+ @property
+ def supported_features(self) -> set[PlayerFeature]:
+ """Return the supported features of the player."""
+ return self._attr_supported_features
- self._attr_supported_features = player_features
+ @property
+ def playback_state(self) -> PlaybackState:
+ """Return the current playback state of the player."""
+ if self.power_state:
+ return self.sync_leader.playback_state if self.sync_leader else PlaybackState.IDLE
+ else:
+ return PlaybackState.IDLE
+
+ @cached_property
+ def flow_mode(self) -> bool:
+ """
+ Return if the player needs flow mode.
+
+ Will by default be set to True if the player does not support PlayerFeature.ENQUEUE
+ or has a flow mode config entry set to True.
+ """
+ if leader := self.sync_leader:
+ return leader.flow_mode
+ return False
+
+ @property
+ def elapsed_time(self) -> float | None:
+ """Return the elapsed time in (fractional) seconds of the current track (if any)."""
+ return self.sync_leader.elapsed_time if self.sync_leader else None
+
+ @elapsed_time.setter
+ def elapsed_time(self, value: float | None) -> None:
+ """Set the elapsed time on the player."""
+ raise NotImplementedError("elapsed_time is read-only on a SyncGroup player")
+
+ @property
+ def elapsed_time_last_updated(self) -> float | None:
+ """Return when the elapsed time was last updated."""
+ return self.sync_leader.elapsed_time_last_updated if self.sync_leader else None
+
+ @property
+ def can_group_with(self) -> set[str]:
+ """
+ Return the id's of players this player can group with.
+
+ This should return set of player_id's this player can group/sync with
+ or just the provider's instance_id if all players can group with each other.
+ """
+ if self.is_dynamic and (leader := self.sync_leader):
+ return leader.can_group_with
+ elif self.is_dynamic:
+ return {self.provider.lookup_key}
+ else:
+ return set()
async def get_config_entries(self) -> list[ConfigEntry]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
description="Select all players you want to be part of this group",
required=False, # needed for dynamic members (which allows empty members list)
options=[
- ConfigValueOption(x.display_name, x.player_id) for x in self.provider.players
+ ConfigValueOption(x.display_name, x.player_id)
+ for x in self.provider.players
+ if x.type != PlayerType.GROUP
],
),
ConfigEntry(
key="dynamic_members",
type=ConfigEntryType.BOOLEAN,
label="Enable dynamic members",
- description="Allow (un)joining members dynamically, so the group more or less"
+ description="Allow (un)joining members dynamically, so the group more or less "
"behaves the same like manually syncing players together, "
- "with the main difference being that the groupplayer will hold the queue.",
+ "with the main difference being that the group player will hold the queue.",
default_value=False,
required=False,
),
if sync_leader := self.sync_leader:
await sync_leader.pause()
+ async def _handle_member_collisions(self, member: Player) -> None:
+ """Handle collisions when adding a member to the sync group."""
+ active_groups = member.active_groups
+ for group in active_groups:
+ if group == self.player_id:
+ continue
+ # collision: child player is part another group that is already active !
+ # solve this by trying to leave the group first
+ if other_group := self.mass.players.get(group):
+ try:
+ other_group.check_feature(PlayerFeature.SET_MEMBERS)
+ await other_group.set_members(player_ids_to_remove=[member.player_id])
+ except UnsupportedFeaturedException:
+ # if the other group does not support SET_MEMBERS or it is a static
+ # member, we need to power it off to leave the group
+ await other_group.power(False)
+ if (
+ member.synced_to is not None
+ and member.synced_to != self.sync_leader
+ and (synced_to_player := self.mass.players.get(member.synced_to))
+ and member.player_id in synced_to_player.group_members
+ ):
+ # collision: child player is synced to another player and still in that group
+ # ungroup it first
+ await synced_to_player.set_members(player_ids_to_remove=[member.player_id])
+
async def power(self, powered: bool) -> None:
"""Handle POWER command to group player."""
# always stop at power off
if not powered and self.playback_state in (PlaybackState.PLAYING, PlaybackState.PAUSED):
await self.stop()
- if powered:
- await self._form_syncgroup()
-
# optimistically set the group state
prev_power = self._attr_powered
self._attr_powered = powered
self.update_state()
if powered:
+ # Select sync leader and handle turn on
+ new_leader = self._select_sync_leader()
# handle TURN_ON of the group player by turning on all members
for member in self.mass.players.iter_group_members(
self, only_powered=False, active_only=False
):
- if member.active_group is not None and member.active_group != self.player_id:
- # collision: child player is part of multiple groups
- # and another group already active !
- # solve this by trying to leave the group first
- if (
- other_group := self.mass.players.get(member.active_group)
- ) and PlayerFeature.SET_MEMBERS in other_group.supported_features:
- await other_group.set_members(player_ids_to_remove=[member.player_id])
- else:
- # if the other group does not support SET_MEMBERS,
- # we need to power it off to leave the group
- await self.mass.players.cmd_power(member.active_group, False)
- await asyncio.sleep(1)
- await asyncio.sleep(1)
+ await self._handle_member_collisions(member)
if not member.powered and member.power_control != PLAYER_CONTROL_NONE:
- await self.mass.players.cmd_power(member.player_id, True)
- elif not prev_power:
- # handle TURN_OFF of the group player by ungrouping and turning off all members
- if (sync_leader := self.sync_leader) and sync_leader.group_members:
- # dissolve the temporary syncgroup from the sync leader
- sync_childs = [x for x in sync_leader.group_members if x != sync_leader.player_id]
- if sync_childs:
- await sync_leader.set_members(player_ids_to_remove=sync_childs)
+ await member.power(True)
+ # Set up the sync group with the new leader
+ await self._handle_leader_transition(new_leader)
+ elif prev_power:
+ # handle TURN_OFF of the group player by dissolving group and turning off all members
+ await self._dissolve_syncgroup()
# turn off all group members
for member in self.mass.players.iter_group_members(
self, only_powered=True, active_only=True
):
if member.powered and member.power_control != PLAYER_CONTROL_NONE:
- await self.mass.players.cmd_power(member.player_id, False)
+ await member.power(False)
if not powered:
- # reset the original group members when powered off
- self._attr_group_members = cast(
- "list[str]", self.config.get_value(CONF_GROUP_MEMBERS, [])
+ # reset the original group members when powered off and clear leader
+ self._attr_group_members = list(
+ cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
)
+ self.sync_leader = None
+
+ async def _dissolve_syncgroup(self) -> None:
+ """Dissolve the current syncgroup by ungrouping all members and restoring leader queue."""
+ if sync_leader := self.sync_leader:
+ # dissolve the temporary syncgroup from the sync leader
+ sync_children = [x for x in sync_leader.group_members if x != sync_leader.player_id]
+ if sync_children:
+ await sync_leader.set_members(player_ids_to_remove=sync_children)
+ # Reset the leaders queue since it is no longer part of this group
+ sync_leader.active_source = None
+ sync_leader.current_media = None
+ sync_leader.update_state()
+
+ async def _handle_leader_transition(self, new_leader: Player | None) -> None:
+ """Handle transition from current leader to new leader."""
+ prev_leader = self.sync_leader
+ was_playing = False
+
+ if prev_leader:
+ # Save current media and playback state for potential restart
+ was_playing = self.playback_state == PlaybackState.PLAYING
+ # Stop current playback and dissolve existing group
+ await self.stop()
+ await self._dissolve_syncgroup()
+
+ # Set new leader
+ self.sync_leader = new_leader
+
+ if new_leader:
+ # form a syncgroup with the new leader
+ await self._form_syncgroup()
+
+ # Restart playback if requested and we have media to play
+ if was_playing and self.current_media is not None:
+ await new_leader.play_media(self.current_media)
async def volume_set(self, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
# simply forward the command to the sync leader
if sync_leader := self.sync_leader:
await sync_leader.play_media(media)
+ self._attr_current_media = media
+ self._attr_active_source = media.queue_id
+ self.update_state()
+ else:
+ raise RuntimeError("an empty group cannot play media, consider adding members first")
async def enqueue_next_media(self, media: PlayerMedia) -> None:
"""Handle enqueuing of a next media item on the player."""
self._attr_group_members.remove(player_id)
final_players_to_remove.append(player_id)
self.update_state()
- if self.powered and (player_ids_to_add or player_ids_to_remove):
- # if the group is powered on, we need to (re)sync the members
- sync_leader = await self._select_sync_leader()
- await sync_leader.set_members(
+ if not self.powered:
+ # Don't need to do anything else if the group is powered off
+ # The syncing will be done once powered on
+ return
+ next_leader = self._select_sync_leader()
+ prev_leader = self.sync_leader
+
+ if prev_leader and next_leader is None:
+ # Edge case: we no longer have any members in the group (and thus no leader)
+ await self._handle_leader_transition(None)
+ elif prev_leader != next_leader:
+ # Edge case: we had changed the leader (or just got one)
+ await self._handle_leader_transition(next_leader)
+ elif self.sync_leader and (player_ids_to_add or player_ids_to_remove):
+ # if the group still has the same leader, we need to (re)sync the members
+ # Handle collisions for newly added players
+ for player_id in final_players_to_add:
+ if player := self.mass.players.get(player_id):
+ await self._handle_member_collisions(player)
+
+ await self.sync_leader.set_members(
player_ids_to_add=final_players_to_add,
player_ids_to_remove=final_players_to_remove,
)
- @property
- def sync_leader(self) -> Player | None:
- """Get the active sync leader player for the syncgroup."""
- for child_player in self.mass.players.iter_group_members(
- self, only_powered=False, only_playing=False, active_only=False
- ):
- # the syncleader is just the first player in the group
- return child_player
- return None
-
async def _form_syncgroup(self) -> None:
- """Form syncgroup by sync all (possible) members."""
- sync_leader = await self._select_sync_leader()
+ """Form syncgroup by syncing all (possible) members."""
+ if self.sync_leader is None:
+ # This is an empty group, leader will be selected once a member is added
+ self._attr_group_members = []
+ self.update_state()
+ return
# ensure the sync leader is first in the list
self._attr_group_members = [
- sync_leader.player_id,
- *[x for x in self._attr_group_members if x != sync_leader.player_id],
+ self.sync_leader.player_id,
+ *[x for x in self._attr_group_members if x != self.sync_leader.player_id],
]
self.update_state()
members_to_sync: list[str] = []
for member in self.mass.players.iter_group_members(self, active_only=False):
- if member.synced_to and member.synced_to != sync_leader.player_id:
+ # Handle collisions before attempting to sync
+ await self._handle_member_collisions(member)
+
+ if member.synced_to and member.synced_to != self.sync_leader.player_id:
# ungroup first
- await self.mass.players.cmd_ungroup(member.player_id)
- if member.player_id == sync_leader.player_id:
+ await member.ungroup()
+ if member.player_id == self.sync_leader.player_id:
# skip sync leader
continue
if (
- member.synced_to == sync_leader.player_id
- and member.player_id in sync_leader.group_members
+ member.synced_to == self.sync_leader.player_id
+ and member.player_id in self.sync_leader.group_members
):
# already synced
continue
members_to_sync.append(member.player_id)
if members_to_sync:
- await sync_leader.set_members(members_to_sync)
+ await self.sync_leader.set_members(members_to_sync)
- async def _select_sync_leader(self) -> Player:
+ def _select_sync_leader(self) -> Player | None:
"""Select the active sync leader player for a syncgroup."""
+ if self.sync_leader and self.sync_leader.player_id in self.group_members:
+ # Don't change the sync leader if we already have one
+ return self.sync_leader
for prefer_sync_leader in (True, False):
for child_player in self.mass.players.iter_group_members(self):
if prefer_sync_leader and child_player.synced_to:
- # prefer the first player that already has sync childs
+ # prefer the first player that already has sync children
continue
if child_player.active_group not in (
None,
# but guard it just in case bad things happen
continue
return child_player
- raise RuntimeError("No players available to form syncgroup")
+ return None
__all__ = [
MediaType,
PlaybackState,
PlayerFeature,
+ PlayerType,
)
from music_assistant_models.errors import UnsupportedFeaturedException
from music_assistant_models.media_items import AudioFormat
from music_assistant.models.player import DeviceInfo, GroupPlayer, PlayerMedia
from music_assistant.providers.universal_group.constants import UGP_FORMAT
-from .constants import CONF_ENTRY_SAMPLE_RATES_UGP, CONFIG_ENTRY_UGP_NOTE, UGP_PREFIX
+from .constants import CONF_ENTRY_SAMPLE_RATES_UGP, CONFIG_ENTRY_UGP_NOTE
from .ugp_stream import UGPStream
if TYPE_CHECKING:
self._attr_available = True
self._attr_powered = False # group players are always powered off by default
self._attr_active_source = player_id
- self._attr_group_members = cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
self._attr_device_info = DeviceInfo(model="Universal Group", manufacturer=provider.name)
self._attr_supported_features = {*BASE_FEATURES}
+ self._attr_needs_poll = True
+ self._attr_poll_interval = 30
# register dynamic route for the ugp stream
self._on_unload_callbacks.append(
self.mass.streams.register_dynamic_route(
}
self._set_attributes()
+ async def on_registered(self) -> None:
+ """Complete the initialization once the player was registered."""
+ # Config entries are only fully available after the player was registered
+ self._attr_group_members = list(
+ cast("list[str]", self.config.get_value(CONF_GROUP_MEMBERS, []))
+ )
+
@cached_property
def is_dynamic(self) -> bool:
"""Return if the player is a dynamic group player."""
options=[
ConfigValueOption(x.display_name, x.player_id)
for x in self.mass.players.all(True, False)
- if not x.player_id.startswith(UGP_PREFIX)
+ if x.type != PlayerType.GROUP
],
),
ConfigEntry(
# abort the stream session
if self.stream and not self.stream.done:
await self.stream.stop()
+ self.stream = None
+ self._set_attributes()
async def power(self, powered: bool) -> None:
"""Handle POWER command to group player."""
# optimistically set the group state
prev_power = self._attr_powered
self._attr_powered = powered
- self.update_state()
if powered:
# handle TURN_ON of the group player by turning on all members
self._attr_group_members = cast(
"list[str]", self.config.get_value(CONF_GROUP_MEMBERS, [])
)
+ self.update_state()
async def volume_set(self, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
self._attr_elapsed_time = 0
self._attr_elapsed_time_last_updated = time() - 1
self._attr_playback_state = PlaybackState.PLAYING
+ self._attr_active_source = media.queue_id
self.update_state()
# forward to downstream play_media commands
async def poll(self) -> None:
"""Poll player for state updates."""
self._set_attributes()
- self.update_state()
async def on_unload(self) -> None:
"""Handle logic when the player is unloaded from the Player controller."""
self._attr_supported_features.discard(PlayerFeature.SET_MEMBERS)
# grab current media and state from one of the active players
for child_player in self.mass.players.iter_group_members(self, active_only=True):
- self._attr_available = True
self._attr_playback_state = child_player.playback_state
if child_player.elapsed_time:
self._attr_elapsed_time = child_player.elapsed_time
break
else:
self._attr_playback_state = PlaybackState.IDLE
- self._attr_available = False
self.update_state()
async def _serve_ugp_stream(self, request: web.Request) -> web.StreamResponse: