# PlayerGroup related endpoints/commands
- async def create_group(self, provider: str, name: str, members: list[str]) -> Player:
- """Create new (permanent) Player/Sync Group on given PlayerProvider with name and members.
+ async def create_syncgroup(self, name: str, members: list[str]) -> Player:
+ """Create a new Sync Group with name and members.
- - provider: provider domain or instance id to create the new group on.
- name: Name for the new group to create.
- members: A list of player_id's that should be part of this group.
Returns the newly created player on success.
- NOTE: Fails if the given provider does not support creating new groups
- or members are given that can not be handled by the provider.
"""
return Player.from_dict(
- await self.client.send_command(
- "players/create_group", provider=provider, name=name, members=members
- )
+ await self.client.send_command("players/create_syncgroup", name=name, members=members)
)
async def set_player_group_volume(self, player_id: str, volume_level: int) -> None:
#
# PLAYERPROVIDER FEATURES
#
- PLAYER_GROUP_CREATE = "player_group_create"
SYNC_PLAYERS = "sync_players"
#
CONF_BIND_PORT: Final[str] = "bind_port"
CONF_PUBLISH_IP: Final[str] = "publish_ip"
CONF_AUTO_PLAY: Final[str] = "auto_play"
-CONF_GROUP_PLAYERS: Final[str] = "group_players"
CONF_CROSSFADE: Final[str] = "crossfade"
CONF_GROUP_MEMBERS: Final[str] = "group_members"
CONF_HIDE_PLAYER: Final[str] = "hide_player"
import base64
import logging
import os
-from contextlib import suppress
from typing import TYPE_CHECKING, Any
from uuid import uuid4
PlayerConfig,
ProviderConfig,
)
-from music_assistant.common.models.enums import EventType, PlayerState, ProviderType
-from music_assistant.common.models.errors import (
- InvalidDataError,
- PlayerUnavailableError,
- ProviderUnavailableError,
-)
+from music_assistant.common.models.enums import EventType, ProviderType
+from music_assistant.common.models.errors import InvalidDataError, ProviderUnavailableError
from music_assistant.constants import (
CONF_CORE,
CONF_PLAYERS,
)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.util import load_provider_module
-from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
import asyncio
data=config,
)
# signal update to the player manager
- player = self.mass.players.get(config.player_id)
- with suppress(PlayerUnavailableError, AttributeError, KeyError):
- if config.enabled:
- player_prov = self.mass.players.get_player_provider(player_id)
- await player_prov.poll_player(player_id)
- player.enabled = config.enabled
- self.mass.players.update(config.player_id, force_update=True)
-
- # signal player provider that the config changed
- with suppress(PlayerUnavailableError):
- if provider := self.mass.get_provider(config.provider):
- provider.on_player_config_changed(config, changed_keys)
- # if the player was playing, restart playback
- if player and player.state == PlayerState.PLAYING:
- self.mass.create_task(self.mass.player_queues.resume(player.active_source))
+ self.mass.players.on_player_config_changed(config, changed_keys)
# return full player config (just in case)
return await self.get_player_config(player_id)
msg = f"Player {player_id} does not exist"
raise KeyError(msg)
self.remove(conf_key)
- if (player := self.mass.players.get(player_id)) and player.available:
- player.enabled = False
- self.mass.players.update(player_id, force_update=True)
- if provider := self.mass.get_provider(existing["provider"]):
- assert isinstance(provider, PlayerProvider)
- provider.on_player_config_removed(player_id)
- if not player:
- self.mass.signal_event(EventType.PLAYER_REMOVED, player_id)
+ # signal update to the player manager
+ self.mass.players.on_player_config_removed(player_id)
def create_default_player_config(
self,
def set_shuffle(self, queue_id: str, shuffle_enabled: bool) -> None:
"""Configure shuffle setting on the the queue."""
# always fetch the underlying player so we can raise early if its not available
- player = self.mass.players.get(queue_id, True)
- if player.announcement_in_progress:
+ queue_player = self.mass.players.get(queue_id, True)
+ if queue_player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
def set_repeat(self, queue_id: str, repeat_mode: RepeatMode) -> None:
"""Configure repeat setting on the the queue."""
# always fetch the underlying player so we can raise early if its not available
- player = self.mass.players.get(queue_id, True)
- if player.announcement_in_progress:
+ queue_player = self.mass.players.get(queue_id, True)
+ if queue_player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
# ruff: noqa: PLR0915,PLR0912
queue = self._queues[queue_id]
# always fetch the underlying player so we can raise early if its not available
- player = self.mass.players.get(queue_id, True)
- if player.announcement_in_progress:
+ queue_player = self.mass.players.get(queue_id, True)
+ if queue_player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
- pos_shift: move item to top of queue as next item if 0.
"""
# always fetch the underlying player so we can raise early if its not available
- player = self.mass.players.get(queue_id, True)
- if player.announcement_in_progress:
+ queue_player = self.mass.players.get(queue_id, True)
+ if queue_player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
def delete_item(self, queue_id: str, item_id_or_index: int | str) -> None:
"""Delete item (by id or index) from the queue."""
# always fetch the underlying player so we can raise early if its not available
- player = self.mass.players.get(queue_id, True)
- if player.announcement_in_progress:
+ queue_player = self.mass.players.get(queue_id, True)
+ if queue_player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
if isinstance(item_id_or_index, str):
def clear(self, queue_id: str) -> None:
"""Clear all items in the queue."""
# always fetch the underlying player so we can raise early if its not available
- player = self.mass.players.get(queue_id, True)
- if player.announcement_in_progress:
+ queue_player = self.mass.players.get(queue_id, True)
+ if queue_player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
queue = self._queues[queue_id]
if queue_player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
- if (queue := self._queues.get(queue_id)) and queue.state == PlayerState.PAUSED:
+ if (
+ (queue := self._queues.get(queue_id))
+ and queue_player.powered
+ and queue.state == PlayerState.PAUSED
+ ):
# forward the actual command to the player controller
await self.mass.players.cmd_play(queue_id, skip_forward=True)
else:
import asyncio
import functools
import time
+from collections.abc import Iterable
+from contextlib import suppress
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast
+import shortuuid
+
from music_assistant.common.helpers.util import get_changed_values
from music_assistant.common.models.config_entries import (
CONF_ENTRY_ANNOUNCE_VOLUME,
CONF_ENTRY_ANNOUNCE_VOLUME_STRATEGY,
CONF_ENTRY_PLAYER_ICON,
CONF_ENTRY_PLAYER_ICON_GROUP,
+ PlayerConfig,
)
from music_assistant.common.models.enums import (
EventType,
UnsupportedFeaturedException,
)
from music_assistant.common.models.media_items import UniqueList
-from music_assistant.common.models.player import Player, PlayerMedia
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
from music_assistant.constants import (
CONF_AUTO_PLAY,
CONF_GROUP_MEMBERS,
)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.tags import parse_tags
+from music_assistant.server.helpers.throttle_retry import Throttler
from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.core_controller import CoreController
from music_assistant.server.models.player_provider import PlayerProvider
)
self.manifest.icon = "speaker-multiple"
self._poll_task: asyncio.Task | None = None
- self._player_locks: dict[str, asyncio.Lock] = {}
+ self._player_throttlers: dict[str, Throttler] = {}
async def setup(self, config: CoreConfig) -> None:
"""Async initialize of module."""
if not skip_forward and player.active_source == player_id:
await self.mass.player_queues.stop(player_id)
return
+ # handle syncgroup: redirect to syncgroup-leader if needed
+ if player_id.startswith(SYNCGROUP_PREFIX):
+ if sync_leader := self.get_sync_leader(player):
+ await self.cmd_stop(sync_leader.player_id)
+ return
if player_provider := self.get_player_provider(player_id):
await player_provider.cmd_stop(player_id)
if not skip_forward and player.active_source == player_id:
await self.mass.player_queues.play(player_id)
return
+ # handle syncgroup: redirect to syncgroup-leader if needed
+ if player_id.startswith(SYNCGROUP_PREFIX):
+ if sync_leader := self.get_sync_leader(player):
+ await self.cmd_play(sync_leader.player_id)
+ return
player_provider = self.get_player_provider(player_id)
- async with self._player_locks[player_id]:
+ async with self._player_throttlers[player_id]:
await player_provider.cmd_play(player_id)
@api_command("players/cmd/pause")
# if player does not support pause, we need to send stop
await self.cmd_stop(player_id)
return
+ # handle syncgroup: redirect to syncgroup-leader if needed
+ if player_id.startswith(SYNCGROUP_PREFIX):
+ if sync_leader := self.get_sync_leader(player):
+ await self.cmd_pause(sync_leader.player_id)
+ return
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_pause(player_id)
if PlayerFeature.POWER in player.supported_features:
# forward to player provider
player_provider = self.get_player_provider(player_id)
- async with self._player_locks[player_id]:
+ async with self._player_throttlers[player_id]:
await player_provider.cmd_power(player_id, powered)
else:
# allow the stop command to process and prevent race conditions
msg = f"Player {player.display_name} does not support volume_set"
raise UnsupportedFeaturedException(msg)
player_provider = self.get_player_provider(player_id)
- async with self._player_locks[player_id]:
+ async with self._player_throttlers[player_id]:
await player_provider.cmd_volume_set(player_id, volume_level)
@api_command("players/cmd/volume_up")
@api_command("players/cmd/group_power")
async def cmd_group_power(self, player_id: str, power: bool) -> None:
- """Handle power command for a SyncGroup."""
+ """Handle power command for a (Sync/Player)Group."""
group_player = self.get(player_id, True)
if group_player.powered == power:
for member in self.iter_group_members(group_player, only_powered=True):
any_member_powered = True
if power:
+ if member.state in (PlayerState.PLAYING, PlayerState.PAUSED):
+ # stop playing existing content on member if we start the group player
+ tg.create_task(self.cmd_stop(member.player_id))
# set active source to group player if the group (is going to be) powered
- member.active_group = group_player.player_id
+ member.active_group = group_player.active_group
member.active_source = group_player.active_source
+ self.update(member.player_id, skip_forward=True)
else:
# turn off child player when group turns off
tg.create_task(self.cmd_power(member.player_id, False))
+ # reset active source on player
member.active_source = None
member.active_group = None
+ self.update(member.player_id, skip_forward=True)
# edge case: group turned on but no members are powered, power them all!
if not any_member_powered and power:
for member in self.iter_group_members(group_player, only_powered=False):
msg = f"Player {player.display_name} does not support muting"
raise UnsupportedFeaturedException(msg)
player_provider = self.get_player_provider(player_id)
- async with self._player_locks[player_id]:
+ async with self._player_throttlers[player_id]:
await player_provider.cmd_volume_mute(player_id, muted)
@api_command("players/cmd/seek")
- player_id: player_id of the player to handle the command.
- media: The Media that needs to be played on the player.
"""
+ # handle syncgroup: redirect to syncgroup-leader if needed
if player_id.startswith(SYNCGROUP_PREFIX):
- # redirect to syncgroup-leader if needed
await self.cmd_group_power(player_id, True)
group_player = self.get(player_id, True)
if sync_leader := self.get_sync_leader(group_player):
await self.play_media(sync_leader.player_id, media=media)
group_player.state = PlayerState.PLAYING
return
+ # power on the player if needed
+ player = self.get(player_id, True)
+ if not player.powered:
+ await self.cmd_power(player_id, True)
player_prov = self.mass.players.get_player_provider(player_id)
await player_prov.play_media(
player_id=player_id,
)
return
player_prov = self.mass.players.get_player_provider(player_id)
- async with self._player_locks[player_id]:
+ async with self._player_throttlers[player_id]:
await player_prov.enqueue_next_media(player_id=player_id, media=media)
@api_command("players/cmd/sync")
# forward command to the player provider after all (base) sanity checks
player_provider = self.get_player_provider(target_player)
- async with self._player_locks[target_player]:
+ async with self._player_throttlers[target_player]:
await player_provider.cmd_sync_many(target_player, child_player_ids)
@api_command("players/cmd/unsync_many")
player_provider = self.get_player_provider(final_player_ids[0])
await player_provider.cmd_unsync_many(final_player_ids)
- @api_command("players/create_group")
- async def create_group(self, provider: str, name: str, members: list[str]) -> Player:
- """Create new Player/Sync Group on given PlayerProvider with name and members.
-
- - provider: provider domain or instance id to create the new group on.
- - name: Name for the new group to create.
- - members: A list of player_id's that should be part of this group.
-
- Returns the newly created player on success.
- NOTE: Fails if the given provider does not support creating new groups
- or members are given that can not be handled by the provider.
- """
- # perform basic checks
- if (player_prov := self.mass.get_provider(provider)) is None:
- msg = f"Provider {provider} is not available!"
- raise ProviderUnavailableError(msg)
- if ProviderFeature.PLAYER_GROUP_CREATE in player_prov.supported_features:
- # Provider supports group create feature: forward request to provider.
- # NOTE: The provider is itself responsible for
- # checking if the members can be used for grouping.
- return await player_prov.create_group(name, members=members)
- msg = f"Provider {player_prov.name} does not support creating groups"
- raise UnsupportedFeaturedException(msg)
-
def set(self, player: Player) -> None:
"""Set/Update player details on the controller."""
if player.player_id not in self._players:
# register playerqueue for this player
self.mass.create_task(self.mass.player_queues.on_player_register(player))
- # register lock for this player
- self._player_locks[player_id] = asyncio.Lock()
+ # register throttler for this player
+ self._player_throttlers[player_id] = Throttler(1, 0.2)
self._players[player_id] = player
player = self._players[player_id]
# calculate active group and active source
player.active_group = self._get_active_player_group(player)
- player.active_source = self._get_active_source(player)
+ if player.active_source is None:
+ player.active_source = self._get_active_source(player)
player.volume_level = player.volume_level or 0 # guard for None volume
# correct group_members if needed
if player.group_childs == {player.player_id}:
or player.name
or player.player_id
)
- if (
- not player.powered
- and player.state == PlayerState.PLAYING
- and PlayerFeature.POWER not in player.supported_features
- and player.active_source == player_id
- ):
- # mark player as powered if its playing
- # could happen for players that do not officially support power commands
- player.powered = True
player.hidden = self.mass.config.get_raw_player_config_value(
player.player_id, CONF_HIDE_PLAYER, False
)
if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP)
else CONF_ENTRY_PLAYER_ICON.default_value,
)
- # handle syncgroup - get attributes from first player that has this group as source
+ # handle syncgroup - get attributes from sync leader
if player.player_id.startswith(SYNCGROUP_PREFIX):
- if player.powered and (sync_leader := self.get_sync_leader(player)):
+ sync_leader = self.get_sync_leader(player)
+ if sync_leader and sync_leader.active_source == player.active_source:
player.state = sync_leader.state
- player.current_item_id = sync_leader.current_item_id
+ player.active_source = sync_leader.active_source
+ player.current_media = sync_leader.current_media
player.elapsed_time = sync_leader.elapsed_time
player.elapsed_time_last_updated = sync_leader.elapsed_time_last_updated
else:
player.state = PlayerState.IDLE
+ player.active_source = player.player_id
# basic throttle: do not send state changed events if player did not actually change
prev_state = self._prev_states.get(player_id, {})
def _check_redirect(self, player_id: str) -> str:
"""Check if playback related command should be redirected."""
player = self.get(player_id, True)
- if player_id.startswith(SYNCGROUP_PREFIX) and (sync_leader := self.get_sync_leader(player)):
- return sync_leader.player_id
if player.synced_to:
sync_leader = self.get(player.synced_to, True)
self.logger.warning(
self.mass.loop.call_soon(self.update, player_id)
await asyncio.sleep(1)
+ def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ """Call (by config manager) when the configuration of a player changes."""
+ player = self.mass.players.get(config.player_id)
+ if config.enabled:
+ player_prov = self.mass.players.get_player_provider(config.player_id)
+ self.mass.create_task(player_prov.poll_player(config.player_id))
+ player.enabled = config.enabled
+ self.mass.players.update(config.player_id, force_update=True)
+ if config.player_id.startswith(SYNCGROUP_PREFIX):
+ # handle syncgroup
+ if f"values/{CONF_GROUP_MEMBERS}" in changed_keys:
+ player = self.mass.players.get(config.player_id)
+ player.group_childs = config.get_value(CONF_GROUP_MEMBERS)
+ self.mass.players.update(config.player_id)
+ else:
+ # signal player provider that the config changed
+ with suppress(PlayerUnavailableError):
+ if provider := self.mass.get_provider(config.provider):
+ provider.on_player_config_changed(config, changed_keys)
+ # if the player was playing, restart playback
+ if player and player.state == PlayerState.PLAYING:
+ self.mass.create_task(self.mass.player_queues.resume(player.active_source))
+
+ def on_player_config_removed(self, player_id: str) -> None:
+ """Call (by config manager) when the configuration of a player is removed."""
+ if (player := self.mass.players.get(player_id)) and player.available:
+ player.enabled = False
+ self.mass.players.update(player_id, force_update=True)
+ if player and (provider := self.mass.get_provider(player.provider)):
+ assert isinstance(provider, PlayerProvider)
+ provider.on_player_config_removed(player_id)
+ if not player:
+ self.mass.signal_event(EventType.PLAYER_REMOVED, player_id)
+
# Syncgroup specific functions/helpers
+ @api_command("players/create_syncgroup")
+ async def create_syncgroup(self, name: str, members: list[str]) -> Player:
+ """Create a new Sync Group with name and members.
+
+ - name: Name for the new group to create.
+ - members: A list of player_id's that should be part of this group.
+
+ Returns the newly created player on success.
+ """
+ base_player = self.get(members[0], True)
+ # perform basic checks
+ if (player_prov := self.mass.get_provider(base_player.provider)) is None:
+ msg = f"Provider {base_player.provider} is not available!"
+ raise ProviderUnavailableError(msg)
+ if ProviderFeature.SYNC_PLAYERS not in player_prov.supported_features:
+ msg = f"Provider {player_prov.name} does not support creating groups"
+ raise UnsupportedFeaturedException(msg)
+ new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
+ # cleanup list, just in case the frontend sends some garbage
+ members = [
+ x
+ for x in members
+ if (x in base_player.can_sync_with or x == base_player.player_id)
+ and not x.startswith(SYNCGROUP_PREFIX)
+ ]
+ # create default config with the user chosen name
+ self.mass.config.create_default_player_config(
+ new_group_id,
+ player_prov.instance_id,
+ name=name,
+ enabled=True,
+ values={CONF_GROUP_MEMBERS: members},
+ )
+ return self.register_syncgroup(group_player_id=new_group_id, name=name, members=members)
+
+ def register_syncgroup(self, group_player_id: str, name: str, members: Iterable[str]) -> Player:
+ """Register a (virtual/fake) syncgroup player."""
+ # extract player features from first/random player
+ for member in members:
+ if first_player := self.mass.players.get(member):
+ break
+ else:
+ # edge case: no child player is (yet) available; postpone register
+ return None
+ player_prov = self.mass.get_provider(first_player.provider)
+ if TYPE_CHECKING:
+ assert player_prov
+ player = Player(
+ player_id=group_player_id,
+ provider=player_prov.instance_id,
+ type=PlayerType.SYNC_GROUP,
+ name=name,
+ available=True,
+ powered=False,
+ device_info=DeviceInfo(model="SyncGroup", manufacturer=player_prov.name),
+ supported_features=first_player.supported_features,
+ group_childs=set(members),
+ active_source=group_player_id,
+ )
+ self.mass.players.register_or_update(player)
+ return player
+
def get_sync_leader(self, group_player: Player) -> Player | None:
"""Get the active sync leader player for a syncgroup or synced player."""
if group_player.synced_to:
# should not happen but just in case...
return group_player.synced_to
+ # current sync leader: return the (first/only) player that has group childs
for child_player in self.iter_group_members(
- group_player, only_powered=True, only_playing=False
+ group_player, only_powered=False, only_playing=False
):
- if child_player.synced_to and child_player.synced_to in group_player.group_childs:
- return self.get(child_player.synced_to)
- elif child_player.synced_to:
- # player is already synced to a member outside this group ?!
- continue
- elif child_player.group_childs:
+ if child_player.group_childs:
return child_player
# select new sync leader: return the first playing player
for child_player in self.iter_group_members(
group_player, only_powered=True, only_playing=False
):
return child_player
+ # fallback select new sync leader: simply return the first player
+ for child_player in self.iter_group_members(
+ group_player, only_powered=False, only_playing=False
+ ):
+ return child_player
return None
async def sync_syncgroup(self, player_id: str) -> None:
"""Sync all (possible) players of a syncgroup."""
group_player = self.get(player_id, True)
- sync_leader = self.get_sync_leader(group_player)
+ if not (sync_leader := self.get_sync_leader(group_player)):
+ raise RuntimeError("No sync leader found for syncgroup")
for member in self.iter_group_members(group_player, only_powered=True):
if not member.can_sync_with:
continue
- if not sync_leader:
- # elect the first member as the sync leader if we do not have one
- sync_leader = member
- continue
if sync_leader.player_id == member.player_id:
continue
await self.cmd_sync(member.player_id, sync_leader.player_id)
for player_config in player_configs:
if not player_config.player_id.startswith(SYNCGROUP_PREFIX):
continue
- if not (player_prov := self.mass.get_provider(player_config.provider)):
- continue
members = self.mass.config.get_raw_player_config_value(
player_config.player_id, CONF_GROUP_MEMBERS
)
- player_prov.register_syncgroup(
+ self.register_syncgroup(
group_player_id=player_config.player_id,
name=player_config.name or player_config.default_name,
members=members,
if self.input_format.content_type == ContentType.UNKNOWN:
content_type_raw = line.split(": Audio: ")[1].split(" ")[0]
content_type = ContentType.try_parse(content_type_raw)
- self.logger.info(
+ self.logger.debug(
"Detected (input) content type: %s (%s)", content_type, content_type_raw
)
self.input_format.content_type = content_type
from __future__ import annotations
from abc import abstractmethod
-from collections.abc import Iterable
-
-import shortuuid
from music_assistant.common.models.config_entries import (
CONF_ENTRY_ANNOUNCE_VOLUME,
ConfigValueOption,
PlayerConfig,
)
-from music_assistant.common.models.enums import (
- ConfigEntryType,
- PlayerFeature,
- PlayerState,
- PlayerType,
- ProviderFeature,
-)
-from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
-from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_GROUP_PLAYERS, SYNCGROUP_PREFIX
+from music_assistant.common.models.enums import ConfigEntryType, PlayerState, ProviderFeature
+from music_assistant.common.models.player import Player, PlayerMedia
+from music_assistant.constants import CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
from .provider import Provider
options=tuple(
ConfigValueOption(x.display_name, x.player_id)
for x in self.mass.players.all(True, False)
- if x.player_id != player_id and x.provider == self.instance_id
+ if x.player_id != player_id
+ and x.provider == self.instance_id
+ and not x.player_id.startswith(SYNCGROUP_PREFIX)
),
description="Select all players you want to be part of this group",
multi_value=True,
def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
- if f"values/{CONF_GROUP_MEMBERS}" in changed_keys:
- player = self.mass.players.get(config.player_id)
- player.group_childs = config.get_value(CONF_GROUP_MEMBERS)
- self.mass.players.update(config.player_id)
def on_player_config_removed(self, player_id: str) -> None:
"""Call (by config manager) when the configuration of a player is removed."""
- # ensure that any group players get removed
- group_players = self.mass.config.get_raw_provider_config_value(
- self.instance_id, CONF_GROUP_PLAYERS, {}
- )
- if player_id in group_players:
- del group_players[player_id]
- self.mass.config.set_raw_provider_config_value(
- self.instance_id, CONF_GROUP_PLAYERS, group_players
- )
@abstractmethod
async def cmd_stop(self, player_id: str) -> None:
# default implementation, simply call the cmd_sync for all player_ids
await self.cmd_unsync(player_id)
- async def create_group(self, name: str, members: list[str]) -> Player:
- """Create new PlayerGroup on this provider.
-
- Create a new SyncGroup (or PlayerGroup) with given name and members.
-
- - name: Name for the new group to create.
- - members: A list of player_id's that should be part of this group.
- """
- # should only be called for providers with PLAYER_GROUP_CREATE feature set.
- if ProviderFeature.PLAYER_GROUP_CREATE not in self.supported_features:
- raise NotImplementedError
- # default implementation: create syncgroup
- new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
- # cleanup list, filter groups (should be handled by frontend, but just in case)
- members = [
- x.player_id
- for x in self.players
- if x.player_id in members
- if not x.player_id.startswith(SYNCGROUP_PREFIX)
- if x.provider == self.instance_id and PlayerFeature.SYNC in x.supported_features
- ]
- # create default config with the user chosen name
- self.mass.config.create_default_player_config(
- new_group_id,
- self.instance_id,
- name=name,
- enabled=True,
- values={CONF_GROUP_MEMBERS: members},
- )
- return self.register_syncgroup(group_player_id=new_group_id, name=name, members=members)
-
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
"""
- Call when a power command was executed on one of the child players of a Sync group.
+ Call when a power command was executed on one of the child players of a Sync/Player group.
This is used to handle special actions such as (re)syncing.
"""
self.cmd_sync(child_player_id, sync_leader.player_id),
)
- def register_syncgroup(self, group_player_id: str, name: str, members: Iterable[str]) -> Player:
- """Register a (virtual/fake) syncgroup player."""
- # extract player features from first/random player
- for member in members:
- if first_player := self.mass.players.get(member):
- break
- else:
- # edge case: no child player is (yet) available; postpone register
- return None
- player = Player(
- player_id=group_player_id,
- provider=self.instance_id,
- type=PlayerType.SYNC_GROUP,
- name=name,
- available=True,
- powered=False,
- device_info=DeviceInfo(model="SyncGroup", manufacturer=self.name),
- supported_features=first_player.supported_features,
- group_childs=set(members),
- active_source=group_player_id,
- )
- self.mass.players.register_or_update(player)
- return player
-
# DO NOT OVERRIDE BELOW
@property
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
+ return (ProviderFeature.SYNC_PLAYERS,)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
+ return (ProviderFeature.SYNC_PLAYERS,)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS, ProviderFeature.PLAYER_GROUP_CREATE)
+ return (ProviderFeature.SYNC_PLAYERS,)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self.mass.call_later(5, self.cmd_sync_many(player_id, group_childs))
return
+ if media.queue_id.startswith("ugp_"):
+ # TODO - this needs some more work
+ raise NotImplementedError("Sonos does not support UGP queues yet.")
+
if media.queue_id:
# create a sonos cloud queue and load it
await sonos_player.client.player.group.create_playback_session()
from __future__ import annotations
from time import time
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Final, cast
import shortuuid
from aiohttp import web
ConfigEntry,
ConfigValueOption,
ConfigValueType,
+ PlayerConfig,
create_sample_rates_config_entry,
)
from music_assistant.common.models.enums import (
UGP_FORMAT = AudioFormat(
content_type=ContentType.from_bit_depth(24), sample_rate=48000, bit_depth=24
)
+UGP_PREFIX = "ugp_"
+CONF_ACTION_CREATE_PLAYER = "create_player"
+CONF_ACTION_CREATE_PLAYER_SAVE = "create_player_save"
CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(48000, 24, 48000, 24, True)
+CONF_GROUP_PLAYERS: Final[str] = "group_players"
+CONF_NEW_GROUP_NAME: Final[str] = "name"
+CONF_NEW_GROUP_MEMBERS: Final[list[str]] = "members"
+
+CONFIG_ENTRY_UGP_NOTE = ConfigEntry(
+ key="ugp_note",
+ type=ConfigEntryType.LABEL,
+ label="Please note that although the universal group "
+ "allows you to group any player, it will not enable audio sync "
+ "between players of different ecosystems. It is advised to always use native "
+ "player groups or sync groups when available for your player type(s) and use "
+ "the Universal Group only to group players of different ecosystems.",
+ required=False,
+)
async def setup(
async def get_config_entries(
- mass: MusicAssistant, # noqa: ARG001
- instance_id: str | None = None, # noqa: ARG001
- action: str | None = None, # noqa: ARG001
- values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
"""
Return Config entries to setup this provider.
action: [optional] action key called from config entries UI.
values: the (intermediate) raw values for config entries sent with the action.
"""
- return ()
+ if not (ugp_provider := mass.get_provider(instance_id)):
+ # UGP provider is not (yet) loaded
+ return ()
+ if TYPE_CHECKING:
+ ugp_provider = cast(UniversalGroupProvider, ugp_provider)
+ if action == CONF_ACTION_CREATE_PLAYER:
+ # create new group player
+ name = values.pop(CONF_NEW_GROUP_NAME)
+ members: list[str] = values.pop(CONF_GROUP_MEMBERS)
+ members = ugp_provider._filter_members(members)
+ await ugp_provider.create_group(name, members)
+ return (
+ ConfigEntry(
+ key="ugp_note",
+ type=ConfigEntryType.LABEL,
+ label=f"Your new Universal Group Player {name} has been created and "
+ "is available in the players list.",
+ required=False,
+ ),
+ )
+ return (
+ ConfigEntry(
+ key="ugp_new",
+ type=ConfigEntryType.LABEL,
+ label="Fill in the details below to create a new Universal Group "
+ "Player and click the 'Create new universal group' button.",
+ required=False,
+ ),
+ ConfigEntry(
+ key=CONF_NEW_GROUP_NAME,
+ type=ConfigEntryType.STRING,
+ label="Name",
+ required=False,
+ ),
+ ConfigEntry(
+ key=CONF_GROUP_MEMBERS,
+ type=ConfigEntryType.STRING,
+ label=CONF_NEW_GROUP_MEMBERS,
+ default_value=[],
+ options=tuple(
+ ConfigValueOption(x.display_name, x.player_id)
+ for x in mass.players.all(True, False)
+ ),
+ multi_value=True,
+ required=False,
+ ),
+ ConfigEntry(
+ key=CONF_ACTION_CREATE_PLAYER,
+ type=ConfigEntryType.ACTION,
+ label="Create new Universal Player Group",
+ required=False,
+ ),
+ CONFIG_ENTRY_UGP_NOTE,
+ )
class UniversalGroupProvider(PlayerProvider):
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.PLAYER_GROUP_CREATE,)
+ return ()
def __init__(
self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
options=tuple(
ConfigValueOption(x.display_name, x.player_id)
for x in self.mass.players.all(True, False)
- if x.player_id != player_id
+ if x.player_id != player_id and not x.player_id.startswith(UGP_PREFIX)
),
description="Select all players you want to be part of this universal group",
multi_value=True,
required=True,
),
- ConfigEntry(
- key="ugp_note",
- type=ConfigEntryType.ALERT,
- label="Please note that although the universal group "
- "allows you to group any player, it will not enable audio sync "
- "between players of different ecosystems.",
- required=False,
- ),
+ CONFIG_ENTRY_UGP_NOTE,
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_CROSSFADE_DURATION,
CONF_ENTRY_SAMPLE_RATES_UGP,
)
+ def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ """Call (by config manager) when the configuration of a player changes."""
+ if f"values/{CONF_GROUP_MEMBERS}" in changed_keys:
+ player = self.mass.players.get(config.player_id)
+ members = config.get_value(CONF_GROUP_MEMBERS)
+ # ensure we filter invalid members
+ members = self._filter_members(members)
+ player.group_childs = members
+ self.mass.config.set_raw_player_config_value(
+ config.player_id, CONF_GROUP_PLAYERS, members
+ )
+ self.mass.players.update(config.player_id)
+
+ def on_player_config_removed(self, player_id: str) -> None:
+ """Call (by config manager) when the configuration of a player is removed."""
+ # ensure that any group players get removed
+ group_players = self.mass.config.get_raw_provider_config_value(
+ self.instance_id, CONF_GROUP_PLAYERS, {}
+ )
+ if player_id in group_players:
+ del group_players[player_id]
+ self.mass.config.set_raw_provider_config_value(
+ self.instance_id, CONF_GROUP_PLAYERS, group_players
+ )
+
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
group_player = self.mass.players.get(player_id)
"""Send PLAY command to given player."""
async def cmd_power(self, player_id: str, powered: bool) -> None:
- """Send POWER command to given player."""
- await self.mass.players.cmd_group_power(player_id, powered)
+ """Send POWER command to given UGP group player."""
+ group_player = self.mass.players.get(player_id, True)
+
+ if group_player.powered == powered:
+ return # nothing to do
+
+ # make sure to update the group power state
+ group_player.powered = powered
+
+ any_member_powered = False
+ async with TaskManager(self.mass) as tg:
+ for member in self.mass.players.iter_group_members(group_player, only_powered=True):
+ any_member_powered = True
+ if powered:
+ if member.state in (PlayerState.PLAYING, PlayerState.PAUSED):
+ # stop playing existing content on member if we start the group player
+ tg.create_task(self.cmd_stop(member.player_id))
+ # set active source to group player if the group (is going to be) powered
+ member.active_group = group_player.active_group
+ member.active_source = group_player.active_source
+ self.mass.players.update(member.player_id, skip_forward=True)
+ else:
+ # turn off child player when group turns off
+ tg.create_task(self.cmd_power(member.player_id, False))
+ # reset active source on player
+ member.active_source = None
+ member.active_group = None
+ self.mass.players.update(member.player_id, skip_forward=True)
+ # edge case: group turned on but no members are powered, power them all!
+ # TODO: Do we want to make this configurable ?
+ if not any_member_powered and powered:
+ for member in self.mass.players.iter_group_members(
+ group_player, only_powered=False
+ ):
+ tg.create_task(self.cmd_power(member.player_id, True))
+ member.active_group = group_player.player_id
+ member.active_source = group_player.active_source
+
+ self.mass.players.update(player_id)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
# forward to downstream play_media commands
async with TaskManager(self.mass) as tg:
for member in self.mass.players.iter_group_members(group_player, only_powered=True):
- if member.player_id.startswith(SYNCGROUP_PREFIX):
- member = self.mass.players.get_sync_leader(member) # noqa: PLW2901
- if member is None:
- continue
tg.create_task(
self.mass.players.play_media(
member.player_id,
- name: Name for the new group to create.
- members: A list of player_id's that should be part of this group.
"""
- new_group_id = f"{self.domain}_{shortuuid.random(8).lower()}"
+ new_group_id = f"{UGP_PREFIX}{shortuuid.random(8).lower()}"
# cleanup list, filter groups (should be handled by frontend, but just in case)
- members = [
- x.player_id
- for x in self.mass.players
- if x.player_id in members
- if x.provider != self.instance_id
- ]
+ members = self._filter_members(members)
# create default config with the user chosen name
self.mass.config.create_default_player_config(
new_group_id,
player = Player(
player_id=group_player_id,
provider=self.instance_id,
- type=PlayerType.SYNC_GROUP,
+ type=PlayerType.GROUP,
name=name,
available=True,
powered=False,
)
headers = {
**DEFAULT_STREAM_HEADERS,
- "Content-Type": "faudio/{fmt}",
+ "Content-Type": f"audio/{fmt}",
"Accept-Ranges": "none",
"Cache-Control": "no-cache",
"Connection": "close",
resp.content_length = get_chunksize(output_format, 24 * 3600)
elif http_profile == "chunked":
resp.enable_chunked_encoding()
+
await resp.prepare(request)
# return early if this is not a GET request
break
return resp
+
+ def _filter_members(self, members: list[str]) -> list[str]:
+ """Filter out members that are not valid players."""
+ # cleanup members - filter out impossible choices
+ syncgroup_childs: list[str] = []
+ for member in members:
+ if not member.startswith(SYNCGROUP_PREFIX):
+ continue
+ if syncgroup := self.mass.players.get(member):
+ syncgroup_childs.extend(syncgroup.group_childs)
+ # we filter out other UGP players and syncgroup childs
+ # if their parent is already in the list
+ return [
+ x
+ for x in members
+ if self.mass.players.get(x)
+ and x not in syncgroup_childs
+ and not x.startswith(UGP_PREFIX)
+ ]