"players/cmd/group_volume", player_id=player_id, volume_level=volume_level
)
- async def set_player_group_power(self, player_id: str, power: bool) -> None:
- """Handle power command for a (Sync)Group."""
- await self.client.send_command("players/cmd/group_volume", player_id=player_id, power=power)
-
async def set_player_group_members(self, player_id: str, members: list[str]) -> None:
"""
Update the memberlist of the given PlayerGroup.
player: A regular player.
stereo_pair: Same as player but a dedicated stereo pair of 2 speakers.
- group: A (dedicated) group player or (universal) playergroup.
- sync_group: A group/preset of players that can be synced together.
+ group: A (dedicated) (sync)group player or (universal) playergroup.
"""
PLAYER = "player"
STEREO_PAIR = "stereo_pair"
GROUP = "group"
- SYNC_GROUP = "sync_group"
UNKNOWN = "unknown"
@classmethod
active_source: str | None = None
# active_source: return player_id of the active group for this player (if any)
- # if the player is grouped and a group is active, this will be set to the group's player_id
+ # if the player is grouped and a group is active,
+ # this should be set to the group's player_id by the group player implementation.
active_group: str | None = None
# current_media: return current active/loaded item on the player
# includes metadata if supported by the provider/player
current_media: PlayerMedia | None = None
- # can_sync_with: return tuple of player_ids that can be synced to/with this player
- # usually this is just a list of all player_ids within the playerprovider
- can_sync_with: tuple[str, ...] = field(default=())
-
# synced_to: player_id of the player this player is currently synced to
# also referred to as "sync master"
synced_to: str | None = None
domain: str
name: str
instance_id: str
+ lookup_key: str
supported_features: list[ProviderFeature]
available: bool
icon: str | None = None
CONF_LANGUAGE: Final[str] = "language"
CONF_SAMPLE_RATES: Final[str] = "sample_rates"
CONF_HTTP_PROFILE: Final[str] = "http_profile"
-CONF_SYNC_LEADER: Final[str] = "sync_leader"
CONF_BYPASS_NORMALIZATION_RADIO: Final[str] = "bypass_normalization_radio"
-CONF_PREVENT_SYNC_LEADER_OFF: Final[str] = "prevent_sync_leader_off"
-CONF_SYNCGROUP_DEFAULT_ON: Final[str] = "syncgroup_default_on"
CONF_ENABLE_ICY_METADATA: Final[str] = "enable_icy_metadata"
CONF_VOLUME_NORMALIZATION_RADIO: Final[str] = "volume_normalization_radio"
CONF_VOLUME_NORMALIZATION_TRACKS: Final[str] = "volume_normalization_tracks"
"music",
"player_queues",
)
-SYNCGROUP_PREFIX: Final[str] = "syncgroup_"
VERBOSE_LOG_LEVEL: Final[int] = 5
PROVIDERS_WITH_SHAREABLE_URLS = ("spotify", "qobuz")
ProviderConfig,
)
from music_assistant.common.models.enums import EventType, ProviderType
-from music_assistant.common.models.errors import InvalidDataError, ProviderUnavailableError
+from music_assistant.common.models.errors import InvalidDataError
from music_assistant.constants import (
CONF_CORE,
CONF_PLAYERS,
LOGGER = logging.getLogger(__name__)
DEFAULT_SAVE_DELAY = 5
+BASE_KEYS = ("enabled", "name", "available", "default_name", "provider", "type")
isfile = wrap(os.path.isfile)
remove = wrap(os.remove)
raise KeyError(msg)
if encrypted:
value = self.encrypt_string(value)
- # also update the cached value in the provider itself
- if not (prov := self.mass.get_provider(provider_instance, return_unavailable=True)):
- raise ProviderUnavailableError(provider_instance)
- prov.config.values[key].value = value
+ if key in BASE_KEYS:
+ self.set(f"{CONF_PROVIDERS}/{provider_instance}/{key}", value)
+ return
self.set(f"{CONF_PROVIDERS}/{provider_instance}/values/{key}", value)
+ # also update the cached value in the provider itself
+ if prov := self.mass.get_provider(provider_instance, return_unavailable=True):
+ prov.config.values[key].value = value
def set_raw_core_config_value(self, core_module: str, key: str, value: ConfigValueType) -> None:
"""
# only allow setting raw values if main entry exists
msg = f"Invalid player_id: {player_id}"
raise KeyError(msg)
- self.set(f"{CONF_PLAYERS}/{player_id}/values/{key}", value)
+ if key in BASE_KEYS:
+ self.set(f"{CONF_PLAYERS}/{player_id}/{key}", value)
+ else:
+ self.set(f"{CONF_PLAYERS}/{player_id}/values/{key}", value)
def save(self, immediate: bool = False) -> None:
"""Schedule save of data to disk."""
# we need to restart playback
self.mass.create_task(self.resume(queue_id))
else:
- self.mass.call_later(5, self._enqueue_next(queue, queue.current_index))
+ task_id = f"enqueue_next_{queue_id}"
+ self.mass.call_later(2, self._enqueue_next, queue, queue.current_index, task_id=task_id)
@api_command("player_queues/play_media")
async def play_media(
queue.stream_finished = None
queue.end_of_track_reached = None
# forward the actual command to the player controller
- await self.mass.players.cmd_stop(queue_id, skip_forward=True)
+ await self.mass.players.cmd_stop(queue_id, skip_redirect=True)
@api_command("player_queues/play")
async def play(self, queue_id: str) -> None:
and queue.state == PlayerState.PAUSED
):
# forward the actual command to the player controller
- await self.mass.players.cmd_play(queue_id, skip_forward=True)
+ await self.mass.players.cmd_play(queue_id, skip_redirect=True)
else:
await self.resume(queue_id)
# it has started buffering the given queue item
if not queue.flow_mode:
task_id = f"enqueue_next_{queue_id}"
- self.mass.call_later(2, self._enqueue_next, queue, item_id, task_id=task_id)
+ self.mass.call_later(5, self._enqueue_next, queue, item_id, task_id=task_id)
# Main queue manipulation methods
import asyncio
import functools
import time
-from collections.abc import Iterable
from contextlib import suppress
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast
-import shortuuid
-
from music_assistant.common.helpers.util import get_changed_values
from music_assistant.common.models.config_entries import (
CONF_ENTRY_ANNOUNCE_VOLUME,
PlayerFeature,
PlayerState,
PlayerType,
- ProviderFeature,
ProviderType,
)
from music_assistant.common.models.errors import (
AlreadyRegisteredError,
PlayerCommandFailed,
PlayerUnavailableError,
- ProviderUnavailableError,
UnsupportedFeaturedException,
)
from music_assistant.common.models.media_items import UniqueList
-from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.common.models.player import Player, PlayerMedia
from music_assistant.constants import (
CONF_AUTO_PLAY,
- CONF_GROUP_MEMBERS,
CONF_HIDE_PLAYER,
CONF_PLAYERS,
- CONF_PREVENT_SYNC_LEADER_OFF,
- CONF_SYNC_LEADER,
- CONF_SYNCGROUP_DEFAULT_ON,
CONF_TTS_PRE_ANNOUNCE,
- SYNCGROUP_PREFIX,
)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.tags import parse_tags
@api_command("players/cmd/stop")
@handle_player_command
- async def cmd_stop(self, player_id: str, skip_forward: bool = False) -> None:
+ async def cmd_stop(self, player_id: str, skip_redirect: bool = False) -> None:
"""Send STOP command to given player.
- player_id: player_id of the player to handle the command.
"""
- player_id = self._check_redirect(player_id)
- player = self.get(player_id, True)
+ player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect)
# Redirect to queue controller if active (as it also handles some other logic)
- # Note that skip_forward will be set by the queue controller
+ # Note that skip_redirect will be set by the queue controller
# to prevent an endless loop.
- if not skip_forward and player.active_source == player_id:
+ if not skip_redirect and player.active_source == player_id:
await self.mass.player_queues.stop(player_id)
return
- # handle syncgroup: redirect to syncgroup-leader if needed
- if player_id.startswith(SYNCGROUP_PREFIX):
- if sync_leader := self.get_sync_leader(player):
- await self.cmd_stop(sync_leader.player_id)
- return
if player_provider := self.get_player_provider(player_id):
await player_provider.cmd_stop(player_id)
@api_command("players/cmd/play")
@handle_player_command
- async def cmd_play(self, player_id: str, skip_forward: bool = False) -> None:
+ async def cmd_play(self, player_id: str, skip_redirect: bool = False) -> None:
"""Send PLAY (unpause) command to given player.
- player_id: player_id of the player to handle the command.
"""
- player_id = self._check_redirect(player_id)
- player = self.get(player_id, True)
+ player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect)
if player.announcement_in_progress:
self.logger.warning("Ignore queue command: An announcement is in progress")
return
# Redirect to queue controller if active (as it also handles some other logic)
- # Note that skip_forward will be set by the queue controller
+ # Note that skip_redirect will be set by the queue controller
# to prevent an endless loop.
- if not skip_forward and player.active_source == player_id:
+ if not skip_redirect and player.active_source == player_id:
await self.mass.player_queues.play(player_id)
return
- # handle syncgroup: redirect to syncgroup-leader if needed
- if player_id.startswith(SYNCGROUP_PREFIX):
- if sync_leader := self.get_sync_leader(player):
- await self.cmd_play(sync_leader.player_id)
- return
player_provider = self.get_player_provider(player_id)
async with self._player_throttlers[player_id]:
await player_provider.cmd_play(player_id)
@api_command("players/cmd/pause")
@handle_player_command
- async def cmd_pause(self, player_id: str) -> None:
+ async def cmd_pause(self, player_id: str, skip_redirect: bool = False) -> None:
"""Send PAUSE command to given player.
- player_id: player_id of the player to handle the command.
"""
- player_id = self._check_redirect(player_id)
- player = self.get(player_id, True)
+ player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect)
if player.announcement_in_progress:
self.logger.warning("Ignore command: An announcement is in progress")
return
# if player does not support pause, we need to send stop
await self.cmd_stop(player_id)
return
- # handle syncgroup: redirect to syncgroup-leader if needed
- if player_id.startswith(SYNCGROUP_PREFIX):
- if sync_leader := self.get_sync_leader(player):
- await self.cmd_pause(sync_leader.player_id)
- return
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_pause(player_id)
- player_id: player_id of the player to handle the command.
"""
- player = self.get(player_id, True)
+ player = self._get_player_with_redirect(player_id, skip_redirect=False)
if player.state == PlayerState.PLAYING:
await self.cmd_pause(player_id)
else:
@api_command("players/cmd/power")
@handle_player_command
- async def cmd_power(self, player_id: str, powered: bool) -> None:
+ async def cmd_power(self, player_id: str, powered: bool, skip_redirect: bool = False) -> None:
"""Send POWER command to given player.
- player_id: player_id of the player to handle the command.
- powered: bool if player should be powered on or off.
"""
- # forward to syncgroup if needed
- if player_id.startswith(SYNCGROUP_PREFIX):
- await self.cmd_group_power(player_id, powered)
- return
-
player = self.get(player_id, True)
if player.powered == powered:
return # nothing to do
- # grab info about any groups this player is active in
- # to handle actions on the group when a (sync)group child turns on/off
- if active_group_player_id := self._get_active_player_group(player):
- active_group_player = self.get(active_group_player_id)
- group_player_state = active_group_player.state
- if not powered and active_group_player.type == PlayerType.SYNC_GROUP:
- # handle 'prevent sync leader off' feature
- powered_members = list(self.iter_group_members(active_group_player, True))
- sync_leader = self.get_sync_leader(active_group_player)
- if (
- len(powered_members) > 1
- and (sync_leader == player)
- and self.mass.config.get_raw_player_config_value(
- active_group_player_id, CONF_PREVENT_SYNC_LEADER_OFF, False
- )
- ):
- raise PlayerCommandFailed(
- f"{player.display_name} is the sync "
- "leader of a syncgroup and cannot be turned off"
+ # redirect to active group player if player is group child
+ if not skip_redirect and player.active_group:
+ if group_player_provider := self.get_player_provider(player.active_group):
+ async with self._player_throttlers[player.active_group]:
+ await group_player_provider.on_group_child_power(
+ player.active_group, player_id, powered
)
- else:
- active_group_player = None
+ return
# always stop player at power off
if (
not powered
- and player.powered
- and player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
and not player.synced_to
+ and player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
):
await self.cmd_stop(player_id)
# unsync player at power off
- if not powered:
- if player.synced_to or player.group_childs:
- await self.cmd_unsync(player_id)
+ if not powered and (
+ player.synced_to or (player.type == PlayerType.PLAYER and player.group_childs)
+ ):
+ await self.cmd_unsync(player_id)
+ # elif not powered and player.type == PlayerType.PLAYER and player.group_childs:
+ # async with TaskManager(self.mass) as tg:
+ # for member in self.iter_group_members(player, True):
+ # tg.create_task(self.cmd_power(member.player_id, False))
+ # handle actual power command
if PlayerFeature.POWER in player.supported_features:
# player supports power command: forward to player provider
player_provider = self.get_player_provider(player_id)
# always optimistically set the power state to update the UI
# as fast as possible and prevent race conditions
player.powered = powered
- # reset active source
- player.active_source = None
+ # reset active source on power off
+ if not powered:
+ player.active_source = None
self.update(player_id)
- # handle 'auto play on power on' feature
+ # handle 'auto play on power on' feature
if (
- not active_group_player
+ not player.active_group
and powered
and self.mass.config.get_raw_player_config_value(player_id, CONF_AUTO_PLAY, False)
and player.active_source in (None, player_id)
):
await self.mass.player_queues.resume(player_id)
- # handle group player actions
- if not (active_group_player and active_group_player.powered):
- return
-
- # run actions suitable for every type of group player
- powered_childs = list(self.mass.players.iter_group_members(active_group_player, True))
- if not powered and player in powered_childs:
- powered_childs.remove(player.player_id)
- elif powered and player.player_id not in powered_childs:
- powered_childs.append(player.player_id)
- # if the last player of a group turned off, turn off the group
- if len(powered_childs) == 0:
- self.logger.debug(
- "Group %s has no more powered members, turning off group player",
- active_group_player.display_name,
- )
- self.mass.create_task(self.mass.players.cmd_power(active_group_player.player_id, False))
- return
- # forward to either syncgroup logic or group player logic
- if active_group_player.type == PlayerType.SYNC_GROUP:
- self._on_syncgroup_child_power(active_group_player, player, powered, group_player_state)
- elif active_group_player.type == PlayerType.GROUP:
- player_prov = self.mass.get_provider(active_group_player.provider)
- player_prov.on_group_child_power(
- active_group_player, player, powered, group_player_state
- )
-
@api_command("players/cmd/volume_set")
@handle_player_command
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""
# TODO: Implement PlayerControl
player = self.get(player_id, True)
- if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
+ if player.type == PlayerType.GROUP:
# redirect to group volume control
await self.cmd_group_volume(player_id, volume_level)
return
new_volume = volume_level
volume_dif = new_volume - cur_volume
coros = []
- for child_player in self.iter_group_members(group_player, True):
+ for child_player in self.iter_group_members(
+ group_player, only_powered=True, exclude_self=False
+ ):
if PlayerFeature.VOLUME_SET not in child_player.supported_features:
continue
cur_child_volume = child_player.volume_level
coros.append(self.cmd_volume_set(child_player.player_id, new_child_volume))
await asyncio.gather(*coros)
- @api_command("players/cmd/group_power")
- async def cmd_group_power(self, player_id: str, power: bool) -> None:
- """Handle power command for a (Sync/Player)Group."""
- group_player = self.get(player_id, True)
-
- if group_player.powered == power:
- return # nothing to do
-
- if group_player.type == PlayerType.GROUP:
- # this is a native group player, redirect
- await self.cmd_power(player_id, power)
- return
-
- if not (group_player.type == PlayerType.SYNC_GROUP or group_player.group_childs):
- # this is not a (temporary) sync group - nothing to do
- raise UnsupportedFeaturedException("Player is not a sync group")
-
- # make sure to update the group power state
- group_player.powered = power
-
- # always stop (group/master)player at power off
- if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
- await self.cmd_stop(player_id)
-
- default_on_pref = self.mass.config.get_raw_player_config_value(
- group_player.player_id, CONF_SYNCGROUP_DEFAULT_ON, "powered_only"
- )
-
- # handle syncgroup - this will also work for temporary syncgroups
- # where players are manually synced against a group leader
- any_member_powered = False
- async with TaskManager(self.mass) as tg:
- for member in self.iter_group_members(
- group_player, only_powered=(default_on_pref != "always_all")
- ):
- any_member_powered = True
- if power:
- if member.state in (PlayerState.PLAYING, PlayerState.PAUSED):
- # stop playing existing content on member if we start the group player
- tg.create_task(self.cmd_stop(member.player_id))
- # set active source to group player if the group (is going to be) powered
- member.active_group = group_player.active_group
- member.active_source = group_player.active_source
- self.update(member.player_id, skip_forward=True)
- else:
- # turn off child player when group turns off
- tg.create_task(self.cmd_power(member.player_id, False))
- # reset active source on player
- member.active_source = None
- member.active_group = None
- self.update(member.player_id, skip_forward=True)
- # handle default power ON
- if power:
- sync_leader = self.get_sync_leader(group_player)
- for member in self.iter_group_members(group_player, only_powered=False):
- if default_on_pref == "always_all" or (
- sync_leader
- and default_on_pref == "always_leader"
- and member.player_id == sync_leader.player_id
- ):
- tg.create_task(self.cmd_power(member.player_id, True))
- member.active_group = group_player.player_id
- member.active_source = group_player.active_source
- any_member_powered = True
- if not any_member_powered:
- return
-
- if power and group_player.player_id.startswith(SYNCGROUP_PREFIX):
- await self.sync_syncgroup(group_player.player_id)
- self.update(player_id)
-
@api_command("players/cmd/volume_mute")
@handle_player_command
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
- player_id: player_id of the player to handle the command.
- position: position in seconds to seek to in the current playing item.
"""
- player_id = self._check_redirect(player_id)
-
- player = self.get(player_id, True)
+ player = self._get_player_with_redirect(player_id)
if PlayerFeature.SEEK not in player.supported_features:
msg = f"Player {player.display_name} does not support seeking"
raise UnsupportedFeaturedException(msg)
player.active_group, url, use_pre_announce, volume_level
)
return
- if player.type in (PlayerType.SYNC_GROUP, PlayerType.GROUP) and not player.powered:
+ if player.type == PlayerType.GROUP and not player.powered:
# announcement request sent to inactive group, check if any child's are playing
if len(list(self.iter_group_members(player, True, True))) > 0:
# just for the sake of simplicity we handle this request per-player
finally:
player.announcement_in_progress = False
- async def play_media(self, player_id: str, media: PlayerMedia) -> None:
+ async def play_media(
+ self, player_id: str, media: PlayerMedia, skip_redirect: bool = False
+ ) -> None:
"""Handle PLAY MEDIA on given player.
- player_id: player_id of the player to handle the command.
- media: The Media that needs to be played on the player.
"""
- # handle syncgroup: redirect to syncgroup-leader if needed
- if player_id.startswith(SYNCGROUP_PREFIX):
- await self.cmd_group_power(player_id, True)
- group_player = self.get(player_id, True)
- if sync_leader := self.get_sync_leader(group_player):
- await self.play_media(sync_leader.player_id, media=media)
- group_player.state = PlayerState.PLAYING
- return
+ player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect)
# power on the player if needed
- player = self.get(player_id, True)
if not player.powered:
await self.cmd_power(player_id, True)
player_prov = self.mass.players.get_player_provider(player_id)
async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle enqueuing of a next media item on the player."""
- if player_id.startswith(SYNCGROUP_PREFIX):
- # redirect to syncgroup-leader if needed
- group_player = self.get(player_id, True)
- if sync_leader := self.get_sync_leader(group_player):
- await self.enqueue_next_media(
- sync_leader.player_id,
- media=media,
- )
- return
player_prov = self.mass.players.get_player_provider(player_id)
async with self._player_throttlers[player_id]:
await player_prov.enqueue_next_media(player_id=player_id, media=media)
- player_id: player_id of the player to handle the command.
"""
- if (player := self.get(player_id)) and player.group_childs:
- # this player is a syncgroup leader, unsync all children
- await self.cmd_unsync_many(player.group_childs)
+ if not (player := self.get(player_id)):
+ self.logger.warning("Player %s is not available", player_id)
+ return
+ if PlayerFeature.SYNC not in player.supported_features:
+ self.logger.warning("Player %s does not support (un)sync commands", player.name)
return
- await self.cmd_unsync_many([player_id])
+ if not (player.synced_to or player.group_childs):
+ return # nothing to do
+
+ # reset active source player if it is unsynced
+ player.active_source = None
+ # forward command to the player provider
+ if player_provider := self.get_player_provider(player_id):
+ await player_provider.cmd_unsync(player_id)
@api_command("players/cmd/sync_many")
async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
"Player %s is already synced, unsyncing first", child_player.name
)
await self.cmd_unsync(child_player.player_id)
-
- if child_player_id not in parent_player.can_sync_with:
- self.logger.warning(
- "Player %s can not be synced with %s",
- child_player.display_name,
- parent_player.display_name,
- )
- continue
+ # power on the player if needed
+ if not child_player.powered:
+ await self.cmd_power(child_player.player_id, True)
# if we reach here, all checks passed
final_player_ids.append(child_player_id)
# set active source if player is synced
@api_command("players/cmd/unsync_many")
async def cmd_unsync_many(self, player_ids: list[str]) -> None:
"""Handle UNSYNC command for all the given players."""
- # filter all player ids on compatibility and availability
for player_id in list(player_ids):
- if not (child_player := self.get(player_id)):
- self.logger.warning("Player %s is not available", player_id)
- continue
- if PlayerFeature.SYNC not in child_player.supported_features:
- self.logger.warning(
- "Player %s does not support (un)sync commands", child_player.name
- )
- continue
- if not child_player.synced_to:
- continue
- # reset active source player if it is unsynced
- child_player.active_source = None
- # forward command to the player provider
- if player_provider := self.get_player_provider(player_id):
- await player_provider.cmd_unsync(player_id)
+ await self.cmd_unsync(player_id)
def set(self, player: Player) -> None:
"""Set/Update player details on the controller."""
if not player.enabled:
return
- # initialize sync groups as soon as a player is registered
- self.mass.loop.create_task(self._register_syncgroups())
-
self.logger.info(
"Player registered: %s/%s",
player_id,
self.mass.signal_event(EventType.PLAYER_REMOVED, player_id)
def update(
- self, player_id: str, skip_forward: bool = False, force_update: bool = False
+ self, player_id: str, skip_redirect: bool = False, force_update: bool = False
) -> None:
"""Update player state."""
if self.mass.closing:
if player_id not in self._players:
return
player = self._players[player_id]
- # calculate active group and active source
- player.active_group = self._get_active_player_group(player)
- if player.active_source is None:
- player.active_source = self._get_active_source(player)
+ player.active_source = self._get_active_source(player)
player.volume_level = player.volume_level or 0 # guard for None volume
# correct group_members if needed
if player.group_childs == {player.player_id}:
player.group_childs = set()
# calculate group volume
player.group_volume = self._get_group_volume_level(player)
- if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
+ if player.type == PlayerType.GROUP:
player.volume_level = player.group_volume
# prefer any overridden name from config
player.display_name = (
player.player_id,
CONF_ENTRY_PLAYER_ICON.key,
CONF_ENTRY_PLAYER_ICON_GROUP.default_value
- if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP)
+ if player.type == PlayerType.GROUP
else CONF_ENTRY_PLAYER_ICON.default_value,
)
- # handle syncgroup - get attributes from sync leader
- if player.player_id.startswith(SYNCGROUP_PREFIX):
- sync_leader = self.get_sync_leader(player)
- if sync_leader and sync_leader.active_source == player.active_source:
- player.state = sync_leader.state
- player.active_source = sync_leader.active_source
- player.current_media = sync_leader.current_media
- player.elapsed_time = sync_leader.elapsed_time
- player.elapsed_time_last_updated = sync_leader.elapsed_time_last_updated
- else:
- player.state = PlayerState.IDLE
- player.active_source = player.player_id
# basic throttle: do not send state changed events if player did not actually change
prev_state = self._prev_states.get(player_id, {})
changed_values = get_changed_values(
prev_state,
new_state,
- ignore_keys=["elapsed_time", "elapsed_time_last_updated", "seq_no", "last_poll"],
+ ignore_keys=[
+ "elapsed_time",
+ "elapsed_time_last_updated",
+ "seq_no",
+ "last_poll",
+ ],
)
self._prev_states[player_id] = new_state
self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player)
- if skip_forward:
+ if skip_redirect:
return
+
# update/signal group player(s) child's when group updates
- if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
- for child_player in self.iter_group_members(player):
- if child_player.player_id == player.player_id:
- continue
- self.update(child_player.player_id, skip_forward=True)
+ if player.type == PlayerType.GROUP:
+ for child_player in self.iter_group_members(player, exclude_self=True):
+ self.update(child_player.player_id, skip_redirect=True)
# update/signal group player(s) when child updates
for group_player in self._get_player_groups(player, powered_only=False):
- player_prov = self.get_player_provider(group_player.player_id)
- if not player_prov:
- continue
- if group_player.player_id.startswith(SYNCGROUP_PREFIX):
- self.update(group_player.player_id, skip_forward=True)
- else:
+ if player_prov := self.mass.get_provider(group_player.provider):
self.mass.create_task(player_prov.poll_player(group_player.player_id))
def get_player_provider(self, player_id: str) -> PlayerProvider:
# ensure the result is an integer
return None if volume_level is None else int(volume_level)
- def _check_redirect(self, player_id: str) -> str:
- """Check if playback related command should be redirected."""
+ def _get_player_with_redirect(self, player_id: str, skip_redirect: bool = False) -> Player:
+ """Get player with check if playback related command should be redirected."""
player = self.get(player_id, True)
- if player.synced_to:
- sync_leader = self.get(player.synced_to, True)
- self.logger.warning(
+ if skip_redirect:
+ return player
+ if player.synced_to and (sync_leader := self.get(player.synced_to)):
+ self.logger.info(
"Player %s is synced to %s and can not accept "
"playback related commands itself, "
"redirected the command to the sync leader.",
player.name,
sync_leader.name,
)
- return player.synced_to
- return player_id
+ return sync_leader
+ if player.active_group and (active_group := self.get(player.active_group)):
+ self.logger.info(
+ "Player %s is part of a playergroup and can not accept "
+ "playback related commands itself, "
+ "redirected the command to the group leader.",
+ player.name,
+ )
+ return active_group
+ if (
+ player.active_source
+ and player.active_source != player.player_id
+ and (active_source := self.get(player.active_source))
+ ):
+ self.logger.info(
+ "Player %s has a different source active (%s), "
+ "redirected the command to the source player.",
+ player.name,
+ active_source.display_name,
+ )
+ return active_source
+ return player
def _get_player_groups(
self, player: Player, available_only: bool = True, powered_only: bool = False
for _player in self:
if _player.player_id == player.player_id:
continue
- if _player.type not in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
+ if _player.type != PlayerType.GROUP:
continue
if available_only and not _player.available:
continue
if powered_only and not _player.powered:
continue
- if (
- player.player_id in _player.group_childs
- or player.active_source == _player.player_id
- ):
+ if player.player_id in _player.group_childs:
yield _player
- def _get_active_player_group(self, player: Player) -> str | None:
- """Return the currently active groupplayer for the given player (if any)."""
- # prefer active source group
- for group_player in self._get_player_groups(player, available_only=True, powered_only=True):
- if player.active_source in (group_player.player_id, group_player.active_source):
- return group_player.player_id
- # fallback to just the first powered group
- for group_player in self._get_player_groups(player, available_only=True, powered_only=True):
- return group_player.player_id
- return None
-
def _get_active_source(self, player: Player) -> str:
"""Return the active_source id for given player."""
# if player is synced, return group leader's active source
if player.synced_to and (parent_player := self.get(player.synced_to)):
return parent_player.active_source
- # fallback to the first active group player
- if player.active_group:
- group_player = self.get(player.active_group)
+ # if player has group active, return those details
+ if player.active_group and (group_player := self.get(player.active_group)):
return self._get_active_source(group_player)
- # defaults to the player's own player id if not active source set
+ # defaults to the player's own player id if no active source set
return player.active_source or player.player_id
def _get_group_volume_level(self, player: Player) -> int:
# calculate group volume from all (turned on) players
group_volume = 0
active_players = 0
- for child_player in self.iter_group_members(player, True):
+ for child_player in self.iter_group_members(player, only_powered=True, exclude_self=False):
if PlayerFeature.VOLUME_SET not in child_player.supported_features:
continue
group_volume += child_player.volume_level or 0
group_player: Player,
only_powered: bool = False,
only_playing: bool = False,
+ active_only: bool = False,
+ exclude_self: bool = True,
) -> Iterator[Player]:
- """Get (child) players attached to a grouped player."""
+ """Get (child) players attached to a group player or syncgroup."""
for child_id in list(group_player.group_childs):
if child_player := self.get(child_id, False):
if not child_player.available:
continue
if not (not only_powered or child_player.powered):
continue
+ if not (not active_only or child_player.active_group == group_player.player_id):
+ continue
+ if exclude_self and child_player.player_id == group_player.player_id:
+ continue
if not (
not only_playing
or child_player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
- player = self.mass.players.get(config.player_id)
+ if not (player := self.mass.players.get(config.player_id)):
+ return
if config.enabled:
player_prov = self.mass.players.get_player_provider(config.player_id)
self.mass.create_task(player_prov.poll_player(config.player_id))
player.enabled = config.enabled
+ # signal player provider that the config changed
+ with suppress(PlayerUnavailableError):
+ if provider := self.mass.get_provider(config.provider):
+ provider.on_player_config_changed(config, changed_keys)
self.mass.players.update(config.player_id, force_update=True)
- if config.player_id.startswith(SYNCGROUP_PREFIX):
- # handle syncgroup
- if f"values/{CONF_GROUP_MEMBERS}" in changed_keys:
- player = self.mass.players.get(config.player_id)
- player.group_childs = config.get_value(CONF_GROUP_MEMBERS)
- self.mass.players.update(config.player_id)
- else:
- # signal player provider that the config changed
- with suppress(PlayerUnavailableError):
- if provider := self.mass.get_provider(config.provider):
- provider.on_player_config_changed(config, changed_keys)
# if the player was playing, restart playback
if player and player.state == PlayerState.PLAYING:
self.mass.create_task(self.mass.player_queues.resume(player.active_source))
def on_player_config_removed(self, player_id: str) -> None:
"""Call (by config manager) when the configuration of a player is removed."""
if (player := self.mass.players.get(player_id)) and player.available:
- player.enabled = False
self.mass.players.update(player_id, force_update=True)
if player and (provider := self.mass.get_provider(player.provider)):
- assert isinstance(provider, PlayerProvider)
+ provider = cast(PlayerProvider, provider)
provider.on_player_config_removed(player_id)
- if not player:
+ if not self.mass.players.get(player_id):
self.mass.signal_event(EventType.PLAYER_REMOVED, player_id)
- # Syncgroup specific functions/helpers
-
- @api_command("players/create_syncgroup")
- async def create_syncgroup(self, name: str, members: list[str]) -> Player:
- """Create a new Sync Group with name and members.
-
- - name: Name for the new group to create.
- - members: A list of player_id's that should be part of this group.
-
- Returns the newly created player on success.
- """
- base_player = self.get(members[0], True)
- # perform basic checks
- if (player_prov := self.mass.get_provider(base_player.provider)) is None:
- msg = f"Provider {base_player.provider} is not available!"
- raise ProviderUnavailableError(msg)
- if ProviderFeature.SYNC_PLAYERS not in player_prov.supported_features:
- msg = f"Provider {player_prov.name} does not support creating groups"
- raise UnsupportedFeaturedException(msg)
- new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
- # cleanup list, just in case the frontend sends some garbage
- members = [
- x
- for x in members
- if (x in base_player.can_sync_with or x == base_player.player_id)
- and not x.startswith(SYNCGROUP_PREFIX)
- ]
- # create default config with the user chosen name
- self.mass.config.create_default_player_config(
- new_group_id,
- player_prov.instance_id,
- name=name,
- enabled=True,
- values={CONF_GROUP_MEMBERS: members},
- )
- return self.register_syncgroup(group_player_id=new_group_id, name=name, members=members)
-
- def register_syncgroup(self, group_player_id: str, name: str, members: Iterable[str]) -> Player:
- """Register a (virtual/fake) syncgroup player."""
- # extract player features from first/random player
- for member in members:
- if first_player := self.mass.players.get(member):
- break
- else:
- # edge case: no child player is (yet) available; postpone register
- return None
- player_prov = self.mass.get_provider(first_player.provider)
- if TYPE_CHECKING:
- assert player_prov
- player = Player(
- player_id=group_player_id,
- provider=player_prov.instance_id,
- type=PlayerType.SYNC_GROUP,
- name=name,
- available=True,
- powered=False,
- device_info=DeviceInfo(model="SyncGroup", manufacturer=player_prov.name),
- supported_features=first_player.supported_features,
- group_childs=set(members),
- active_source=group_player_id,
- )
- self.mass.players.register_or_update(player)
- return player
-
- def get_sync_leader(self, group_player: Player) -> Player | None:
- """Get the active sync leader player for a syncgroup or synced player."""
- if group_player.synced_to:
- # should not happen but just in case...
- return group_player.synced_to
- # current sync leader: return the (first/only) player that has group childs
- for child_player in self.iter_group_members(
- group_player, only_powered=False, only_playing=False
- ):
- if child_player.group_childs:
- return child_player
- pref_sync_leader = self.mass.config.get_raw_player_config_value(
- group_player.player_id, CONF_SYNC_LEADER, "auto"
- )
- if pref_sync_leader != "auto" and (player := self.get(pref_sync_leader)):
- return player
- # select new sync leader: return the first playing player
- for child_player in self.iter_group_members(
- group_player, only_powered=True, only_playing=True
- ):
- return child_player
- # fallback select new sync leader: return the first powered player
- for child_player in self.iter_group_members(
- group_player, only_powered=True, only_playing=False
- ):
- return child_player
- # fallback select new sync leader: simply return the first player
- for child_player in self.iter_group_members(
- group_player, only_powered=False, only_playing=False
- ):
- return child_player
- return None
-
- async def sync_syncgroup(self, player_id: str) -> None:
- """Sync all (possible) players of a syncgroup."""
- group_player = self.get(player_id, True)
- if not (sync_leader := self.get_sync_leader(group_player)):
- raise RuntimeError("No sync leader found for syncgroup")
- for member in self.iter_group_members(group_player, only_powered=True):
- if not member.can_sync_with:
- continue
- if sync_leader.player_id == member.player_id:
- continue
- await self.cmd_sync(member.player_id, sync_leader.player_id)
-
- def _on_syncgroup_child_power(
- self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState
- ) -> None:
- """
- Call when a power command was executed on one of the child players of a SyncGroup.
-
- This is used to handle special actions such as (re)syncing.
- The group state is sent with the state BEFORE the power command was executed.
- """
- group_playing = group_state == PlayerState.PLAYING
- sync_leader = self.mass.players.get_sync_leader(group_player)
- is_sync_leader = child_player.player_id == sync_leader.player_id
- if group_playing and not new_power and is_sync_leader:
- # the current sync leader player turned OFF while the group player
- # should still be playing - we need to select a new sync leader and resume
- self.logger.warning(
- "Syncleader %s turned off while syncgroup is playing, "
- "a forced resync for syngroup %s will be attempted...",
- child_player.display_name,
- group_player.display_name,
- )
-
- async def full_resync() -> None:
- await self.mass.players.sync_syncgroup(group_player.player_id)
- await self.mass.player_queues.resume(group_player.player_id)
-
- self.mass.call_later(2, full_resync, task_id=f"forced_resync_{group_player.player_id}")
- return
- elif new_power:
- # if a child player turned ON while the group is already active, we need to resync
- if sync_leader.player_id != child_player.player_id:
- self.mass.create_task(
- self.cmd_sync(child_player.player_id, sync_leader.player_id),
- )
-
- async def _register_syncgroups(self) -> None:
- """Register all (virtual/fake) syncgroup players."""
- player_configs = await self.mass.config.get_player_configs()
- for player_config in player_configs:
- if not player_config.player_id.startswith(SYNCGROUP_PREFIX):
- continue
- members = self.mass.config.get_raw_player_config_value(
- player_config.player_id, CONF_GROUP_MEMBERS
- )
- self.register_syncgroup(
- group_player_id=player_config.player_id,
- name=player_config.name or player_config.default_name,
- members=members,
- )
-
async def _play_announcement(
self,
player: Player,
CONF_ENTRY_ANNOUNCE_VOLUME_MAX,
CONF_ENTRY_ANNOUNCE_VOLUME_MIN,
CONF_ENTRY_ANNOUNCE_VOLUME_STRATEGY,
- CONF_ENTRY_PLAYER_ICON_GROUP,
ConfigEntry,
- ConfigValueOption,
PlayerConfig,
)
-from music_assistant.common.models.enums import ConfigEntryType, PlayerState
from music_assistant.common.models.player import Player, PlayerMedia
-from music_assistant.constants import (
- CONF_GROUP_MEMBERS,
- CONF_PREVENT_SYNC_LEADER_OFF,
- CONF_SYNC_LEADER,
- CONF_SYNCGROUP_DEFAULT_ON,
- SYNCGROUP_PREFIX,
-)
from .provider import Provider
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
- if player_id.startswith(SYNCGROUP_PREFIX):
- # default entries for syncgroups
- return (
- *BASE_PLAYER_CONFIG_ENTRIES,
- CONF_ENTRY_PLAYER_ICON_GROUP,
- ConfigEntry(
- key=CONF_GROUP_MEMBERS,
- type=ConfigEntryType.STRING,
- label="Group members",
- default_value=[],
- options=tuple(
- ConfigValueOption(x.display_name, x.player_id)
- for x in self.mass.players.all(True, False)
- if x.player_id != player_id
- and x.provider == self.instance_id
- and not x.player_id.startswith(SYNCGROUP_PREFIX)
- ),
- description="Select all players you want to be part of this group",
- multi_value=True,
- required=True,
- ),
- ConfigEntry(
- key=CONF_SYNC_LEADER,
- type=ConfigEntryType.STRING,
- label="Preferred sync leader",
- default_value="auto",
- options=(
- *tuple(
- ConfigValueOption(x.display_name, x.player_id)
- for x in self.mass.players.all(True, False)
- if x.player_id
- in self.mass.config.get_raw_player_config_value(
- player_id, CONF_GROUP_MEMBERS, []
- )
- ),
- ConfigValueOption("Select automatically", "auto"),
- ),
- description="By default Music Assistant will automatically assign a "
- "(random) player as sync leader, meaning the other players in the sync group "
- "will be synced to that player. If you want to force a specific player to be "
- "the sync leader, select it here.",
- required=True,
- ),
- ConfigEntry(
- key=CONF_PREVENT_SYNC_LEADER_OFF,
- type=ConfigEntryType.BOOLEAN,
- label="Prevent sync leader power off",
- default_value=False,
- description="With this setting enabled, Music Assistant will disallow powering "
- "off the sync leader player if other players are still "
- "active in the sync group. This is useful if you want to prevent "
- "a short drop in the music while the music is transferred to another player.",
- required=True,
- ),
- ConfigEntry(
- key=CONF_SYNCGROUP_DEFAULT_ON,
- type=ConfigEntryType.STRING,
- label="Default power ON behavior",
- default_value="powered_only",
- options=(
- ConfigValueOption("Always power ON all child devices", "always_all"),
- ConfigValueOption("Always power ON sync leader", "always_leader"),
- ConfigValueOption("Start with powered players", "powered_only"),
- ConfigValueOption("Ignore", "ignore"),
- ),
- description="What should happen if you power ON a sync group "
- "(or you start playback to it), while no (or not all) players "
- "are powered ON ?\n\nShould Music Assistant power ON all players, or only the "
- "sync leader, or should it ignore the command if no players are powered ON ?",
- required=False,
- ),
- )
-
return (
*BASE_PLAYER_CONFIG_ENTRIES,
# add default entries for announce feature
- player_id: player_id of the player to handle the command.
"""
- @abstractmethod
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY (unpause) command to given player.
- player_id: player_id of the player to handle the command.
"""
+ # will only be called for players with Pause feature set.
+ raise NotImplementedError
async def cmd_pause(self, player_id: str) -> None:
"""Send PAUSE command to given player.
# will only be called for players with Pause feature set.
raise NotImplementedError
+ @abstractmethod
async def play_media(
self,
player_id: str,
Join/add the given player(id) to the given (master) player/sync group.
- player_id: player_id of the player to handle the command.
- - target_player: player_id of the syncgroup master or group player.
+ - target_player: player_id of the sync leader.
"""
# will only be called for players with SYNC feature set.
raise NotImplementedError
# default implementation, simply call the cmd_sync for all child players
await self.cmd_sync(child_id, target_player)
+ async def on_group_child_power(
+ self, group_player_id: str, child_player_id: str, powered: bool
+ ) -> None:
+ """Call when a child player of a group player is powered on/off."""
+ # default implementation, simply redirect the request to the group player
+ self.logger.warning(
+ "Detected a player power command to a player that is part of a group. "
+ "Redirecting to group player..."
+ )
+ await self.mass.players.cmd_power(group_player_id, powered)
+
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
if 'needs_poll' is set to True in the player object.
"""
- def on_group_child_power(
- self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState
- ) -> None:
- """
- Call when a power command was executed on one of the child players of a PlayerGroup.
-
- This is used to handle special actions such as (re)syncing.
- The group state is sent with the state BEFORE the power command was executed.
- """
-
# DO NOT OVERRIDE BELOW
@property
"domain": self.domain,
"name": self.config.name or self.name,
"instance_id": self.instance_id,
+ "lookup_key": self.lookup_key,
"supported_features": [x.value for x in self.supported_features],
"available": self.available,
"is_streaming_provider": getattr(self, "is_streaming_provider", None),
from music_assistant.server import MusicAssistant
from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.process import AsyncProcess, check_output
-from music_assistant.server.helpers.util import TaskManager
+from music_assistant.server.helpers.util import TaskManager, lock
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
- from music_assistant.server.providers.ugp import UniversalGroupProvider
+ from music_assistant.server.providers.player_group import PlayerGroupProvider
DOMAIN = "airplay"
airplay_player = self._players[player_id]
await airplay_player.cmd_pause()
+ @lock
async def play_media(
self,
player_id: str,
)
elif media.queue_id.startswith("ugp_"):
# special case: UGP stream
- ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
- ugp_stream = ugp_provider.streams[media.queue_id]
+ ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
+ ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.output_format
audio_source = ugp_stream.subscribe()
elif media.queue_id and media.queue_item_id:
# store last state in cache
await self.mass.cache.set(player_id, volume_level, base_key=CACHE_KEY_PREV_VOLUME)
+ @lock
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
)
else:
# make sure that the player manager gets an update
- self.mass.players.update(child_player.player_id, skip_forward=True)
- self.mass.players.update(parent_player.player_id, skip_forward=True)
+ self.mass.players.update(child_player.player_id, skip_redirect=True)
+ self.mass.players.update(parent_player.player_id, skip_redirect=True)
+ @lock
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player.
- player_id: player_id of the player to handle the command.
"""
player = self.mass.players.get(player_id, raise_unavailable=True)
- if not player.synced_to:
- return
- group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
- group_leader.group_childs.remove(player_id)
- player.synced_to = None
- await self.cmd_stop(player_id)
- # make sure that the player manager gets an update
- self.mass.players.update(player.player_id, skip_forward=True)
- self.mass.players.update(group_leader.player_id, skip_forward=True)
+ if player.synced_to:
+ group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
+ if player_id in group_leader.group_childs:
+ group_leader.group_childs.remove(player_id)
+ player.synced_to = None
+ airplay_player = self._players.get(player_id)
+ await airplay_player.cmd_stop()
+ # make sure that the player manager gets an update
+ self.mass.players.update(player.player_id, skip_redirect=True)
+ self.mass.players.update(group_leader.player_id, skip_redirect=True)
async def _getcliraop_binary(self):
"""Find the correct raop/airplay binary belonging to the platform."""
PlayerFeature.SYNC,
PlayerFeature.VOLUME_SET,
),
- can_sync_with=tuple(x for x in self._players if x != player_id),
volume_level=volume,
)
self.mass.players.register_or_update(mass_player)
- # update can_sync_with field of all other players
- # this ensure that the field always contains all player ids,
- # even when a player joins later on
- for player in self.players:
- if player.player_id == player_id:
- continue
- player.can_sync_with = tuple(x for x in self._players if x != player.player_id)
- self.mass.players.update(player.player_id)
async def _handle_dacp_request( # noqa: PLR0915
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
)
from music_assistant.common.models.errors import PlayerCommandFailed
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
-from music_assistant.constants import (
- VERBOSE_LOG_LEVEL,
-)
+from music_assistant.constants import VERBOSE_LOG_LEVEL
from music_assistant.server.helpers.util import (
get_port_from_zeroconf,
get_primary_ip_address_from_zeroconf,
self.mass_player.active_source = self.sync_status.master
self.mass_player.state = PLAYBACK_STATE_MAP[self.status.state]
- self.mass_player.can_sync_with = (
- tuple(x for x in self.prov.bluos_players if x != self.player_id),
- )
-
self.mass.players.update(self.player_id)
castplayer = self.castplayers[player_id]
if powered:
await self._launch_app(castplayer)
- else:
- await asyncio.to_thread(castplayer.cc.quit_app)
+ return
+ # handle power off
+ await asyncio.to_thread(castplayer.cc.quit_app)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player."""
# active source
if group_player:
castplayer.player.active_source = group_player.player.active_source
+ castplayer.player.active_group = group_player.player.player_id
elif castplayer.cc.app_id == MASS_APP_ID:
castplayer.player.active_source = castplayer.player_id
+ castplayer.player.active_group = None
else:
castplayer.player.active_source = castplayer.cc.app_display_name
+ castplayer.player.active_group = None
+
+ if status.content_id:
+ castplayer.player.current_media = PlayerMedia(
+ uri=status.content_id,
+ title=status.title,
+ artist=status.artist,
+ album=status.album_name,
+ image_url=status.images[0].url if status.images else None,
+ duration=status.duration,
+ media_type=MediaType.TRACK,
+ )
+ else:
+ castplayer.player.current_media = None
# current media
self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id)
if group_player := self.prov.castplayers.get(group_uuid):
if group_player.cc.media_controller.is_active:
self.castplayer.active_group = group_uuid
- self.castplayer.player.active_source = group_uuid
- self.castplayer.player.state = group_player.player.state
elif group_uuid == self.castplayer.active_group:
self.castplayer.active_group = None
- self.castplayer.player.active_source = self.castplayer.player.player_id
self.prov.logger.log(
VERBOSE_LOG_LEVEL,
) -> None:
"""Handle setup of a Player from an hass entity."""
hass_device: HassDevice | None = None
- platform_players: list[str] = []
if entity_registry_entry := entity_registry.get(state["entity_id"]):
- # collect all players from same platform
- platform_players = [
- entity_id
- for entity_id, entity in entity_registry.items()
- if entity["platform"] == entity_registry_entry["platform"]
- and state["entity_id"].startswith("media_player")
- and entity_id != state["entity_id"]
- ]
hass_device = device_registry.get(entity_registry_entry["device_id"])
hass_supported_features = MediaPlayerEntityFeature(
state["attributes"]["supported_features"]
supported_features=tuple(supported_features),
state=StateMap.get(state["state"], PlayerState.IDLE),
)
- if MediaPlayerEntityFeature.GROUPING in hass_supported_features:
- player.can_sync_with = platform_players
self._update_player_attributes(player, state["attributes"])
self.mass.players.register_or_update(player)
--- /dev/null
+"""
+Sync Group Player provider.
+
+This is more like a "virtual" player provider,
+allowing the user to create 'presets' of players to sync together (of the same type).
+"""
+
+from __future__ import annotations
+
+from collections.abc import Callable
+from time import time
+from typing import TYPE_CHECKING, Final, cast
+
+import shortuuid
+from aiohttp import web
+
+from music_assistant.common.models.config_entries import (
+ BASE_PLAYER_CONFIG_ENTRIES,
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_FLOW_MODE_ENFORCED,
+ CONF_ENTRY_PLAYER_ICON_GROUP,
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+ PlayerConfig,
+ create_sample_rates_config_entry,
+)
+from music_assistant.common.models.enums import (
+ ConfigEntryType,
+ ContentType,
+ EventType,
+ MediaType,
+ PlayerFeature,
+ PlayerState,
+ PlayerType,
+ ProviderFeature,
+)
+from music_assistant.common.models.errors import (
+ ProviderUnavailableError,
+ UnsupportedFeaturedException,
+)
+from music_assistant.common.models.event import MassEvent
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
+from music_assistant.constants import (
+ CONF_CROSSFADE,
+ CONF_CROSSFADE_DURATION,
+ CONF_ENABLE_ICY_METADATA,
+ CONF_ENFORCE_MP3,
+ CONF_FLOW_MODE,
+ CONF_GROUP_MEMBERS,
+ CONF_HTTP_PROFILE,
+ CONF_SAMPLE_RATES,
+)
+from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS
+from music_assistant.server.helpers.ffmpeg import get_ffmpeg_stream
+from music_assistant.server.helpers.util import TaskManager
+from music_assistant.server.models.player_provider import PlayerProvider
+
+from .ugp_stream import UGP_FORMAT, UGPStream
+
+if TYPE_CHECKING:
+ from collections.abc import Iterable
+
+ from music_assistant.common.models.config_entries import ProviderConfig
+ from music_assistant.common.models.provider import ProviderManifest
+ from music_assistant.server import MusicAssistant
+ from music_assistant.server.models import ProviderInstanceType
+
+
+# ruff: noqa: ARG002
+
+UNIVERSAL_PREFIX: Final[str] = "ugp_"
+SYNCGROUP_PREFIX: Final[str] = "syncgroup_"
+GROUP_TYPE_UNIVERSAL: Final[str] = "universal"
+CONF_GROUP_TYPE: Final[str] = "group_type"
+CONF_ENTRY_GROUP_TYPE = ConfigEntry(
+ key=CONF_GROUP_TYPE,
+ type=ConfigEntryType.STRING,
+ label="Group type",
+ default_value="universal",
+ hidden=True,
+ required=True,
+)
+CONF_ENTRY_GROUP_MEMBERS = ConfigEntry(
+ key=CONF_GROUP_MEMBERS,
+ type=ConfigEntryType.STRING,
+ label="Group members",
+ default_value=[],
+ description="Select all players you want to be part of this group",
+ multi_value=True,
+ required=True,
+)
+CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(44100, 16, 44100, 16, True)
+CONFIG_ENTRY_UGP_NOTE = ConfigEntry(
+ key="ugp_note",
+ type=ConfigEntryType.LABEL,
+ label="Please note that although the Universal Group "
+ "allows you to group any player, it will not enable audio sync "
+ "between players of different ecosystems. It is advised to always use native "
+ "player groups or sync groups when available for your player type(s) and use "
+ "the Universal Group only to group players of different ecosystems.",
+ required=False,
+)
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return PlayerGroupProvider(mass, manifest, config)
+
+
+async def get_config_entries(
+ mass: MusicAssistant, # noqa: ARG001
+ instance_id: str | None = None, # noqa: ARG001
+ action: str | None = None, # noqa: ARG001
+ values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # nothing to configure (for now)
+ return ()
+
+
+class PlayerGroupProvider(PlayerProvider):
+ """Base/builtin provider for creating (permanent) player groups."""
+
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return the features supported by this Provider."""
+ return ()
+
+ def __init__(
+ self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+ ) -> None:
+ """Initialize MusicProvider."""
+ super().__init__(mass, manifest, config)
+ self.ugp_streams: dict[str, UGPStream] = {}
+ self._on_unload: list[Callable[[], None]] = [
+ self.mass.register_api_command("player_group/create", self.create_group),
+ ]
+
+ async def loaded_in_mass(self) -> None:
+ """Call after the provider has been loaded."""
+ # temp: migrate old config entries
+ # remove this after MA 2.4 release
+ for player_config in await self.mass.config.get_player_configs():
+ if player_config.provider == self.instance_id:
+ # already migrated
+ continue
+ # migrate old syncgroup players to this provider
+ if player_config.player_id.startswith(SYNCGROUP_PREFIX):
+ self.mass.config.set_raw_player_config_value(
+ player_config.player_id, CONF_GROUP_TYPE, player_config.provider
+ )
+ player_config.provider = self.instance_id
+ self.mass.config.set_raw_player_config_value(
+ player_config.player_id, "provider", self.instance_id
+ )
+ # migrate old UGP players to this provider
+ elif player_config.player_id.startswith(UNIVERSAL_PREFIX):
+ self.mass.config.set_raw_player_config_value(
+ player_config.player_id, CONF_GROUP_TYPE, "universal"
+ )
+ player_config.provider = self.instance_id
+ self.mass.config.set_raw_player_config_value(
+ player_config.player_id, "provider", self.instance_id
+ )
+
+ await self._register_all_players()
+ # listen for player added events so we can catch late joiners
+ # (because a group depends on its childs to be available)
+ self._on_unload.append(
+ self.mass.subscribe(self._on_mass_player_added_event, EventType.PLAYER_ADDED)
+ )
+
+ async def unload(self) -> None:
+ """
+ Handle unload/close of the provider.
+
+ Called when provider is deregistered (e.g. MA exiting or config reloading).
+ """
+ for unload_cb in self._on_unload:
+ unload_cb()
+
+ async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
+ """Return all (provider/player specific) Config Entries for the given player (if any)."""
+ # default entries for player groups
+ base_entries = (
+ *BASE_PLAYER_CONFIG_ENTRIES,
+ CONF_ENTRY_PLAYER_ICON_GROUP,
+ CONF_ENTRY_GROUP_TYPE,
+ CONF_ENTRY_GROUP_MEMBERS,
+ )
+ # group type is static and can not be changed. we just grab the existing, stored value
+ group_type: str = self.mass.config.get_raw_player_config_value(
+ player_id, CONF_GROUP_TYPE, GROUP_TYPE_UNIVERSAL
+ )
+ # handle config entries for universal group players
+ if group_type == GROUP_TYPE_UNIVERSAL:
+ group_members = CONF_ENTRY_GROUP_MEMBERS
+ group_members.options = tuple(
+ ConfigValueOption(x.display_name, x.player_id)
+ for x in self.mass.players.all(True, False)
+ if not x.player_id.startswith(UNIVERSAL_PREFIX)
+ )
+ return (
+ *base_entries,
+ group_members,
+ CONFIG_ENTRY_UGP_NOTE,
+ CONF_ENTRY_CROSSFADE,
+ CONF_ENTRY_CROSSFADE_DURATION,
+ CONF_ENTRY_SAMPLE_RATES_UGP,
+ CONF_ENTRY_FLOW_MODE_ENFORCED,
+ )
+ # handle config entries for syncgroup players
+ group_members = CONF_ENTRY_GROUP_MEMBERS
+ group_members.options = tuple(
+ ConfigValueOption(x.display_name, x.player_id)
+ for x in self.mass.players.all(True, False)
+ if x.provider != self.instance_id
+ and (player_prov := self.mass.get_provider(x.provider))
+ and ProviderFeature.SYNC_PLAYERS in player_prov.supported_features
+ )
+
+ # grab additional details from one of the provider's players
+ if not (player_provider := self.mass.get_provider(group_type)):
+ return base_entries # guard
+ if TYPE_CHECKING:
+ player_provider = cast(PlayerProvider, player_provider)
+ assert player_provider.lookup_key != self.lookup_key
+ if not (child_player := next((x for x in player_provider.players), None)):
+ return base_entries # guard
+
+ # combine base group entries with (base) player entries for this player type
+ allowed_conf_entries = (
+ CONF_HTTP_PROFILE,
+ CONF_ENABLE_ICY_METADATA,
+ CONF_CROSSFADE,
+ CONF_CROSSFADE_DURATION,
+ CONF_ENFORCE_MP3,
+ CONF_FLOW_MODE,
+ CONF_SAMPLE_RATES,
+ )
+ child_config_entries = await player_provider.get_player_config_entries(
+ child_player.player_id
+ )
+ return (
+ *base_entries,
+ group_members,
+ *(entry for entry in child_config_entries if entry.key in allowed_conf_entries),
+ )
+
+ def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ """Call (by config manager) when the configuration of a player changes."""
+ if "enabled" in changed_keys and not config.enabled:
+ # edge case: ensure that the player is powered off if the player gets disabled
+ self.mass.create_task(self.cmd_power(config.player_id, False))
+ if f"values/{CONF_GROUP_MEMBERS}" in changed_keys:
+ members = config.get_value(CONF_GROUP_MEMBERS)
+ # ensure we filter invalid members
+ members = self._filter_members(config.get_value(CONF_GROUP_TYPE), members)
+ self.mass.config.set_raw_player_config_value(
+ config.player_id, CONF_GROUP_MEMBERS, members
+ )
+ if player := self.mass.players.get(config.player_id):
+ player.group_childs = members
+ self.mass.players.update(config.player_id)
+
+ def on_player_config_removed(self, player_id: str) -> None:
+ """Call (by config manager) when the configuration of a player is removed."""
+ if not (group_player := self.mass.players.get(player_id)):
+ return
+ if group_player.powered:
+ # edge case: the group player is powered and being removed
+ for member in self.mass.players.iter_group_members(group_player, only_powered=True):
+ member.active_group = None
+ if member.state == PlayerState.IDLE:
+ continue
+ if member.synced_to:
+ continue
+ self.mass.create_task(
+ self.mass.players.cmd_stop(member.player_id, skip_redirect=True)
+ )
+ self.mass.players.remove(group_player.player_id, False)
+
+ async def cmd_stop(self, player_id: str) -> None:
+ """Send STOP command to given player."""
+ group_player = self.mass.players.get(player_id)
+ if player_id.startswith(SYNCGROUP_PREFIX):
+ # syncgroup: forward command to sync leader
+ if sync_leader := self._get_sync_leader(group_player):
+ await self.mass.players.cmd_stop(sync_leader.player_id, skip_redirect=True)
+ else:
+ # ugp: forward command to all active members
+ async with TaskManager(self.mass) as tg:
+ for member in self.mass.players.iter_group_members(group_player, active_only=True):
+ if member.state not in (PlayerState.PAUSED, PlayerState.PLAYING):
+ continue
+ tg.create_task(self.mass.players.cmd_stop(member.player_id, skip_redirect=True))
+ # abort the stream session
+ if (stream := self.ugp_streams.pop(player_id, None)) and not stream.done:
+ await stream.stop()
+ # set state optimistically
+ group_player.state = PlayerState.IDLE
+ self.mass.players.update(player_id)
+
+ async def cmd_play(self, player_id: str) -> None:
+ """Send PLAY command to given player."""
+ group_player = self.mass.players.get(player_id)
+ if not player_id.startswith(SYNCGROUP_PREFIX):
+ # this shouldn't happen, but just in case
+ raise UnsupportedFeaturedException("Command is not supported for UGP players")
+ # forward command to sync leader
+ if sync_leader := self._get_sync_leader(group_player):
+ await self.mass.players.cmd_play(sync_leader.player_id, skip_redirect=True)
+
+ async def cmd_pause(self, player_id: str) -> None:
+ """Send PAUSE command to given player."""
+ group_player = self.mass.players.get(player_id)
+ if not player_id.startswith(SYNCGROUP_PREFIX):
+ raise UnsupportedFeaturedException("Command is not supported for UGP players")
+ # forward command to sync leader
+ if sync_leader := self._get_sync_leader(group_player):
+ await self.mass.players.cmd_pause(sync_leader.player_id, skip_redirect=True)
+
+ async def cmd_power(self, player_id: str, powered: bool) -> None:
+ """Handle POWER command to group player."""
+ group_player = self.mass.players.get(player_id, raise_unavailable=True)
+ if TYPE_CHECKING:
+ group_player = cast(Player, group_player)
+
+ # always stop at power off
+ if not powered and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
+ await self.cmd_stop(group_player.player_id)
+
+ async with TaskManager(self.mass) as tg:
+ if powered:
+ # handle TURN_ON of the group player by turning on all members
+ for member in self.mass.players.iter_group_members(
+ group_player, only_powered=False, active_only=False
+ ):
+ if (
+ member.state in (PlayerState.PLAYING, PlayerState.PAUSED)
+ and member.active_source != group_player.active_source
+ ):
+ # stop playing existing content on member if we start the group player
+ tg.create_task(
+ self.mass.players.cmd_stop(member.player_id, skip_redirect=True)
+ )
+ if not member.powered:
+ tg.create_task(
+ self.mass.players.cmd_power(member.player_id, True, skip_redirect=True)
+ )
+ # set active source to group player if the group (is going to be) powered
+ member.active_group = group_player.player_id
+ member.active_source = group_player.active_source
+ else:
+ # handle TURN_OFF of the group player by turning off all members
+ for member in self.mass.players.iter_group_members(
+ group_player, only_powered=True, active_only=True
+ ):
+ # reset active group on player when the group is turned off
+ member.active_group = None
+ member.active_source = None
+ # handle TURN_OFF of the group player by turning off all members
+ if member.powered:
+ tg.create_task(
+ self.mass.players.cmd_power(member.player_id, False, skip_redirect=True)
+ )
+ if powered and player_id.startswith(SYNCGROUP_PREFIX):
+ await self._sync_syncgroup(group_player)
+ # optimistically set the group state
+ group_player.powered = powered
+ self.mass.players.update(group_player.player_id)
+
+ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
+ """Send VOLUME_SET command to given player."""
+ # group volume is already handled in the player manager
+
+ async def play_media(
+ self,
+ player_id: str,
+ media: PlayerMedia,
+ ) -> None:
+ """Handle PLAY MEDIA on given player."""
+ group_player = self.mass.players.get(player_id)
+ # power on (or resync) if needed
+ if not group_player.powered:
+ await self.cmd_power(player_id, True)
+ elif player_id.startswith(SYNCGROUP_PREFIX):
+ await self._sync_syncgroup(group_player)
+
+ # set the state optimistically
+ group_player.current_media = media
+ group_player.elapsed_time = 0
+ group_player.elapsed_time_last_updated = time() - 1
+ group_player.state = PlayerState.PLAYING
+ self.mass.players.update(player_id)
+
+ # handle play_media for sync group
+ if player_id.startswith(SYNCGROUP_PREFIX):
+ # simply forward the command to the sync leader
+ if sync_leader := self._select_sync_leader(group_player):
+ await self.mass.players.play_media(
+ sync_leader.player_id, media=media, skip_redirect=True
+ )
+ return
+
+ # handle play_media for UGP group
+ if (existing := self.ugp_streams.pop(player_id, None)) and not existing.done:
+ # stop any existing stream first
+ await existing.stop()
+
+ # select audio source
+ if media.media_type == MediaType.ANNOUNCEMENT:
+ # special case: stream announcement
+ audio_source = self.mass.streams.get_announcement_stream(
+ media.custom_data["url"],
+ output_format=UGP_FORMAT,
+ use_pre_announce=media.custom_data["use_pre_announce"],
+ )
+ elif media.queue_id and media.queue_item_id:
+ # regular queue stream request
+ audio_source = self.mass.streams.get_flow_stream(
+ queue=self.mass.player_queues.get(media.queue_id),
+ start_queue_item=self.mass.player_queues.get_item(
+ media.queue_id, media.queue_item_id
+ ),
+ pcm_format=UGP_FORMAT,
+ )
+ else:
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ audio_source = get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(ContentType.try_parse(media.uri)),
+ output_format=UGP_FORMAT,
+ )
+
+ # start the stream task
+ self.ugp_streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT)
+ base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.aac"
+
+ # forward to downstream play_media commands
+ async with TaskManager(self.mass) as tg:
+ for member in self.mass.players.iter_group_members(
+ group_player, only_powered=True, active_only=True
+ ):
+ tg.create_task(
+ self.mass.players.play_media(
+ member.player_id,
+ media=PlayerMedia(
+ uri=f"{base_url}?player_id={member.player_id}",
+ media_type=MediaType.FLOW_STREAM,
+ title=group_player.display_name,
+ queue_id=group_player.player_id,
+ ),
+ )
+ )
+
+ async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None:
+ """Handle enqueuing of a next media item on the player."""
+ group_player = self.mass.players.get(player_id, True)
+ if not player_id.startswith(SYNCGROUP_PREFIX):
+ # this shouldn't happen, but just in case
+ raise UnsupportedFeaturedException("Command is not supported for UGP players")
+ if sync_leader := self._get_sync_leader(group_player):
+ await self.enqueue_next_media(
+ sync_leader.player_id,
+ media=media,
+ )
+
+ async def poll_player(self, player_id: str) -> None:
+ """Poll player for state updates.
+
+ This is called by the Player Manager;
+ if 'needs_poll' is set to True in the player object.
+ """
+ if group_player := self.mass.players.get(player_id):
+ self._update_attributes(group_player)
+
+ async def create_group(self, group_type: str, name: str, members: list[str]) -> Player:
+ """Create new Group Player."""
+ # perform basic checks
+ if group_type == GROUP_TYPE_UNIVERSAL:
+ prefix = UNIVERSAL_PREFIX
+ else:
+ prefix = SYNCGROUP_PREFIX
+ if (player_prov := self.mass.get_provider(group_type)) is None:
+ msg = f"Provider {group_type} is not available!"
+ raise ProviderUnavailableError(msg)
+ if ProviderFeature.SYNC_PLAYERS not in player_prov.supported_features:
+ msg = f"Provider {player_prov.name} does not support creating groups"
+ raise UnsupportedFeaturedException(msg)
+
+ new_group_id = f"{prefix}{shortuuid.random(8).lower()}"
+ # cleanup list, just in case the frontend sends some garbage
+ members = self._filter_members(group_type, members)
+ # create default config with the user chosen name
+ self.mass.config.create_default_player_config(
+ new_group_id,
+ player_prov.instance_id,
+ name=name,
+ enabled=True,
+ values={CONF_GROUP_MEMBERS: members, CONF_GROUP_TYPE: group_type},
+ )
+ return self._register_group_player(
+ group_player_id=new_group_id, group_type=group_type, name=name, members=members
+ )
+
+ async def _register_all_players(self) -> None:
+ """Register all (virtual/fake) group players in the Player controller."""
+ player_configs = await self.mass.config.get_player_configs(
+ self.instance_id, include_values=True
+ )
+ for player_config in player_configs:
+ if self.mass.players.get(player_config.player_id):
+ continue # already registered
+ members = player_config.get_value(CONF_GROUP_MEMBERS)
+ group_type = player_config.get_value(CONF_GROUP_TYPE)
+ self._register_group_player(
+ player_config.player_id,
+ group_type,
+ player_config.name or player_config.default_name,
+ members,
+ )
+
+ def _register_group_player(
+ self, group_player_id: str, group_type: str, name: str, members: Iterable[str]
+ ) -> Player:
+ """Register a syncgroup player."""
+ player_features = {PlayerFeature.POWER, PlayerFeature.VOLUME_SET}
+ if group_type == GROUP_TYPE_UNIVERSAL:
+ model_name = "Universal Group"
+ manufacturer = self.name
+ # register dynamic route for the ugp stream
+ route_path = f"/ugp/{group_player_id}.aac"
+ self._on_unload.append(
+ self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
+ )
+ elif player_provider := self.mass.get_provider(group_type):
+ # grab additional details from one of the provider's players
+ if TYPE_CHECKING:
+ player_provider = cast(PlayerProvider, player_provider)
+ model_name = "Sync Group"
+ manufacturer = self.mass.get_provider(group_type).name
+ if child_player := next((x for x in player_provider.players), None):
+ for feature in (
+ PlayerFeature.PAUSE,
+ PlayerFeature.VOLUME_MUTE,
+ ):
+ if feature in child_player.supported_features:
+ player_features.add(feature)
+ else:
+ # this may happen if the provider is not available yet
+ model_name = "Sync Group"
+ manufacturer = self.name
+
+ player = Player(
+ player_id=group_player_id,
+ provider=self.instance_id,
+ type=PlayerType.GROUP,
+ name=name,
+ available=True,
+ powered=False,
+ device_info=DeviceInfo(model=model_name, manufacturer=manufacturer),
+ supported_features=tuple(player_features),
+ group_childs=set(members),
+ active_source=group_player_id,
+ )
+
+ self.mass.players.register_or_update(player)
+ self._update_attributes(player)
+ return player
+
+ def _get_sync_leader(self, group_player: Player) -> Player | None:
+ """Get the active sync leader player for the syncgroup."""
+ if group_player.synced_to:
+ # should not happen but just in case...
+ return self.mass.players.get(group_player.synced_to)
+ # Return the (first/only) player that has group childs
+ for child_player in self.mass.players.iter_group_members(
+ group_player, only_powered=False, only_playing=False, active_only=False
+ ):
+ if child_player.group_childs:
+ return child_player
+ return None
+
+ def _select_sync_leader(self, group_player: Player) -> Player | None:
+ """Select the active sync leader player for a syncgroup."""
+ if sync_leader := self._get_sync_leader(group_player):
+ return sync_leader
+ # select new sync leader: return the first active player
+ for child_player in self.mass.players.iter_group_members(group_player, active_only=True):
+ if child_player.active_group not in (None, group_player.player_id):
+ continue
+ if (
+ child_player.active_source
+ and child_player.active_source != group_player.active_source
+ ):
+ continue
+ return child_player
+ # fallback select new sync leader: simply return the first (available) player
+ for child_player in self.mass.players.iter_group_members(
+ group_player, only_powered=False, only_playing=False, active_only=False
+ ):
+ return child_player
+ # this really should not be possible
+ raise RuntimeError("Impossible to select sync leader for syncgroup")
+
+ async def _sync_syncgroup(self, group_player: Player) -> None:
+ """Sync all (possible) players of a syncgroup."""
+ sync_leader = self._select_sync_leader(group_player)
+ members_to_sync: list[str] = []
+ for member in self.mass.players.iter_group_members(group_player, active_only=True):
+ if sync_leader.player_id == member.player_id:
+ # skip sync leader
+ continue
+ if member.synced_to == sync_leader.player_id:
+ # already synced
+ continue
+ if member.synced_to and member.synced_to != sync_leader.player_id:
+ # unsync first
+ await self.mass.players.cmd_unsync(member.player_id)
+ members_to_sync.append(member.player_id)
+ if members_to_sync:
+ await self.mass.players.cmd_sync_many(sync_leader.player_id, members_to_sync)
+
+ async def _on_mass_player_added_event(self, event: MassEvent) -> None:
+ """Handle player added event from player controller."""
+ await self._register_all_players()
+
+ def _update_attributes(self, player: Player) -> None:
+ """Update attributes of a player."""
+ for child_player in self.mass.players.iter_group_members(player, active_only=True):
+ # just grab the first active player
+ player.state = child_player.state
+ if player.current_media:
+ player.current_media = child_player.current_media
+ player.elapsed_time = child_player.elapsed_time
+ player.elapsed_time_last_updated = child_player.elapsed_time_last_updated
+ break
+ else:
+ player.state = PlayerState.IDLE
+ player.active_source = player.player_id
+ self.mass.players.update(player.player_id)
+
+ async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
+ """Serve the UGP (multi-client) flow stream audio to a player."""
+ ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
+ child_player_id = request.query.get("player_id") # optional!
+
+ if not (ugp_player := self.mass.players.get(ugp_player_id)):
+ raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")
+
+ if not (stream := self.ugp_streams.get(ugp_player_id, None)) or stream.done:
+ raise web.HTTPNotFound(body=f"There is no active UGP stream for {ugp_player_id}!")
+
+ http_profile: str = await self.mass.config.get_player_config_value(
+ child_player_id, CONF_HTTP_PROFILE
+ )
+ headers = {
+ **DEFAULT_STREAM_HEADERS,
+ "Content-Type": "audio/aac",
+ "Accept-Ranges": "none",
+ "Cache-Control": "no-cache",
+ "Connection": "close",
+ }
+
+ resp = web.StreamResponse(status=200, reason="OK", headers=headers)
+ if http_profile == "forced_content_length":
+ resp.content_length = 4294967296
+ elif http_profile == "chunked":
+ resp.enable_chunked_encoding()
+
+ await resp.prepare(request)
+
+ # return early if this is not a GET request
+ if request.method != "GET":
+ return resp
+
+ # all checks passed, start streaming!
+ self.logger.debug(
+ "Start serving UGP flow audio stream for UGP-player %s to %s",
+ ugp_player.display_name,
+ child_player_id or request.remote,
+ )
+ async for chunk in stream.subscribe():
+ try:
+ await resp.write(chunk)
+ except (ConnectionError, ConnectionResetError):
+ break
+
+ return resp
+
+ def _filter_members(self, provider: str, members: list[str]) -> list[str]:
+ """Filter out members that are not valid players."""
+ if provider != GROUP_TYPE_UNIVERSAL:
+ return [
+ x
+ for x in members
+ if (player := self.mass.players.get(x)) and player.provider == provider
+ ]
+ # cleanup members - filter out impossible choices
+ syncgroup_childs: list[str] = []
+ for member in members:
+ if not member.startswith(SYNCGROUP_PREFIX):
+ continue
+ if syncgroup := self.mass.players.get(member):
+ syncgroup_childs.extend(syncgroup.group_childs)
+ # we filter out other UGP players and syncgroup childs
+ # if their parent is already in the list
+ return [
+ x
+ for x in members
+ if self.mass.players.get(x)
+ and x not in syncgroup_childs
+ and not x.startswith(UNIVERSAL_PREFIX)
+ ]
--- /dev/null
+{
+ "type": "player",
+ "domain": "player_group",
+ "name": "Playergroup",
+ "description": "Create (permanent) groups of your favorite players. \nSupports both syncgroups (to group speakers of the same ecocystem to play in sync) and universal groups to group speakers of different ecosystems to play the same audio (but not in sync).",
+ "codeowners": ["@music-assistant"],
+ "requirements": [],
+ "documentation": "https://music-assistant.io/player-support/player_group/",
+ "multi_instance": false,
+ "builtin": true,
+ "allow_disable": false,
+ "icon": "speaker-multiple"
+}
--- /dev/null
+"""
+Implementation of a Stream for the Universal Group Player.
+
+Basically this is like a fake radio radio stream (AAC) format with multiple subscribers.
+The AAC format is chosen because it is widely supported and has a good balance between
+quality and bandwidth and also allows for mid-stream joining of (extra) players.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from collections.abc import AsyncGenerator, Awaitable, Callable
+
+from music_assistant.common.models.enums import ContentType
+from music_assistant.common.models.media_items import AudioFormat
+from music_assistant.server.helpers.audio import get_ffmpeg_stream
+
+# ruff: noqa: ARG002
+
+UGP_FORMAT = AudioFormat(
+ content_type=ContentType.PCM_F32LE,
+ sample_rate=44100,
+ bit_depth=32,
+)
+
+
+class UGPStream:
+ """
+ Implementation of a Stream for the Universal Group Player.
+
+ Basically this is like a fake radio radio stream (AAC) format with multiple subscribers.
+ The AAC format is chosen because it is widely supported and has a good balance between
+ quality and bandwidth and also allows for mid-stream joining of (extra) players.
+ """
+
+ def __init__(
+ self,
+ audio_source: AsyncGenerator[bytes, None],
+ audio_format: AudioFormat,
+ ) -> None:
+ """Initialize UGP Stream."""
+ self.audio_source = audio_source
+ self.input_format = audio_format
+ self.output_format = AudioFormat(content_type=ContentType.AAC)
+ self.subscribers: list[Callable[[bytes], Awaitable]] = []
+ self._task: asyncio.Task | None = None
+ self._done: asyncio.Event = asyncio.Event()
+
+ @property
+ def done(self) -> bool:
+ """Return if this stream is already done."""
+ return self._done.is_set() and self._task and self._task.done()
+
+ async def stop(self) -> None:
+ """Stop/cancel the stream."""
+ if self._done.is_set():
+ return
+ if self._task and not self._task.done():
+ self._task.cancel()
+ self._done.set()
+
+ async def subscribe(self) -> AsyncGenerator[bytes, None]:
+ """Subscribe to the raw/unaltered audio stream."""
+ # start the runner as soon as the (first) client connects
+ if not self._task:
+ self._task = asyncio.create_task(self._runner())
+ queue = asyncio.Queue(1)
+ try:
+ self.subscribers.append(queue.put)
+ while True:
+ chunk = await queue.get()
+ if not chunk:
+ break
+ yield chunk
+ finally:
+ self.subscribers.remove(queue.put)
+
+ async def _runner(self) -> None:
+ """Run the stream for the given audio source."""
+ await asyncio.sleep(0.25) # small delay to allow subscribers to connect
+ async for chunk in get_ffmpeg_stream(
+ audio_input=self.audio_source,
+ input_format=self.input_format,
+ output_format=self.output_format,
+ # TODO: enable readrate limiting + initial burst once we have a newer ffmpeg version
+ # extra_input_args=["-readrate", "1.15"],
+ ):
+ await asyncio.gather(*[sub(chunk) for sub in self.subscribers], return_exceptions=True)
+ # empty chunk when done
+ await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
+ self._done.set()
import time
from collections import deque
from collections.abc import Iterator
-from contextlib import suppress
from dataclasses import dataclass
from typing import TYPE_CHECKING
from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.player_provider import PlayerProvider
-from music_assistant.server.providers.ugp import UniversalGroupProvider
+from music_assistant.server.providers.player_group import PlayerGroupProvider
from .multi_client_stream import MultiClientStream
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
base_entries = await super().get_player_config_entries(player_id)
- if not (slimclient := self.slimproto.get_player(player_id)):
- # most probably a syncgroup
- return (
- *base_entries,
- CONF_ENTRY_CROSSFADE,
- CONF_ENTRY_CROSSFADE_DURATION,
- CONF_ENTRY_HTTP_PROFILE_FORCED_2,
- create_sample_rates_config_entry(96000, 24, 48000, 24),
- )
-
+ if slimclient := self.slimproto.get_player(player_id):
+ max_sample_rate = int(slimclient.max_sample_rate)
+ else:
+ # player not (yet) connected? use default
+ max_sample_rate = 48000
# create preset entries (for players that support it)
preset_entries = ()
presets = []
)
for index in range(1, preset_count + 1)
)
-
return (
base_entries
+ preset_entries
CONF_ENTRY_DISPLAY,
CONF_ENTRY_VISUALIZATION,
CONF_ENTRY_HTTP_PROFILE_FORCED_2,
- create_sample_rates_config_entry(int(slimclient.max_sample_rate), 24, 48000, 24),
+ create_sample_rates_config_entry(max_sample_rate, 24, 48000, 24),
)
)
)
elif media.queue_id.startswith("ugp_"):
# special case: UGP stream
- ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
- ugp_stream = ugp_provider.streams[media.queue_id]
- audio_source = ugp_stream.subscribe_raw()
+ ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
+ ugp_stream = ugp_provider.ugp_streams[media.queue_id]
+ audio_source = ugp_stream.subscribe()
elif media.queue_id and media.queue_item_id:
# regular queue stream request
audio_source = self.mass.streams.get_flow_stream(
parent_player.group_childs.add(child_player.player_id)
child_player.synced_to = parent_player.player_id
# check if we should (re)start or join a stream session
+ # TODO: support late joining of a client into an existing stream session
+ # so it doesn't need to be restarted anymore.
active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id)
if active_queue.state == PlayerState.PLAYING:
# playback needs to be restarted to form a new multi client stream session
)
else:
# make sure that the player manager gets an update
- self.mass.players.update(child_player.player_id, skip_forward=True)
- self.mass.players.update(parent_player.player_id, skip_forward=True)
+ self.mass.players.update(child_player.player_id, skip_redirect=True)
+ self.mass.players.update(parent_player.player_id, skip_redirect=True)
async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player."""
- child_player = self.mass.players.get(player_id)
- parent_player = self.mass.players.get(child_player.synced_to)
- # make sure to send stop to the player
- await self.cmd_stop(child_player.player_id)
- child_player.synced_to = None
- with suppress(KeyError):
- parent_player.group_childs.remove(child_player.player_id)
- if parent_player.group_childs == {parent_player.player_id}:
- # last child vanished; the sync group is dissolved
- parent_player.group_childs.remove(parent_player.player_id)
- self.mass.players.update(child_player.player_id)
- self.mass.players.update(parent_player.player_id)
+ """Handle UNSYNC command for given player.
+
+ Remove the given player from any syncgroups it currently is synced to.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ player = self.mass.players.get(player_id, raise_unavailable=True)
+ if player.synced_to:
+ group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True)
+ group_leader.group_childs.remove(player_id)
+ player.synced_to = None
+ if slimclient := self.slimproto.get_player(player_id):
+ await slimclient.stop()
+ # make sure that the player manager gets an update
+ self.mass.players.update(player.player_id, skip_redirect=True)
+ self.mass.players.update(group_leader.player_id, skip_redirect=True)
def _client_callback(
self,
PlayerFeature.PAUSE,
PlayerFeature.VOLUME_MUTE,
),
- can_sync_with=tuple(
- x.player_id for x in self.slimproto.players if x.player_id != player_id
- ),
)
self.mass.players.register_or_update(player)
await self._set_display(slimplayer)
# update all attributes
await self._handle_player_update(slimplayer)
- # update existing players so they can update their `can_sync_with` field
- for _player in self.players:
- _player.can_sync_with = tuple(
- x.player_id for x in self.slimproto.players if x.player_id != _player.player_id
- )
- self.mass.players.update(_player.player_id)
# restore volume and power state
if last_state := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_STATE):
init_power = last_state[0]
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
- from music_assistant.server.providers.ugp import UniversalGroupProvider
+ from music_assistant.server.providers.player_group import PlayerGroupProvider
CONF_SERVER_HOST = "snapcast_server_host"
CONF_SERVER_CONTROL_PORT = "snapcast_server_control_port"
else:
return self._get_ma_id(snap_client_id)
- def _can_sync_with(self, player_id: str) -> None:
- mass_player = self.mass.players.get(player_id)
- mass_player.can_sync_with = tuple(
- self._get_ma_id(snap_client.identifier)
- for snap_client in self._snapserver.clients
- if self._get_ma_id(snap_client.identifier) != player_id
- )
-
- self.mass.players.update(mass_player.player_id)
-
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
PlayerFeature.VOLUME_SET,
PlayerFeature.VOLUME_MUTE,
),
- can_sync_with=[],
group_childs=set(),
synced_to=self._synced_to(player_id),
)
player.active_source = stream.name
else:
player.active_source = player_id
- self._can_sync_with(player_id)
self._group_childs(player_id)
self.mass.players.update(player_id)
for mass_child_id in list(mass_player.group_childs):
if mass_child_id != player_id:
await self.cmd_unsync(mass_child_id)
- else:
- mass_sync_master_player = self.mass.players.get(mass_player.synced_to)
- mass_sync_master_player.group_childs.remove(player_id)
- mass_player.synced_to = None
- snap_client_id = self._get_snapclient_id(player_id)
- group = self._get_snapgroup(player_id)
- await group.remove_client(snap_client_id)
+ return
+ mass_sync_master_player = self.mass.players.get(mass_player.synced_to)
+ mass_sync_master_player.group_childs.remove(player_id)
+ mass_player.synced_to = None
+ snap_client_id = self._get_snapclient_id(player_id)
+ group = self._get_snapgroup(player_id)
+ await group.remove_client(snap_client_id)
# assign default/empty stream to the player
await self._get_snapgroup(player_id).set_stream("default")
await self.cmd_stop(player_id=player_id)
+ # make sure that the player manager gets an update
+ self.mass.players.update(player_id, skip_redirect=True)
+ self.mass.players.update(mass_player.synced_to, skip_redirect=True)
async def play_media(self, player_id: str, media: PlayerMedia) -> None:
"""Handle PLAY MEDIA on given player."""
)
elif media.queue_id.startswith("ugp_"):
# special case: UGP stream
- ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp")
- ugp_stream = ugp_provider.streams[media.queue_id]
+ ugp_provider: PlayerGroupProvider = self.mass.get_provider("ugp")
+ ugp_stream = ugp_provider.ugp_streams[media.queue_id]
input_format = ugp_stream.output_format
audio_source = ugp_stream.subscribe()
elif media.queue_id and media.queue_item_id:
from music_assistant.common.models.errors import PlayerCommandFailed
from music_assistant.common.models.event import MassEvent
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
-from music_assistant.constants import (
- CONF_CROSSFADE,
- MASS_LOGO_ONLINE,
- SYNCGROUP_PREFIX,
- VERBOSE_LOG_LEVEL,
-)
-from music_assistant.server.helpers.util import TaskManager
+from music_assistant.constants import CONF_CROSSFADE, MASS_LOGO_ONLINE, VERBOSE_LOG_LEVEL
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
self.mass_player.volume_level = self.client.player.volume_level or 0
self.mass_player.volume_muted = self.client.player.volume_muted
- # work out 'can sync with' for this player
- self.mass_player.can_sync_with = tuple(
- x
- for x in self.prov.sonos_players
- if x != self.player_id
- and x in self.prov.sonos_players
- and self.prov.sonos_players[x].client.household_id == self.client.household_id
- )
-
group_parent = None
if self.client.player.is_coordinator:
# player is group coordinator
self.mass_player.group_childs = set()
self.mass_player.synced_to = active_group.coordinator_id
self.mass_player.active_source = active_group.coordinator_id
- self.mass_player.can_sync_with = ()
if airplay := self.get_linked_airplay_player(True):
# linked airplay player is active, update media from there
player_id: str,
) -> tuple[ConfigEntry, ...]:
"""Return Config Entries for the given player."""
- base_entries = await super().get_player_config_entries(player_id)
- if not (sonos_player := self.sonos_players.get(player_id)):
- # most probably a syncgroup or the player is not yet discovered
- return (*base_entries, CONF_ENTRY_CROSSFADE, CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED)
- return (
- *base_entries,
+ base_entries = (
+ *await super().get_player_config_entries(player_id),
CONF_ENTRY_CROSSFADE,
CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED,
create_sample_rates_config_entry(48000, 24, 48000, 24, True),
+ )
+ if not (sonos_player := self.sonos_players.get(player_id)):
+ # most probably the player is not yet discovered
+ return base_entries
+ return (
+ *base_entries,
ConfigEntry(
key=CONF_AIRPLAY_MODE,
type=ConfigEntryType.BOOLEAN,
self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None
) -> None:
"""Handle (provider native) playback of an announcement on given player."""
- if player_id.startswith(SYNCGROUP_PREFIX):
- # handle syncgroup, unwrap to all underlying child's
- async with TaskManager(self.mass) as tg:
- if group_player := self.mass.players.get(player_id):
- # execute on all child players
- for child_player_id in group_player.group_childs:
- tg.create_task(
- self.play_announcement(child_player_id, announcement, volume_level)
- )
- return
sonos_player = self.sonos_players[player_id]
self.logger.debug(
"Playing announcement %s using websocket audioclip on %s",
self, player_id, discovery_info=discovery_info, ip_address=address
)
await sonos_player.setup()
- # when we add a new player, update 'can_sync_with' for all other players
- for other_player_id in self.sonos_players:
- if other_player_id == player_id:
- continue
- self.sonos_players[other_player_id].update_attributes()
async def _handle_sonos_queue_itemwindow(self, request: web.Request) -> web.Response:
"""
self.mass_player.elapsed_time_last_updated = self.position_updated_at.timestamp()
# zone topology (syncing/grouping) details
- self.mass_player.can_sync_with = tuple(
- x.player_id
- for x in self.sonos_prov.sonosplayers.values()
- if x.player_id != self.player_id
- )
if self.sync_coordinator:
# player is synced to another player
self.mass_player.synced_to = self.sync_coordinator.player_id
+++ /dev/null
-"""
-Universal Group Player provider.
-
-This is more like a "virtual" player provider,
-allowing the user to create player groups from all players known in the system.
-"""
-
-from __future__ import annotations
-
-import asyncio
-from collections.abc import AsyncGenerator, Awaitable, Callable
-from time import time
-from typing import TYPE_CHECKING, Final, cast
-
-import shortuuid
-from aiohttp import web
-
-from music_assistant.common.models.config_entries import (
- CONF_ENTRY_CROSSFADE,
- CONF_ENTRY_CROSSFADE_DURATION,
- CONF_ENTRY_FLOW_MODE_ENFORCED,
- ConfigEntry,
- ConfigValueOption,
- ConfigValueType,
- PlayerConfig,
- create_sample_rates_config_entry,
-)
-from music_assistant.common.models.enums import (
- ConfigEntryType,
- ContentType,
- MediaType,
- PlayerFeature,
- PlayerState,
- PlayerType,
- ProviderFeature,
-)
-from music_assistant.common.models.media_items import AudioFormat
-from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
-from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_HTTP_PROFILE, SYNCGROUP_PREFIX
-from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS
-from music_assistant.server.helpers.audio import get_ffmpeg_stream
-from music_assistant.server.helpers.util import TaskManager
-from music_assistant.server.models.player_provider import PlayerProvider
-
-if TYPE_CHECKING:
- from collections.abc import Iterable
-
- from music_assistant.common.models.config_entries import ProviderConfig
- from music_assistant.common.models.provider import ProviderManifest
- from music_assistant.server import MusicAssistant
- from music_assistant.server.models import ProviderInstanceType
-
-
-# ruff: noqa: ARG002
-
-UGP_FORMAT = AudioFormat(
- content_type=ContentType.PCM_F32LE,
- sample_rate=44100,
- bit_depth=32,
-)
-UGP_PREFIX = "ugp_"
-
-CONF_ACTION_CREATE_PLAYER = "create_player"
-CONF_ACTION_CREATE_PLAYER_SAVE = "create_player_save"
-CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(44100, 16, 44100, 16, True)
-CONF_GROUP_PLAYERS: Final[str] = "group_players"
-CONF_NEW_GROUP_NAME: Final[str] = "name"
-CONF_NEW_GROUP_MEMBERS: Final[list[str]] = "members"
-
-CONFIG_ENTRY_UGP_NOTE = ConfigEntry(
- key="ugp_note",
- type=ConfigEntryType.LABEL,
- label="Please note that although the universal group "
- "allows you to group any player, it will not enable audio sync "
- "between players of different ecosystems. It is advised to always use native "
- "player groups or sync groups when available for your player type(s) and use "
- "the Universal Group only to group players of different ecosystems.",
- required=False,
-)
-
-
-async def setup(
- mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
-) -> ProviderInstanceType:
- """Initialize provider(instance) with given configuration."""
- return UniversalGroupProvider(mass, manifest, config)
-
-
-async def get_config_entries(
- mass: MusicAssistant,
- instance_id: str | None = None,
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
-) -> tuple[ConfigEntry, ...]:
- """
- Return Config entries to setup this provider.
-
- instance_id: id of an existing provider instance (None if new instance setup).
- action: [optional] action key called from config entries UI.
- values: the (intermediate) raw values for config entries sent with the action.
- """
- if not (ugp_provider := mass.get_provider(instance_id)):
- # UGP provider is not (yet) loaded
- return ()
- if TYPE_CHECKING:
- ugp_provider = cast(UniversalGroupProvider, ugp_provider)
- if action == CONF_ACTION_CREATE_PLAYER:
- # create new group player
- name = values.pop(CONF_NEW_GROUP_NAME)
- members: list[str] = values.pop(CONF_GROUP_MEMBERS)
- members = ugp_provider._filter_members(members)
- await ugp_provider.create_group(name, members)
- return (
- ConfigEntry(
- key="ugp_note",
- type=ConfigEntryType.LABEL,
- label=f"Your new Universal Group Player {name} has been created and "
- "is available in the players list.",
- required=False,
- ),
- )
- return (
- ConfigEntry(
- key="ugp_new",
- type=ConfigEntryType.LABEL,
- label="Fill in the details below to create a new Universal Group "
- "Player and click the 'Create new universal group' button.",
- required=False,
- ),
- ConfigEntry(
- key=CONF_NEW_GROUP_NAME,
- type=ConfigEntryType.STRING,
- label="Name",
- required=False,
- ),
- ConfigEntry(
- key=CONF_GROUP_MEMBERS,
- type=ConfigEntryType.STRING,
- label=CONF_NEW_GROUP_MEMBERS,
- default_value=[],
- options=tuple(
- ConfigValueOption(x.display_name, x.player_id)
- for x in mass.players.all(True, False)
- ),
- multi_value=True,
- required=False,
- ),
- ConfigEntry(
- key=CONF_ACTION_CREATE_PLAYER,
- type=ConfigEntryType.ACTION,
- label="Create new Universal Player Group",
- required=False,
- ),
- CONFIG_ENTRY_UGP_NOTE,
- )
-
-
-class UniversalGroupProvider(PlayerProvider):
- """Base/builtin provider for universally grouping players."""
-
- @property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
- """Return the features supported by this Provider."""
- return ()
-
- def __init__(
- self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
- ) -> None:
- """Initialize MusicProvider."""
- super().__init__(mass, manifest, config)
- self._registered_routes: set[str] = set()
- self.streams: dict[str, UGPStream] = {}
-
- async def loaded_in_mass(self) -> None:
- """Call after the provider has been loaded."""
- await self._register_all_players()
-
- async def unload(self) -> None:
- """
- Handle unload/close of the provider.
-
- Called when provider is deregistered (e.g. MA exiting or config reloading).
- """
- for route_path in list(self._registered_routes):
- self._registered_routes.remove(route_path)
- self.mass.streams.unregister_dynamic_route(route_path)
-
- async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]:
- """Return all (provider/player specific) Config Entries for the given player (if any)."""
- base_entries = await super().get_player_config_entries(player_id)
- return (
- *base_entries,
- CONF_ENTRY_FLOW_MODE_ENFORCED,
- ConfigEntry(
- key=CONF_GROUP_MEMBERS,
- type=ConfigEntryType.STRING,
- label="Group members",
- default_value=[],
- options=tuple(
- ConfigValueOption(x.display_name, x.player_id)
- for x in self.mass.players.all(True, False)
- if x.player_id != player_id and not x.player_id.startswith(UGP_PREFIX)
- ),
- description="Select all players you want to be part of this universal group",
- multi_value=True,
- required=True,
- ),
- CONFIG_ENTRY_UGP_NOTE,
- CONF_ENTRY_CROSSFADE,
- CONF_ENTRY_CROSSFADE_DURATION,
- CONF_ENTRY_SAMPLE_RATES_UGP,
- )
-
- def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
- """Call (by config manager) when the configuration of a player changes."""
- if f"values/{CONF_GROUP_MEMBERS}" in changed_keys:
- player = self.mass.players.get(config.player_id)
- members = config.get_value(CONF_GROUP_MEMBERS)
- # ensure we filter invalid members
- members = self._filter_members(members)
- player.group_childs = members
- self.mass.config.set_raw_player_config_value(
- config.player_id, CONF_GROUP_PLAYERS, members
- )
- self.mass.players.update(config.player_id)
-
- def on_player_config_removed(self, player_id: str) -> None:
- """Call (by config manager) when the configuration of a player is removed."""
- # ensure that any group players get removed
- group_players = self.mass.config.get_raw_provider_config_value(
- self.instance_id, CONF_GROUP_PLAYERS, {}
- )
- if player_id in group_players:
- del group_players[player_id]
- self.mass.config.set_raw_provider_config_value(
- self.instance_id, CONF_GROUP_PLAYERS, group_players
- )
-
- async def cmd_stop(self, player_id: str) -> None:
- """Send STOP command to given player."""
- group_player = self.mass.players.get(player_id)
- group_player.state = PlayerState.IDLE
- self.mass.players.update(player_id)
- # forward command to player and any connected sync child's
- async with TaskManager(self.mass) as tg:
- for member in self.mass.players.iter_group_members(group_player, only_powered=True):
- if member.state == PlayerState.IDLE:
- continue
- tg.create_task(self.mass.players.cmd_stop(member.player_id))
- if (stream := self.streams.pop(player_id, None)) and not stream.done:
- await stream.stop()
-
- async def cmd_play(self, player_id: str) -> None:
- """Send PLAY command to given player."""
-
- async def cmd_power(self, player_id: str, powered: bool) -> None:
- """Send POWER command to given UGP group player."""
- group_player = self.mass.players.get(player_id, True)
-
- if group_player.powered == powered:
- return # nothing to do
-
- # make sure to update the group power state
- group_player.powered = powered
-
- any_member_powered = False
- async with TaskManager(self.mass) as tg:
- for member in self.mass.players.iter_group_members(group_player, only_powered=True):
- any_member_powered = True
- if powered:
- if member.state in (PlayerState.PLAYING, PlayerState.PAUSED):
- # stop playing existing content on member if we start the group player
- tg.create_task(self.cmd_stop(member.player_id))
- # set active source to group player if the group (is going to be) powered
- member.active_group = group_player.active_group
- member.active_source = group_player.active_source
- self.mass.players.update(member.player_id, skip_forward=True)
- else:
- # turn off child player when group turns off
- tg.create_task(self.cmd_power(member.player_id, False))
- # reset active source on player
- member.active_source = None
- member.active_group = None
- self.mass.players.update(member.player_id, skip_forward=True)
- # edge case: group turned on but no members are powered, power them all!
- # TODO: Do we want to make this configurable ?
- if not any_member_powered and powered:
- for member in self.mass.players.iter_group_members(
- group_player, only_powered=False
- ):
- tg.create_task(self.cmd_power(member.player_id, True))
- member.active_group = group_player.player_id
- member.active_source = group_player.active_source
-
- self.mass.players.update(player_id)
-
- async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
- """Send VOLUME_SET command to given player."""
- # group volume is already handled in the player manager
-
- async def play_media(
- self,
- player_id: str,
- media: PlayerMedia,
- ) -> None:
- """Handle PLAY MEDIA on given player."""
- # power ON
- await self.cmd_power(player_id, True)
- group_player = self.mass.players.get(player_id)
- # stop any existing stream first
- if (existing := self.streams.pop(player_id, None)) and not existing.done:
- await existing.stop()
-
- # select audio source
- if media.media_type == MediaType.ANNOUNCEMENT:
- # special case: stream announcement
- audio_source = self.mass.streams.get_announcement_stream(
- media.custom_data["url"],
- output_format=UGP_FORMAT,
- use_pre_announce=media.custom_data["use_pre_announce"],
- )
- elif media.queue_id and media.queue_item_id:
- # regular queue stream request
- audio_source = self.mass.streams.get_flow_stream(
- queue=self.mass.player_queues.get(media.queue_id),
- start_queue_item=self.mass.player_queues.get_item(
- media.queue_id, media.queue_item_id
- ),
- pcm_format=UGP_FORMAT,
- )
- else:
- # assume url or some other direct path
- # NOTE: this will fail if its an uri not playable by ffmpeg
- audio_source = get_ffmpeg_stream(
- audio_input=media.uri,
- input_format=AudioFormat(ContentType.try_parse(media.uri)),
- output_format=UGP_FORMAT,
- )
-
- # start the stream task
- self.streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT)
- base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.aac"
-
- # forward to downstream play_media commands
- async with TaskManager(self.mass) as tg:
- for member in self.mass.players.iter_group_members(group_player, only_powered=True):
- tg.create_task(
- self.mass.players.play_media(
- member.player_id,
- media=PlayerMedia(
- uri=f"{base_url}?player_id={member.player_id}",
- media_type=MediaType.FLOW_STREAM,
- title=group_player.display_name,
- queue_id=group_player.player_id,
- ),
- )
- )
- # set the state optimistically
- group_player.elapsed_time = 0
- group_player.elapsed_time_last_updated = time() - 1
- group_player.state = PlayerState.PLAYING
- self.mass.players.update(player_id)
-
- async def create_group(self, name: str, members: list[str]) -> Player:
- """Create new PlayerGroup on this provider.
-
- Create a new PlayerGroup with given name and members.
-
- - name: Name for the new group to create.
- - members: A list of player_id's that should be part of this group.
- """
- new_group_id = f"{UGP_PREFIX}{shortuuid.random(8).lower()}"
- # cleanup list, filter groups (should be handled by frontend, but just in case)
- members = self._filter_members(members)
- # create default config with the user chosen name
- self.mass.config.create_default_player_config(
- new_group_id,
- self.instance_id,
- name=name,
- enabled=True,
- values={CONF_GROUP_MEMBERS: members},
- )
- return self._register_group_player(new_group_id, name=name, members=members)
-
- async def _register_all_players(self) -> None:
- """Register all (virtual/fake) group players in the Player controller."""
- player_configs = await self.mass.config.get_player_configs(
- self.instance_id, include_values=True
- )
- for player_config in player_configs:
- members = player_config.get_value(CONF_GROUP_MEMBERS)
- self._register_group_player(
- player_config.player_id,
- player_config.name or player_config.default_name,
- members,
- )
-
- def _register_group_player(
- self, group_player_id: str, name: str, members: Iterable[str]
- ) -> Player:
- """Register a UGP group player in the Player controller."""
- player = Player(
- player_id=group_player_id,
- provider=self.instance_id,
- type=PlayerType.GROUP,
- name=name,
- available=True,
- powered=False,
- device_info=DeviceInfo(model="Group", manufacturer=self.name),
- supported_features=(PlayerFeature.VOLUME_SET, PlayerFeature.POWER),
- group_childs=set(members),
- )
- self.mass.players.register_or_update(player)
- # register dynamic route for the ugp stream
- route_path = f"/ugp/{group_player_id}.aac"
- self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
- self._registered_routes.add(route_path)
-
- return player
-
- def on_group_child_power(
- self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState
- ) -> None:
- """
- Call when a power command was executed on one of the child player of a PlayerGroup.
-
- The group state is sent with the state BEFORE the power command was executed.
- """
- if not group_player.powered:
- # guard, this should be caught in the player controller but just in case...
- return None
-
- powered_childs = [
- x
- for x in self.mass.players.iter_group_members(group_player, True)
- if not (not new_power and x.player_id == child_player.player_id)
- ]
- if new_power and child_player not in powered_childs:
- powered_childs.append(child_player)
-
- # if the last player of a group turned off, turn off the group
- if len(powered_childs) == 0:
- self.logger.debug(
- "Group %s has no more powered members, turning off group player",
- group_player.display_name,
- )
- self.mass.create_task(self.cmd_power(group_player.player_id, False))
- return False
-
- # if a child player turned ON while the group player is already playing
- # we just direct it to the existing stream (we dont care about the audio being in sync)
- if new_power and group_state == PlayerState.PLAYING:
- base_url = f"{self.mass.streams.base_url}/ugp/{group_player.player_id}.aac"
- self.mass.create_task(
- self.mass.players.play_media(
- child_player.player_id,
- media=PlayerMedia(
- uri=f"{base_url}?player_id={child_player.player_id}",
- media_type=MediaType.FLOW_STREAM,
- title=group_player.display_name,
- queue_id=group_player.player_id,
- ),
- )
- )
-
- return None
-
- async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
- """Serve the UGP (multi-client) flow stream audio to a player."""
- ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1]
- child_player_id = request.query.get("player_id") # optional!
-
- if not (ugp_player := self.mass.players.get(ugp_player_id)):
- raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}")
-
- if not (stream := self.streams.get(ugp_player_id, None)) or stream.done:
- raise web.HTTPNotFound(body=f"There is no active UGP stream for {ugp_player_id}!")
-
- http_profile: str = await self.mass.config.get_player_config_value(
- child_player_id, CONF_HTTP_PROFILE
- )
- headers = {
- **DEFAULT_STREAM_HEADERS,
- "Content-Type": "audio/aac",
- "Accept-Ranges": "none",
- "Cache-Control": "no-cache",
- "Connection": "close",
- }
-
- resp = web.StreamResponse(status=200, reason="OK", headers=headers)
- if http_profile == "forced_content_length":
- resp.content_length = 4294967296
- elif http_profile == "chunked":
- resp.enable_chunked_encoding()
-
- await resp.prepare(request)
-
- # return early if this is not a GET request
- if request.method != "GET":
- return resp
-
- # all checks passed, start streaming!
- self.logger.debug(
- "Start serving UGP flow audio stream for UGP-player %s to %s",
- ugp_player.display_name,
- child_player_id or request.remote,
- )
- async for chunk in stream.subscribe():
- try:
- await resp.write(chunk)
- except (ConnectionError, ConnectionResetError):
- break
-
- return resp
-
- def _filter_members(self, members: list[str]) -> list[str]:
- """Filter out members that are not valid players."""
- # cleanup members - filter out impossible choices
- syncgroup_childs: list[str] = []
- for member in members:
- if not member.startswith(SYNCGROUP_PREFIX):
- continue
- if syncgroup := self.mass.players.get(member):
- syncgroup_childs.extend(syncgroup.group_childs)
- # we filter out other UGP players and syncgroup childs
- # if their parent is already in the list
- return [
- x
- for x in members
- if self.mass.players.get(x)
- and x not in syncgroup_childs
- and not x.startswith(UGP_PREFIX)
- ]
-
-
-class UGPStream:
- """
- Implementation of a Stream for the Universal Group Player.
-
- Basiclaly this is like a fake radio radio stream (AAC) format with multiple subscribers.
- The AAC format is chosen because it is widely supported and has a good balance between
- quality and bandwidth and also allows for mid-stream joining of (extra) players.
- """
-
- def __init__(
- self,
- audio_source: AsyncGenerator[bytes, None],
- audio_format: AudioFormat,
- ) -> None:
- """Initialize UGP Stream."""
- self.audio_source = audio_source
- self.input_format = audio_format
- self.output_format = AudioFormat(content_type=ContentType.AAC)
- self.subscribers: list[Callable[[bytes], Awaitable]] = []
- self._task: asyncio.Task | None = None
- self._done: asyncio.Event = asyncio.Event()
-
- @property
- def done(self) -> bool:
- """Return if this stream is already done."""
- return self._done.is_set() and self._task and self._task.done()
-
- async def stop(self) -> None:
- """Stop/cancel the stream."""
- if self._done.is_set():
- return
- if self._task and not self._task.done():
- self._task.cancel()
- self._done.set()
-
- async def subscribe(self) -> AsyncGenerator[bytes, None]:
- """Subscribe to the raw/unaltered audio stream."""
- # start the runner as soon as the (first) client connects
- if not self._task:
- self._task = asyncio.create_task(self._runner())
- queue = asyncio.Queue(1)
- try:
- self.subscribers.append(queue.put)
- while True:
- chunk = await queue.get()
- if not chunk:
- break
- yield chunk
- finally:
- self.subscribers.remove(queue.put)
-
- async def _runner(self) -> None:
- """Run the stream for the given audio source."""
- await asyncio.sleep(0.25) # small delay to allow subscribers to connect
- async for chunk in get_ffmpeg_stream(
- audio_input=self.audio_source,
- input_format=self.input_format,
- output_format=self.output_format,
- # TODO: enable readrate limiting + initial burst once we have a newer ffmpeg version
- # extra_input_args=["-readrate", "1.15"],
- ):
- await asyncio.gather(*[sub(chunk) for sub in self.subscribers], return_exceptions=True)
- # empty chunk when done
- await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True)
- self._done.set()
+++ /dev/null
-{
- "type": "player",
- "domain": "ugp",
- "name": "Universal Group Player",
- "description": "Create Player Groups with your favorite players, regardless of type and model.",
- "codeowners": [
- "@music-assistant"
- ],
- "requirements": [],
- "documentation": "https://music-assistant.io/player-support/universal/",
- "multi_instance": false,
- "builtin": false,
- "icon": "speaker-multiple"
-}
command: str,
handler: Callable,
) -> None:
- """Dynamically register a command on the API."""
+ """
+ Dynamically register a command on the API.
+
+ Returns handle to unregister.
+ """
if command in self.command_handlers:
msg = f"Command {command} is already registered"
raise RuntimeError(msg)
self.command_handlers[command] = APICommandHandler.parse(command, handler)
+ def unregister() -> None:
+ self.command_handlers.pop(command)
+
+ return unregister
+
async def load_provider_config(
self,
prov_conf: ProviderConfig,