From 29e85057ff986a0ffa1a077308d253212433f08f Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Fri, 11 Oct 2024 01:38:30 +0200 Subject: [PATCH] Complete refactor of player groups logic (#1704) --- music_assistant/client/players.py | 4 - music_assistant/common/models/enums.py | 4 +- music_assistant/common/models/player.py | 7 +- music_assistant/common/models/provider.py | 1 + music_assistant/constants.py | 4 - music_assistant/server/controllers/config.py | 18 +- .../server/controllers/player_queues.py | 9 +- music_assistant/server/controllers/players.py | 592 +++----------- .../server/models/player_provider.py | 110 +-- music_assistant/server/models/provider.py | 1 + .../server/providers/airplay/__init__.py | 43 +- .../server/providers/bluesound/__init__.py | 8 +- .../server/providers/chromecast/__init__.py | 21 +- .../server/providers/chromecast/helpers.py | 3 - .../server/providers/hass_players/__init__.py | 11 - .../server/providers/player_group/__init__.py | 727 ++++++++++++++++++ .../providers/player_group/manifest.json | 13 + .../providers/player_group/ugp_stream.py | 91 +++ .../server/providers/slimproto/__init__.py | 71 +- .../server/providers/snapcast/__init__.py | 35 +- .../server/providers/sonos/__init__.py | 47 +- .../server/providers/sonos_s1/player.py | 5 - .../server/providers/ugp/__init__.py | 600 --------------- .../server/providers/ugp/manifest.json | 14 - music_assistant/server/server.py | 11 +- 25 files changed, 1096 insertions(+), 1354 deletions(-) create mode 100644 music_assistant/server/providers/player_group/__init__.py create mode 100644 music_assistant/server/providers/player_group/manifest.json create mode 100644 music_assistant/server/providers/player_group/ugp_stream.py delete mode 100644 music_assistant/server/providers/ugp/__init__.py delete mode 100644 music_assistant/server/providers/ugp/manifest.json diff --git a/music_assistant/client/players.py b/music_assistant/client/players.py index 066343c7..7c57c197 100644 --- a/music_assistant/client/players.py +++ b/music_assistant/client/players.py @@ -174,10 +174,6 @@ class Players: "players/cmd/group_volume", player_id=player_id, volume_level=volume_level ) - async def set_player_group_power(self, player_id: str, power: bool) -> None: - """Handle power command for a (Sync)Group.""" - await self.client.send_command("players/cmd/group_volume", player_id=player_id, power=power) - async def set_player_group_members(self, player_id: str, members: list[str]) -> None: """ Update the memberlist of the given PlayerGroup. diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index 78cc15ae..4c2e301d 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -258,14 +258,12 @@ class PlayerType(StrEnum): player: A regular player. stereo_pair: Same as player but a dedicated stereo pair of 2 speakers. - group: A (dedicated) group player or (universal) playergroup. - sync_group: A group/preset of players that can be synced together. + group: A (dedicated) (sync)group player or (universal) playergroup. """ PLAYER = "player" STEREO_PAIR = "stereo_pair" GROUP = "group" - SYNC_GROUP = "sync_group" UNKNOWN = "unknown" @classmethod diff --git a/music_assistant/common/models/player.py b/music_assistant/common/models/player.py index e3a15634..35fb66ad 100644 --- a/music_assistant/common/models/player.py +++ b/music_assistant/common/models/player.py @@ -68,7 +68,8 @@ class Player(DataClassDictMixin): active_source: str | None = None # active_source: return player_id of the active group for this player (if any) - # if the player is grouped and a group is active, this will be set to the group's player_id + # if the player is grouped and a group is active, + # this should be set to the group's player_id by the group player implementation. active_group: str | None = None # current_media: return current active/loaded item on the player @@ -76,10 +77,6 @@ class Player(DataClassDictMixin): # includes metadata if supported by the provider/player current_media: PlayerMedia | None = None - # can_sync_with: return tuple of player_ids that can be synced to/with this player - # usually this is just a list of all player_ids within the playerprovider - can_sync_with: tuple[str, ...] = field(default=()) - # synced_to: player_id of the player this player is currently synced to # also referred to as "sync master" synced_to: str | None = None diff --git a/music_assistant/common/models/provider.py b/music_assistant/common/models/provider.py index d9a9e895..7a7d0219 100644 --- a/music_assistant/common/models/provider.py +++ b/music_assistant/common/models/provider.py @@ -64,6 +64,7 @@ class ProviderInstance(DataClassORJSONMixin): domain: str name: str instance_id: str + lookup_key: str supported_features: list[ProviderFeature] available: bool icon: str | None = None diff --git a/music_assistant/constants.py b/music_assistant/constants.py index 017371ca..b78e4c84 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -65,10 +65,7 @@ CONF_ICON: Final[str] = "icon" CONF_LANGUAGE: Final[str] = "language" CONF_SAMPLE_RATES: Final[str] = "sample_rates" CONF_HTTP_PROFILE: Final[str] = "http_profile" -CONF_SYNC_LEADER: Final[str] = "sync_leader" CONF_BYPASS_NORMALIZATION_RADIO: Final[str] = "bypass_normalization_radio" -CONF_PREVENT_SYNC_LEADER_OFF: Final[str] = "prevent_sync_leader_off" -CONF_SYNCGROUP_DEFAULT_ON: Final[str] = "syncgroup_default_on" CONF_ENABLE_ICY_METADATA: Final[str] = "enable_icy_metadata" CONF_VOLUME_NORMALIZATION_RADIO: Final[str] = "volume_normalization_radio" CONF_VOLUME_NORMALIZATION_TRACKS: Final[str] = "volume_normalization_tracks" @@ -111,6 +108,5 @@ CONFIGURABLE_CORE_CONTROLLERS = ( "music", "player_queues", ) -SYNCGROUP_PREFIX: Final[str] = "syncgroup_" VERBOSE_LOG_LEVEL: Final[int] = 5 PROVIDERS_WITH_SHAREABLE_URLS = ("spotify", "qobuz") diff --git a/music_assistant/server/controllers/config.py b/music_assistant/server/controllers/config.py index 0aa98c59..07ed579d 100644 --- a/music_assistant/server/controllers/config.py +++ b/music_assistant/server/controllers/config.py @@ -26,7 +26,7 @@ from music_assistant.common.models.config_entries import ( ProviderConfig, ) from music_assistant.common.models.enums import EventType, ProviderType -from music_assistant.common.models.errors import InvalidDataError, ProviderUnavailableError +from music_assistant.common.models.errors import InvalidDataError from music_assistant.constants import ( CONF_CORE, CONF_PLAYERS, @@ -47,6 +47,7 @@ if TYPE_CHECKING: LOGGER = logging.getLogger(__name__) DEFAULT_SAVE_DELAY = 5 +BASE_KEYS = ("enabled", "name", "available", "default_name", "provider", "type") isfile = wrap(os.path.isfile) remove = wrap(os.remove) @@ -590,11 +591,13 @@ class ConfigController: raise KeyError(msg) if encrypted: value = self.encrypt_string(value) - # also update the cached value in the provider itself - if not (prov := self.mass.get_provider(provider_instance, return_unavailable=True)): - raise ProviderUnavailableError(provider_instance) - prov.config.values[key].value = value + if key in BASE_KEYS: + self.set(f"{CONF_PROVIDERS}/{provider_instance}/{key}", value) + return self.set(f"{CONF_PROVIDERS}/{provider_instance}/values/{key}", value) + # also update the cached value in the provider itself + if prov := self.mass.get_provider(provider_instance, return_unavailable=True): + prov.config.values[key].value = value def set_raw_core_config_value(self, core_module: str, key: str, value: ConfigValueType) -> None: """ @@ -617,7 +620,10 @@ class ConfigController: # only allow setting raw values if main entry exists msg = f"Invalid player_id: {player_id}" raise KeyError(msg) - self.set(f"{CONF_PLAYERS}/{player_id}/values/{key}", value) + if key in BASE_KEYS: + self.set(f"{CONF_PLAYERS}/{player_id}/{key}", value) + else: + self.set(f"{CONF_PLAYERS}/{player_id}/values/{key}", value) def save(self, immediate: bool = False) -> None: """Schedule save of data to disk.""" diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 6267eb35..017cd6b9 100644 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -290,7 +290,8 @@ class PlayerQueuesController(CoreController): # we need to restart playback self.mass.create_task(self.resume(queue_id)) else: - self.mass.call_later(5, self._enqueue_next(queue, queue.current_index)) + task_id = f"enqueue_next_{queue_id}" + self.mass.call_later(2, self._enqueue_next, queue, queue.current_index, task_id=task_id) @api_command("player_queues/play_media") async def play_media( @@ -599,7 +600,7 @@ class PlayerQueuesController(CoreController): queue.stream_finished = None queue.end_of_track_reached = None # forward the actual command to the player controller - await self.mass.players.cmd_stop(queue_id, skip_forward=True) + await self.mass.players.cmd_stop(queue_id, skip_redirect=True) @api_command("player_queues/play") async def play(self, queue_id: str) -> None: @@ -618,7 +619,7 @@ class PlayerQueuesController(CoreController): and queue.state == PlayerState.PAUSED ): # forward the actual command to the player controller - await self.mass.players.cmd_play(queue_id, skip_forward=True) + await self.mass.players.cmd_play(queue_id, skip_redirect=True) else: await self.resume(queue_id) @@ -1156,7 +1157,7 @@ class PlayerQueuesController(CoreController): # it has started buffering the given queue item if not queue.flow_mode: task_id = f"enqueue_next_{queue_id}" - self.mass.call_later(2, self._enqueue_next, queue, item_id, task_id=task_id) + self.mass.call_later(5, self._enqueue_next, queue, item_id, task_id=task_id) # Main queue manipulation methods diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 247296ab..7d21b820 100644 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -5,12 +5,9 @@ from __future__ import annotations import asyncio import functools import time -from collections.abc import Iterable from contextlib import suppress from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast -import shortuuid - from music_assistant.common.helpers.util import get_changed_values from music_assistant.common.models.config_entries import ( CONF_ENTRY_ANNOUNCE_VOLUME, @@ -27,28 +24,21 @@ from music_assistant.common.models.enums import ( PlayerFeature, PlayerState, PlayerType, - ProviderFeature, ProviderType, ) from music_assistant.common.models.errors import ( AlreadyRegisteredError, PlayerCommandFailed, PlayerUnavailableError, - ProviderUnavailableError, UnsupportedFeaturedException, ) from music_assistant.common.models.media_items import UniqueList -from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia +from music_assistant.common.models.player import Player, PlayerMedia from music_assistant.constants import ( CONF_AUTO_PLAY, - CONF_GROUP_MEMBERS, CONF_HIDE_PLAYER, CONF_PLAYERS, - CONF_PREVENT_SYNC_LEADER_OFF, - CONF_SYNC_LEADER, - CONF_SYNCGROUP_DEFAULT_ON, CONF_TTS_PRE_ANNOUNCE, - SYNCGROUP_PREFIX, ) from music_assistant.server.helpers.api import api_command from music_assistant.server.helpers.tags import parse_tags @@ -174,63 +164,50 @@ class PlayerController(CoreController): @api_command("players/cmd/stop") @handle_player_command - async def cmd_stop(self, player_id: str, skip_forward: bool = False) -> None: + async def cmd_stop(self, player_id: str, skip_redirect: bool = False) -> None: """Send STOP command to given player. - player_id: player_id of the player to handle the command. """ - player_id = self._check_redirect(player_id) - player = self.get(player_id, True) + player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect) # Redirect to queue controller if active (as it also handles some other logic) - # Note that skip_forward will be set by the queue controller + # Note that skip_redirect will be set by the queue controller # to prevent an endless loop. - if not skip_forward and player.active_source == player_id: + if not skip_redirect and player.active_source == player_id: await self.mass.player_queues.stop(player_id) return - # handle syncgroup: redirect to syncgroup-leader if needed - if player_id.startswith(SYNCGROUP_PREFIX): - if sync_leader := self.get_sync_leader(player): - await self.cmd_stop(sync_leader.player_id) - return if player_provider := self.get_player_provider(player_id): await player_provider.cmd_stop(player_id) @api_command("players/cmd/play") @handle_player_command - async def cmd_play(self, player_id: str, skip_forward: bool = False) -> None: + async def cmd_play(self, player_id: str, skip_redirect: bool = False) -> None: """Send PLAY (unpause) command to given player. - player_id: player_id of the player to handle the command. """ - player_id = self._check_redirect(player_id) - player = self.get(player_id, True) + player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect) if player.announcement_in_progress: self.logger.warning("Ignore queue command: An announcement is in progress") return # Redirect to queue controller if active (as it also handles some other logic) - # Note that skip_forward will be set by the queue controller + # Note that skip_redirect will be set by the queue controller # to prevent an endless loop. - if not skip_forward and player.active_source == player_id: + if not skip_redirect and player.active_source == player_id: await self.mass.player_queues.play(player_id) return - # handle syncgroup: redirect to syncgroup-leader if needed - if player_id.startswith(SYNCGROUP_PREFIX): - if sync_leader := self.get_sync_leader(player): - await self.cmd_play(sync_leader.player_id) - return player_provider = self.get_player_provider(player_id) async with self._player_throttlers[player_id]: await player_provider.cmd_play(player_id) @api_command("players/cmd/pause") @handle_player_command - async def cmd_pause(self, player_id: str) -> None: + async def cmd_pause(self, player_id: str, skip_redirect: bool = False) -> None: """Send PAUSE command to given player. - player_id: player_id of the player to handle the command. """ - player_id = self._check_redirect(player_id) - player = self.get(player_id, True) + player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect) if player.announcement_in_progress: self.logger.warning("Ignore command: An announcement is in progress") return @@ -238,11 +215,6 @@ class PlayerController(CoreController): # if player does not support pause, we need to send stop await self.cmd_stop(player_id) return - # handle syncgroup: redirect to syncgroup-leader if needed - if player_id.startswith(SYNCGROUP_PREFIX): - if sync_leader := self.get_sync_leader(player): - await self.cmd_pause(sync_leader.player_id) - return player_provider = self.get_player_provider(player_id) await player_provider.cmd_pause(player_id) @@ -273,7 +245,7 @@ class PlayerController(CoreController): - player_id: player_id of the player to handle the command. """ - player = self.get(player_id, True) + player = self._get_player_with_redirect(player_id, skip_redirect=False) if player.state == PlayerState.PLAYING: await self.cmd_pause(player_id) else: @@ -281,59 +253,45 @@ class PlayerController(CoreController): @api_command("players/cmd/power") @handle_player_command - async def cmd_power(self, player_id: str, powered: bool) -> None: + async def cmd_power(self, player_id: str, powered: bool, skip_redirect: bool = False) -> None: """Send POWER command to given player. - player_id: player_id of the player to handle the command. - powered: bool if player should be powered on or off. """ - # forward to syncgroup if needed - if player_id.startswith(SYNCGROUP_PREFIX): - await self.cmd_group_power(player_id, powered) - return - player = self.get(player_id, True) if player.powered == powered: return # nothing to do - # grab info about any groups this player is active in - # to handle actions on the group when a (sync)group child turns on/off - if active_group_player_id := self._get_active_player_group(player): - active_group_player = self.get(active_group_player_id) - group_player_state = active_group_player.state - if not powered and active_group_player.type == PlayerType.SYNC_GROUP: - # handle 'prevent sync leader off' feature - powered_members = list(self.iter_group_members(active_group_player, True)) - sync_leader = self.get_sync_leader(active_group_player) - if ( - len(powered_members) > 1 - and (sync_leader == player) - and self.mass.config.get_raw_player_config_value( - active_group_player_id, CONF_PREVENT_SYNC_LEADER_OFF, False - ) - ): - raise PlayerCommandFailed( - f"{player.display_name} is the sync " - "leader of a syncgroup and cannot be turned off" + # redirect to active group player if player is group child + if not skip_redirect and player.active_group: + if group_player_provider := self.get_player_provider(player.active_group): + async with self._player_throttlers[player.active_group]: + await group_player_provider.on_group_child_power( + player.active_group, player_id, powered ) - else: - active_group_player = None + return # always stop player at power off if ( not powered - and player.powered - and player.state in (PlayerState.PLAYING, PlayerState.PAUSED) and not player.synced_to + and player.state in (PlayerState.PLAYING, PlayerState.PAUSED) ): await self.cmd_stop(player_id) # unsync player at power off - if not powered: - if player.synced_to or player.group_childs: - await self.cmd_unsync(player_id) + if not powered and ( + player.synced_to or (player.type == PlayerType.PLAYER and player.group_childs) + ): + await self.cmd_unsync(player_id) + # elif not powered and player.type == PlayerType.PLAYER and player.group_childs: + # async with TaskManager(self.mass) as tg: + # for member in self.iter_group_members(player, True): + # tg.create_task(self.cmd_power(member.player_id, False)) + # handle actual power command if PlayerFeature.POWER in player.supported_features: # player supports power command: forward to player provider player_provider = self.get_player_provider(player_id) @@ -346,46 +304,20 @@ class PlayerController(CoreController): # always optimistically set the power state to update the UI # as fast as possible and prevent race conditions player.powered = powered - # reset active source - player.active_source = None + # reset active source on power off + if not powered: + player.active_source = None self.update(player_id) - # handle 'auto play on power on' feature + # handle 'auto play on power on' feature if ( - not active_group_player + not player.active_group and powered and self.mass.config.get_raw_player_config_value(player_id, CONF_AUTO_PLAY, False) and player.active_source in (None, player_id) ): await self.mass.player_queues.resume(player_id) - # handle group player actions - if not (active_group_player and active_group_player.powered): - return - - # run actions suitable for every type of group player - powered_childs = list(self.mass.players.iter_group_members(active_group_player, True)) - if not powered and player in powered_childs: - powered_childs.remove(player.player_id) - elif powered and player.player_id not in powered_childs: - powered_childs.append(player.player_id) - # if the last player of a group turned off, turn off the group - if len(powered_childs) == 0: - self.logger.debug( - "Group %s has no more powered members, turning off group player", - active_group_player.display_name, - ) - self.mass.create_task(self.mass.players.cmd_power(active_group_player.player_id, False)) - return - # forward to either syncgroup logic or group player logic - if active_group_player.type == PlayerType.SYNC_GROUP: - self._on_syncgroup_child_power(active_group_player, player, powered, group_player_state) - elif active_group_player.type == PlayerType.GROUP: - player_prov = self.mass.get_provider(active_group_player.provider) - player_prov.on_group_child_power( - active_group_player, player, powered, group_player_state - ) - @api_command("players/cmd/volume_set") @handle_player_command async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: @@ -396,7 +328,7 @@ class PlayerController(CoreController): """ # TODO: Implement PlayerControl player = self.get(player_id, True) - if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP): + if player.type == PlayerType.GROUP: # redirect to group volume control await self.cmd_group_volume(player_id, volume_level) return @@ -443,7 +375,9 @@ class PlayerController(CoreController): new_volume = volume_level volume_dif = new_volume - cur_volume coros = [] - for child_player in self.iter_group_members(group_player, True): + for child_player in self.iter_group_members( + group_player, only_powered=True, exclude_self=False + ): if PlayerFeature.VOLUME_SET not in child_player.supported_features: continue cur_child_volume = child_player.volume_level @@ -453,77 +387,6 @@ class PlayerController(CoreController): coros.append(self.cmd_volume_set(child_player.player_id, new_child_volume)) await asyncio.gather(*coros) - @api_command("players/cmd/group_power") - async def cmd_group_power(self, player_id: str, power: bool) -> None: - """Handle power command for a (Sync/Player)Group.""" - group_player = self.get(player_id, True) - - if group_player.powered == power: - return # nothing to do - - if group_player.type == PlayerType.GROUP: - # this is a native group player, redirect - await self.cmd_power(player_id, power) - return - - if not (group_player.type == PlayerType.SYNC_GROUP or group_player.group_childs): - # this is not a (temporary) sync group - nothing to do - raise UnsupportedFeaturedException("Player is not a sync group") - - # make sure to update the group power state - group_player.powered = power - - # always stop (group/master)player at power off - if not power and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED): - await self.cmd_stop(player_id) - - default_on_pref = self.mass.config.get_raw_player_config_value( - group_player.player_id, CONF_SYNCGROUP_DEFAULT_ON, "powered_only" - ) - - # handle syncgroup - this will also work for temporary syncgroups - # where players are manually synced against a group leader - any_member_powered = False - async with TaskManager(self.mass) as tg: - for member in self.iter_group_members( - group_player, only_powered=(default_on_pref != "always_all") - ): - any_member_powered = True - if power: - if member.state in (PlayerState.PLAYING, PlayerState.PAUSED): - # stop playing existing content on member if we start the group player - tg.create_task(self.cmd_stop(member.player_id)) - # set active source to group player if the group (is going to be) powered - member.active_group = group_player.active_group - member.active_source = group_player.active_source - self.update(member.player_id, skip_forward=True) - else: - # turn off child player when group turns off - tg.create_task(self.cmd_power(member.player_id, False)) - # reset active source on player - member.active_source = None - member.active_group = None - self.update(member.player_id, skip_forward=True) - # handle default power ON - if power: - sync_leader = self.get_sync_leader(group_player) - for member in self.iter_group_members(group_player, only_powered=False): - if default_on_pref == "always_all" or ( - sync_leader - and default_on_pref == "always_leader" - and member.player_id == sync_leader.player_id - ): - tg.create_task(self.cmd_power(member.player_id, True)) - member.active_group = group_player.player_id - member.active_source = group_player.active_source - any_member_powered = True - if not any_member_powered: - return - - if power and group_player.player_id.startswith(SYNCGROUP_PREFIX): - await self.sync_syncgroup(group_player.player_id) - self.update(player_id) - @api_command("players/cmd/volume_mute") @handle_player_command async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: @@ -557,9 +420,7 @@ class PlayerController(CoreController): - player_id: player_id of the player to handle the command. - position: position in seconds to seek to in the current playing item. """ - player_id = self._check_redirect(player_id) - - player = self.get(player_id, True) + player = self._get_player_with_redirect(player_id) if PlayerFeature.SEEK not in player.supported_features: msg = f"Player {player.display_name} does not support seeking" raise UnsupportedFeaturedException(msg) @@ -603,7 +464,7 @@ class PlayerController(CoreController): player.active_group, url, use_pre_announce, volume_level ) return - if player.type in (PlayerType.SYNC_GROUP, PlayerType.GROUP) and not player.powered: + if player.type == PlayerType.GROUP and not player.powered: # announcement request sent to inactive group, check if any child's are playing if len(list(self.iter_group_members(player, True, True))) > 0: # just for the sake of simplicity we handle this request per-player @@ -654,22 +515,16 @@ class PlayerController(CoreController): finally: player.announcement_in_progress = False - async def play_media(self, player_id: str, media: PlayerMedia) -> None: + async def play_media( + self, player_id: str, media: PlayerMedia, skip_redirect: bool = False + ) -> None: """Handle PLAY MEDIA on given player. - player_id: player_id of the player to handle the command. - media: The Media that needs to be played on the player. """ - # handle syncgroup: redirect to syncgroup-leader if needed - if player_id.startswith(SYNCGROUP_PREFIX): - await self.cmd_group_power(player_id, True) - group_player = self.get(player_id, True) - if sync_leader := self.get_sync_leader(group_player): - await self.play_media(sync_leader.player_id, media=media) - group_player.state = PlayerState.PLAYING - return + player = self._get_player_with_redirect(player_id, skip_redirect=skip_redirect) # power on the player if needed - player = self.get(player_id, True) if not player.powered: await self.cmd_power(player_id, True) player_prov = self.mass.players.get_player_provider(player_id) @@ -680,15 +535,6 @@ class PlayerController(CoreController): async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: """Handle enqueuing of a next media item on the player.""" - if player_id.startswith(SYNCGROUP_PREFIX): - # redirect to syncgroup-leader if needed - group_player = self.get(player_id, True) - if sync_leader := self.get_sync_leader(group_player): - await self.enqueue_next_media( - sync_leader.player_id, - media=media, - ) - return player_prov = self.mass.players.get_player_provider(player_id) async with self._player_throttlers[player_id]: await player_prov.enqueue_next_media(player_id=player_id, media=media) @@ -719,11 +565,20 @@ class PlayerController(CoreController): - player_id: player_id of the player to handle the command. """ - if (player := self.get(player_id)) and player.group_childs: - # this player is a syncgroup leader, unsync all children - await self.cmd_unsync_many(player.group_childs) + if not (player := self.get(player_id)): + self.logger.warning("Player %s is not available", player_id) + return + if PlayerFeature.SYNC not in player.supported_features: + self.logger.warning("Player %s does not support (un)sync commands", player.name) return - await self.cmd_unsync_many([player_id]) + if not (player.synced_to or player.group_childs): + return # nothing to do + + # reset active source player if it is unsynced + player.active_source = None + # forward command to the player provider + if player_provider := self.get_player_provider(player_id): + await player_provider.cmd_unsync(player_id) @api_command("players/cmd/sync_many") async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None: @@ -753,14 +608,9 @@ class PlayerController(CoreController): "Player %s is already synced, unsyncing first", child_player.name ) await self.cmd_unsync(child_player.player_id) - - if child_player_id not in parent_player.can_sync_with: - self.logger.warning( - "Player %s can not be synced with %s", - child_player.display_name, - parent_player.display_name, - ) - continue + # power on the player if needed + if not child_player.powered: + await self.cmd_power(child_player.player_id, True) # if we reach here, all checks passed final_player_ids.append(child_player_id) # set active source if player is synced @@ -774,23 +624,8 @@ class PlayerController(CoreController): @api_command("players/cmd/unsync_many") async def cmd_unsync_many(self, player_ids: list[str]) -> None: """Handle UNSYNC command for all the given players.""" - # filter all player ids on compatibility and availability for player_id in list(player_ids): - if not (child_player := self.get(player_id)): - self.logger.warning("Player %s is not available", player_id) - continue - if PlayerFeature.SYNC not in child_player.supported_features: - self.logger.warning( - "Player %s does not support (un)sync commands", child_player.name - ) - continue - if not child_player.synced_to: - continue - # reset active source player if it is unsynced - child_player.active_source = None - # forward command to the player provider - if player_provider := self.get_player_provider(player_id): - await player_provider.cmd_unsync(player_id) + await self.cmd_unsync(player_id) def set(self, player: Player) -> None: """Set/Update player details on the controller.""" @@ -836,9 +671,6 @@ class PlayerController(CoreController): if not player.enabled: return - # initialize sync groups as soon as a player is registered - self.mass.loop.create_task(self._register_syncgroups()) - self.logger.info( "Player registered: %s/%s", player_id, @@ -873,7 +705,7 @@ class PlayerController(CoreController): self.mass.signal_event(EventType.PLAYER_REMOVED, player_id) def update( - self, player_id: str, skip_forward: bool = False, force_update: bool = False + self, player_id: str, skip_redirect: bool = False, force_update: bool = False ) -> None: """Update player state.""" if self.mass.closing: @@ -881,17 +713,14 @@ class PlayerController(CoreController): if player_id not in self._players: return player = self._players[player_id] - # calculate active group and active source - player.active_group = self._get_active_player_group(player) - if player.active_source is None: - player.active_source = self._get_active_source(player) + player.active_source = self._get_active_source(player) player.volume_level = player.volume_level or 0 # guard for None volume # correct group_members if needed if player.group_childs == {player.player_id}: player.group_childs = set() # calculate group volume player.group_volume = self._get_group_volume_level(player) - if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP): + if player.type == PlayerType.GROUP: player.volume_level = player.group_volume # prefer any overridden name from config player.display_name = ( @@ -906,21 +735,9 @@ class PlayerController(CoreController): player.player_id, CONF_ENTRY_PLAYER_ICON.key, CONF_ENTRY_PLAYER_ICON_GROUP.default_value - if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP) + if player.type == PlayerType.GROUP else CONF_ENTRY_PLAYER_ICON.default_value, ) - # handle syncgroup - get attributes from sync leader - if player.player_id.startswith(SYNCGROUP_PREFIX): - sync_leader = self.get_sync_leader(player) - if sync_leader and sync_leader.active_source == player.active_source: - player.state = sync_leader.state - player.active_source = sync_leader.active_source - player.current_media = sync_leader.current_media - player.elapsed_time = sync_leader.elapsed_time - player.elapsed_time_last_updated = sync_leader.elapsed_time_last_updated - else: - player.state = PlayerState.IDLE - player.active_source = player.player_id # basic throttle: do not send state changed events if player did not actually change prev_state = self._prev_states.get(player_id, {}) @@ -928,7 +745,12 @@ class PlayerController(CoreController): changed_values = get_changed_values( prev_state, new_state, - ignore_keys=["elapsed_time", "elapsed_time_last_updated", "seq_no", "last_poll"], + ignore_keys=[ + "elapsed_time", + "elapsed_time_last_updated", + "seq_no", + "last_poll", + ], ) self._prev_states[player_id] = new_state @@ -944,22 +766,16 @@ class PlayerController(CoreController): self.mass.signal_event(EventType.PLAYER_UPDATED, object_id=player_id, data=player) - if skip_forward: + if skip_redirect: return + # update/signal group player(s) child's when group updates - if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP): - for child_player in self.iter_group_members(player): - if child_player.player_id == player.player_id: - continue - self.update(child_player.player_id, skip_forward=True) + if player.type == PlayerType.GROUP: + for child_player in self.iter_group_members(player, exclude_self=True): + self.update(child_player.player_id, skip_redirect=True) # update/signal group player(s) when child updates for group_player in self._get_player_groups(player, powered_only=False): - player_prov = self.get_player_provider(group_player.player_id) - if not player_prov: - continue - if group_player.player_id.startswith(SYNCGROUP_PREFIX): - self.update(group_player.player_id, skip_forward=True) - else: + if player_prov := self.mass.get_provider(group_player.provider): self.mass.create_task(player_prov.poll_player(group_player.player_id)) def get_player_provider(self, player_id: str) -> PlayerProvider: @@ -1006,20 +822,41 @@ class PlayerController(CoreController): # ensure the result is an integer return None if volume_level is None else int(volume_level) - def _check_redirect(self, player_id: str) -> str: - """Check if playback related command should be redirected.""" + def _get_player_with_redirect(self, player_id: str, skip_redirect: bool = False) -> Player: + """Get player with check if playback related command should be redirected.""" player = self.get(player_id, True) - if player.synced_to: - sync_leader = self.get(player.synced_to, True) - self.logger.warning( + if skip_redirect: + return player + if player.synced_to and (sync_leader := self.get(player.synced_to)): + self.logger.info( "Player %s is synced to %s and can not accept " "playback related commands itself, " "redirected the command to the sync leader.", player.name, sync_leader.name, ) - return player.synced_to - return player_id + return sync_leader + if player.active_group and (active_group := self.get(player.active_group)): + self.logger.info( + "Player %s is part of a playergroup and can not accept " + "playback related commands itself, " + "redirected the command to the group leader.", + player.name, + ) + return active_group + if ( + player.active_source + and player.active_source != player.player_id + and (active_source := self.get(player.active_source)) + ): + self.logger.info( + "Player %s has a different source active (%s), " + "redirected the command to the source player.", + player.name, + active_source.display_name, + ) + return active_source + return player def _get_player_groups( self, player: Player, available_only: bool = True, powered_only: bool = False @@ -1028,39 +865,24 @@ class PlayerController(CoreController): for _player in self: if _player.player_id == player.player_id: continue - if _player.type not in (PlayerType.GROUP, PlayerType.SYNC_GROUP): + if _player.type != PlayerType.GROUP: continue if available_only and not _player.available: continue if powered_only and not _player.powered: continue - if ( - player.player_id in _player.group_childs - or player.active_source == _player.player_id - ): + if player.player_id in _player.group_childs: yield _player - def _get_active_player_group(self, player: Player) -> str | None: - """Return the currently active groupplayer for the given player (if any).""" - # prefer active source group - for group_player in self._get_player_groups(player, available_only=True, powered_only=True): - if player.active_source in (group_player.player_id, group_player.active_source): - return group_player.player_id - # fallback to just the first powered group - for group_player in self._get_player_groups(player, available_only=True, powered_only=True): - return group_player.player_id - return None - def _get_active_source(self, player: Player) -> str: """Return the active_source id for given player.""" # if player is synced, return group leader's active source if player.synced_to and (parent_player := self.get(player.synced_to)): return parent_player.active_source - # fallback to the first active group player - if player.active_group: - group_player = self.get(player.active_group) + # if player has group active, return those details + if player.active_group and (group_player := self.get(player.active_group)): return self._get_active_source(group_player) - # defaults to the player's own player id if not active source set + # defaults to the player's own player id if no active source set return player.active_source or player.player_id def _get_group_volume_level(self, player: Player) -> int: @@ -1071,7 +893,7 @@ class PlayerController(CoreController): # calculate group volume from all (turned on) players group_volume = 0 active_players = 0 - for child_player in self.iter_group_members(player, True): + for child_player in self.iter_group_members(player, only_powered=True, exclude_self=False): if PlayerFeature.VOLUME_SET not in child_player.supported_features: continue group_volume += child_player.volume_level or 0 @@ -1085,14 +907,20 @@ class PlayerController(CoreController): group_player: Player, only_powered: bool = False, only_playing: bool = False, + active_only: bool = False, + exclude_self: bool = True, ) -> Iterator[Player]: - """Get (child) players attached to a grouped player.""" + """Get (child) players attached to a group player or syncgroup.""" for child_id in list(group_player.group_childs): if child_player := self.get(child_id, False): if not child_player.available: continue if not (not only_powered or child_player.powered): continue + if not (not active_only or child_player.active_group == group_player.player_id): + continue + if exclude_self and child_player.player_id == group_player.player_id: + continue if not ( not only_playing or child_player.state in (PlayerState.PLAYING, PlayerState.PAUSED) @@ -1139,23 +967,17 @@ class PlayerController(CoreController): def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None: """Call (by config manager) when the configuration of a player changes.""" - player = self.mass.players.get(config.player_id) + if not (player := self.mass.players.get(config.player_id)): + return if config.enabled: player_prov = self.mass.players.get_player_provider(config.player_id) self.mass.create_task(player_prov.poll_player(config.player_id)) player.enabled = config.enabled + # signal player provider that the config changed + with suppress(PlayerUnavailableError): + if provider := self.mass.get_provider(config.provider): + provider.on_player_config_changed(config, changed_keys) self.mass.players.update(config.player_id, force_update=True) - if config.player_id.startswith(SYNCGROUP_PREFIX): - # handle syncgroup - if f"values/{CONF_GROUP_MEMBERS}" in changed_keys: - player = self.mass.players.get(config.player_id) - player.group_childs = config.get_value(CONF_GROUP_MEMBERS) - self.mass.players.update(config.player_id) - else: - # signal player provider that the config changed - with suppress(PlayerUnavailableError): - if provider := self.mass.get_provider(config.provider): - provider.on_player_config_changed(config, changed_keys) # if the player was playing, restart playback if player and player.state == PlayerState.PLAYING: self.mass.create_task(self.mass.player_queues.resume(player.active_source)) @@ -1163,173 +985,13 @@ class PlayerController(CoreController): def on_player_config_removed(self, player_id: str) -> None: """Call (by config manager) when the configuration of a player is removed.""" if (player := self.mass.players.get(player_id)) and player.available: - player.enabled = False self.mass.players.update(player_id, force_update=True) if player and (provider := self.mass.get_provider(player.provider)): - assert isinstance(provider, PlayerProvider) + provider = cast(PlayerProvider, provider) provider.on_player_config_removed(player_id) - if not player: + if not self.mass.players.get(player_id): self.mass.signal_event(EventType.PLAYER_REMOVED, player_id) - # Syncgroup specific functions/helpers - - @api_command("players/create_syncgroup") - async def create_syncgroup(self, name: str, members: list[str]) -> Player: - """Create a new Sync Group with name and members. - - - name: Name for the new group to create. - - members: A list of player_id's that should be part of this group. - - Returns the newly created player on success. - """ - base_player = self.get(members[0], True) - # perform basic checks - if (player_prov := self.mass.get_provider(base_player.provider)) is None: - msg = f"Provider {base_player.provider} is not available!" - raise ProviderUnavailableError(msg) - if ProviderFeature.SYNC_PLAYERS not in player_prov.supported_features: - msg = f"Provider {player_prov.name} does not support creating groups" - raise UnsupportedFeaturedException(msg) - new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}" - # cleanup list, just in case the frontend sends some garbage - members = [ - x - for x in members - if (x in base_player.can_sync_with or x == base_player.player_id) - and not x.startswith(SYNCGROUP_PREFIX) - ] - # create default config with the user chosen name - self.mass.config.create_default_player_config( - new_group_id, - player_prov.instance_id, - name=name, - enabled=True, - values={CONF_GROUP_MEMBERS: members}, - ) - return self.register_syncgroup(group_player_id=new_group_id, name=name, members=members) - - def register_syncgroup(self, group_player_id: str, name: str, members: Iterable[str]) -> Player: - """Register a (virtual/fake) syncgroup player.""" - # extract player features from first/random player - for member in members: - if first_player := self.mass.players.get(member): - break - else: - # edge case: no child player is (yet) available; postpone register - return None - player_prov = self.mass.get_provider(first_player.provider) - if TYPE_CHECKING: - assert player_prov - player = Player( - player_id=group_player_id, - provider=player_prov.instance_id, - type=PlayerType.SYNC_GROUP, - name=name, - available=True, - powered=False, - device_info=DeviceInfo(model="SyncGroup", manufacturer=player_prov.name), - supported_features=first_player.supported_features, - group_childs=set(members), - active_source=group_player_id, - ) - self.mass.players.register_or_update(player) - return player - - def get_sync_leader(self, group_player: Player) -> Player | None: - """Get the active sync leader player for a syncgroup or synced player.""" - if group_player.synced_to: - # should not happen but just in case... - return group_player.synced_to - # current sync leader: return the (first/only) player that has group childs - for child_player in self.iter_group_members( - group_player, only_powered=False, only_playing=False - ): - if child_player.group_childs: - return child_player - pref_sync_leader = self.mass.config.get_raw_player_config_value( - group_player.player_id, CONF_SYNC_LEADER, "auto" - ) - if pref_sync_leader != "auto" and (player := self.get(pref_sync_leader)): - return player - # select new sync leader: return the first playing player - for child_player in self.iter_group_members( - group_player, only_powered=True, only_playing=True - ): - return child_player - # fallback select new sync leader: return the first powered player - for child_player in self.iter_group_members( - group_player, only_powered=True, only_playing=False - ): - return child_player - # fallback select new sync leader: simply return the first player - for child_player in self.iter_group_members( - group_player, only_powered=False, only_playing=False - ): - return child_player - return None - - async def sync_syncgroup(self, player_id: str) -> None: - """Sync all (possible) players of a syncgroup.""" - group_player = self.get(player_id, True) - if not (sync_leader := self.get_sync_leader(group_player)): - raise RuntimeError("No sync leader found for syncgroup") - for member in self.iter_group_members(group_player, only_powered=True): - if not member.can_sync_with: - continue - if sync_leader.player_id == member.player_id: - continue - await self.cmd_sync(member.player_id, sync_leader.player_id) - - def _on_syncgroup_child_power( - self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState - ) -> None: - """ - Call when a power command was executed on one of the child players of a SyncGroup. - - This is used to handle special actions such as (re)syncing. - The group state is sent with the state BEFORE the power command was executed. - """ - group_playing = group_state == PlayerState.PLAYING - sync_leader = self.mass.players.get_sync_leader(group_player) - is_sync_leader = child_player.player_id == sync_leader.player_id - if group_playing and not new_power and is_sync_leader: - # the current sync leader player turned OFF while the group player - # should still be playing - we need to select a new sync leader and resume - self.logger.warning( - "Syncleader %s turned off while syncgroup is playing, " - "a forced resync for syngroup %s will be attempted...", - child_player.display_name, - group_player.display_name, - ) - - async def full_resync() -> None: - await self.mass.players.sync_syncgroup(group_player.player_id) - await self.mass.player_queues.resume(group_player.player_id) - - self.mass.call_later(2, full_resync, task_id=f"forced_resync_{group_player.player_id}") - return - elif new_power: - # if a child player turned ON while the group is already active, we need to resync - if sync_leader.player_id != child_player.player_id: - self.mass.create_task( - self.cmd_sync(child_player.player_id, sync_leader.player_id), - ) - - async def _register_syncgroups(self) -> None: - """Register all (virtual/fake) syncgroup players.""" - player_configs = await self.mass.config.get_player_configs() - for player_config in player_configs: - if not player_config.player_id.startswith(SYNCGROUP_PREFIX): - continue - members = self.mass.config.get_raw_player_config_value( - player_config.player_id, CONF_GROUP_MEMBERS - ) - self.register_syncgroup( - group_player_id=player_config.player_id, - name=player_config.name or player_config.default_name, - members=members, - ) - async def _play_announcement( self, player: Player, diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index 2b553d7a..e1afab7b 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -10,20 +10,10 @@ from music_assistant.common.models.config_entries import ( CONF_ENTRY_ANNOUNCE_VOLUME_MAX, CONF_ENTRY_ANNOUNCE_VOLUME_MIN, CONF_ENTRY_ANNOUNCE_VOLUME_STRATEGY, - CONF_ENTRY_PLAYER_ICON_GROUP, ConfigEntry, - ConfigValueOption, PlayerConfig, ) -from music_assistant.common.models.enums import ConfigEntryType, PlayerState from music_assistant.common.models.player import Player, PlayerMedia -from music_assistant.constants import ( - CONF_GROUP_MEMBERS, - CONF_PREVENT_SYNC_LEADER_OFF, - CONF_SYNC_LEADER, - CONF_SYNCGROUP_DEFAULT_ON, - SYNCGROUP_PREFIX, -) from .provider import Provider @@ -38,79 +28,6 @@ class PlayerProvider(Provider): async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: """Return all (provider/player specific) Config Entries for the given player (if any).""" - if player_id.startswith(SYNCGROUP_PREFIX): - # default entries for syncgroups - return ( - *BASE_PLAYER_CONFIG_ENTRIES, - CONF_ENTRY_PLAYER_ICON_GROUP, - ConfigEntry( - key=CONF_GROUP_MEMBERS, - type=ConfigEntryType.STRING, - label="Group members", - default_value=[], - options=tuple( - ConfigValueOption(x.display_name, x.player_id) - for x in self.mass.players.all(True, False) - if x.player_id != player_id - and x.provider == self.instance_id - and not x.player_id.startswith(SYNCGROUP_PREFIX) - ), - description="Select all players you want to be part of this group", - multi_value=True, - required=True, - ), - ConfigEntry( - key=CONF_SYNC_LEADER, - type=ConfigEntryType.STRING, - label="Preferred sync leader", - default_value="auto", - options=( - *tuple( - ConfigValueOption(x.display_name, x.player_id) - for x in self.mass.players.all(True, False) - if x.player_id - in self.mass.config.get_raw_player_config_value( - player_id, CONF_GROUP_MEMBERS, [] - ) - ), - ConfigValueOption("Select automatically", "auto"), - ), - description="By default Music Assistant will automatically assign a " - "(random) player as sync leader, meaning the other players in the sync group " - "will be synced to that player. If you want to force a specific player to be " - "the sync leader, select it here.", - required=True, - ), - ConfigEntry( - key=CONF_PREVENT_SYNC_LEADER_OFF, - type=ConfigEntryType.BOOLEAN, - label="Prevent sync leader power off", - default_value=False, - description="With this setting enabled, Music Assistant will disallow powering " - "off the sync leader player if other players are still " - "active in the sync group. This is useful if you want to prevent " - "a short drop in the music while the music is transferred to another player.", - required=True, - ), - ConfigEntry( - key=CONF_SYNCGROUP_DEFAULT_ON, - type=ConfigEntryType.STRING, - label="Default power ON behavior", - default_value="powered_only", - options=( - ConfigValueOption("Always power ON all child devices", "always_all"), - ConfigValueOption("Always power ON sync leader", "always_leader"), - ConfigValueOption("Start with powered players", "powered_only"), - ConfigValueOption("Ignore", "ignore"), - ), - description="What should happen if you power ON a sync group " - "(or you start playback to it), while no (or not all) players " - "are powered ON ?\n\nShould Music Assistant power ON all players, or only the " - "sync leader, or should it ignore the command if no players are powered ON ?", - required=False, - ), - ) - return ( *BASE_PLAYER_CONFIG_ENTRIES, # add default entries for announce feature @@ -133,12 +50,13 @@ class PlayerProvider(Provider): - player_id: player_id of the player to handle the command. """ - @abstractmethod async def cmd_play(self, player_id: str) -> None: """Send PLAY (unpause) command to given player. - player_id: player_id of the player to handle the command. """ + # will only be called for players with Pause feature set. + raise NotImplementedError async def cmd_pause(self, player_id: str) -> None: """Send PAUSE command to given player. @@ -148,6 +66,7 @@ class PlayerProvider(Provider): # will only be called for players with Pause feature set. raise NotImplementedError + @abstractmethod async def play_media( self, player_id: str, @@ -226,7 +145,7 @@ class PlayerProvider(Provider): Join/add the given player(id) to the given (master) player/sync group. - player_id: player_id of the player to handle the command. - - target_player: player_id of the syncgroup master or group player. + - target_player: player_id of the sync leader. """ # will only be called for players with SYNC feature set. raise NotImplementedError @@ -247,6 +166,17 @@ class PlayerProvider(Provider): # default implementation, simply call the cmd_sync for all child players await self.cmd_sync(child_id, target_player) + async def on_group_child_power( + self, group_player_id: str, child_player_id: str, powered: bool + ) -> None: + """Call when a child player of a group player is powered on/off.""" + # default implementation, simply redirect the request to the group player + self.logger.warning( + "Detected a player power command to a player that is part of a group. " + "Redirecting to group player..." + ) + await self.mass.players.cmd_power(group_player_id, powered) + async def poll_player(self, player_id: str) -> None: """Poll player for state updates. @@ -254,16 +184,6 @@ class PlayerProvider(Provider): if 'needs_poll' is set to True in the player object. """ - def on_group_child_power( - self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState - ) -> None: - """ - Call when a power command was executed on one of the child players of a PlayerGroup. - - This is used to handle special actions such as (re)syncing. - The group state is sent with the state BEFORE the power command was executed. - """ - # DO NOT OVERRIDE BELOW @property diff --git a/music_assistant/server/models/provider.py b/music_assistant/server/models/provider.py index bf94eea5..81db5e77 100644 --- a/music_assistant/server/models/provider.py +++ b/music_assistant/server/models/provider.py @@ -101,6 +101,7 @@ class Provider: "domain": self.domain, "name": self.config.name or self.name, "instance_id": self.instance_id, + "lookup_key": self.lookup_key, "supported_features": [x.value for x in self.supported_features], "available": self.available, "is_streaming_provider": getattr(self, "is_streaming_provider", None), diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index d4add27b..a769732f 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -48,7 +48,7 @@ from music_assistant.constants import CONF_SYNC_ADJUST, VERBOSE_LOG_LEVEL from music_assistant.server import MusicAssistant from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params from music_assistant.server.helpers.process import AsyncProcess, check_output -from music_assistant.server.helpers.util import TaskManager +from music_assistant.server.helpers.util import TaskManager, lock from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: @@ -56,7 +56,7 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType - from music_assistant.server.providers.ugp import UniversalGroupProvider + from music_assistant.server.providers.player_group import PlayerGroupProvider DOMAIN = "airplay" @@ -645,6 +645,7 @@ class AirplayProvider(PlayerProvider): airplay_player = self._players[player_id] await airplay_player.cmd_pause() + @lock async def play_media( self, player_id: str, @@ -674,8 +675,8 @@ class AirplayProvider(PlayerProvider): ) elif media.queue_id.startswith("ugp_"): # special case: UGP stream - ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") - ugp_stream = ugp_provider.streams[media.queue_id] + ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") + ugp_stream = ugp_provider.ugp_streams[media.queue_id] input_format = ugp_stream.output_format audio_source = ugp_stream.subscribe() elif media.queue_id and media.queue_item_id: @@ -750,6 +751,7 @@ class AirplayProvider(PlayerProvider): # store last state in cache await self.mass.cache.set(player_id, volume_level, base_key=CACHE_KEY_PREV_VOLUME) + @lock async def cmd_sync(self, player_id: str, target_player: str) -> None: """Handle SYNC command for given player. @@ -790,9 +792,10 @@ class AirplayProvider(PlayerProvider): ) else: # make sure that the player manager gets an update - self.mass.players.update(child_player.player_id, skip_forward=True) - self.mass.players.update(parent_player.player_id, skip_forward=True) + self.mass.players.update(child_player.player_id, skip_redirect=True) + self.mass.players.update(parent_player.player_id, skip_redirect=True) + @lock async def cmd_unsync(self, player_id: str) -> None: """Handle UNSYNC command for given player. @@ -801,15 +804,16 @@ class AirplayProvider(PlayerProvider): - player_id: player_id of the player to handle the command. """ player = self.mass.players.get(player_id, raise_unavailable=True) - if not player.synced_to: - return - group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True) - group_leader.group_childs.remove(player_id) - player.synced_to = None - await self.cmd_stop(player_id) - # make sure that the player manager gets an update - self.mass.players.update(player.player_id, skip_forward=True) - self.mass.players.update(group_leader.player_id, skip_forward=True) + if player.synced_to: + group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True) + if player_id in group_leader.group_childs: + group_leader.group_childs.remove(player_id) + player.synced_to = None + airplay_player = self._players.get(player_id) + await airplay_player.cmd_stop() + # make sure that the player manager gets an update + self.mass.players.update(player.player_id, skip_redirect=True) + self.mass.players.update(group_leader.player_id, skip_redirect=True) async def _getcliraop_binary(self): """Find the correct raop/airplay binary belonging to the platform.""" @@ -893,18 +897,9 @@ class AirplayProvider(PlayerProvider): PlayerFeature.SYNC, PlayerFeature.VOLUME_SET, ), - can_sync_with=tuple(x for x in self._players if x != player_id), volume_level=volume, ) self.mass.players.register_or_update(mass_player) - # update can_sync_with field of all other players - # this ensure that the field always contains all player ids, - # even when a player joins later on - for player in self.players: - if player.player_id == player_id: - continue - player.can_sync_with = tuple(x for x in self._players if x != player.player_id) - self.mass.players.update(player.player_id) async def _handle_dacp_request( # noqa: PLR0915 self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter diff --git a/music_assistant/server/providers/bluesound/__init__.py b/music_assistant/server/providers/bluesound/__init__.py index 4130ca8c..123a44f6 100644 --- a/music_assistant/server/providers/bluesound/__init__.py +++ b/music_assistant/server/providers/bluesound/__init__.py @@ -27,9 +27,7 @@ from music_assistant.common.models.enums import ( ) from music_assistant.common.models.errors import PlayerCommandFailed from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia -from music_assistant.constants import ( - VERBOSE_LOG_LEVEL, -) +from music_assistant.constants import VERBOSE_LOG_LEVEL from music_assistant.server.helpers.util import ( get_port_from_zeroconf, get_primary_ip_address_from_zeroconf, @@ -219,10 +217,6 @@ class BluesoundPlayer: self.mass_player.active_source = self.sync_status.master self.mass_player.state = PLAYBACK_STATE_MAP[self.status.state] - self.mass_player.can_sync_with = ( - tuple(x for x in self.prov.bluos_players if x != self.player_id), - ) - self.mass.players.update(self.player_id) diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index 23b8507f..5121f71e 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -207,8 +207,9 @@ class ChromecastProvider(PlayerProvider): castplayer = self.castplayers[player_id] if powered: await self._launch_app(castplayer) - else: - await asyncio.to_thread(castplayer.cc.quit_app) + return + # handle power off + await asyncio.to_thread(castplayer.cc.quit_app) async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player.""" @@ -474,10 +475,26 @@ class ChromecastProvider(PlayerProvider): # active source if group_player: castplayer.player.active_source = group_player.player.active_source + castplayer.player.active_group = group_player.player.player_id elif castplayer.cc.app_id == MASS_APP_ID: castplayer.player.active_source = castplayer.player_id + castplayer.player.active_group = None else: castplayer.player.active_source = castplayer.cc.app_display_name + castplayer.player.active_group = None + + if status.content_id: + castplayer.player.current_media = PlayerMedia( + uri=status.content_id, + title=status.title, + artist=status.artist, + album=status.album_name, + image_url=status.images[0].url if status.images else None, + duration=status.duration, + media_type=MediaType.TRACK, + ) + else: + castplayer.player.current_media = None # current media self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id) diff --git a/music_assistant/server/providers/chromecast/helpers.py b/music_assistant/server/providers/chromecast/helpers.py index cf6afc46..098062c2 100644 --- a/music_assistant/server/providers/chromecast/helpers.py +++ b/music_assistant/server/providers/chromecast/helpers.py @@ -190,11 +190,8 @@ class CastStatusListener: if group_player := self.prov.castplayers.get(group_uuid): if group_player.cc.media_controller.is_active: self.castplayer.active_group = group_uuid - self.castplayer.player.active_source = group_uuid - self.castplayer.player.state = group_player.player.state elif group_uuid == self.castplayer.active_group: self.castplayer.active_group = None - self.castplayer.player.active_source = self.castplayer.player.player_id self.prov.logger.log( VERBOSE_LOG_LEVEL, diff --git a/music_assistant/server/providers/hass_players/__init__.py b/music_assistant/server/providers/hass_players/__init__.py index 15454d20..bc860755 100644 --- a/music_assistant/server/providers/hass_players/__init__.py +++ b/music_assistant/server/providers/hass_players/__init__.py @@ -366,16 +366,7 @@ class HomeAssistantPlayers(PlayerProvider): ) -> None: """Handle setup of a Player from an hass entity.""" hass_device: HassDevice | None = None - platform_players: list[str] = [] if entity_registry_entry := entity_registry.get(state["entity_id"]): - # collect all players from same platform - platform_players = [ - entity_id - for entity_id, entity in entity_registry.items() - if entity["platform"] == entity_registry_entry["platform"] - and state["entity_id"].startswith("media_player") - and entity_id != state["entity_id"] - ] hass_device = device_registry.get(entity_registry_entry["device_id"]) hass_supported_features = MediaPlayerEntityFeature( state["attributes"]["supported_features"] @@ -410,8 +401,6 @@ class HomeAssistantPlayers(PlayerProvider): supported_features=tuple(supported_features), state=StateMap.get(state["state"], PlayerState.IDLE), ) - if MediaPlayerEntityFeature.GROUPING in hass_supported_features: - player.can_sync_with = platform_players self._update_player_attributes(player, state["attributes"]) self.mass.players.register_or_update(player) diff --git a/music_assistant/server/providers/player_group/__init__.py b/music_assistant/server/providers/player_group/__init__.py new file mode 100644 index 00000000..8d08bf1e --- /dev/null +++ b/music_assistant/server/providers/player_group/__init__.py @@ -0,0 +1,727 @@ +""" +Sync Group Player provider. + +This is more like a "virtual" player provider, +allowing the user to create 'presets' of players to sync together (of the same type). +""" + +from __future__ import annotations + +from collections.abc import Callable +from time import time +from typing import TYPE_CHECKING, Final, cast + +import shortuuid +from aiohttp import web + +from music_assistant.common.models.config_entries import ( + BASE_PLAYER_CONFIG_ENTRIES, + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, + CONF_ENTRY_FLOW_MODE_ENFORCED, + CONF_ENTRY_PLAYER_ICON_GROUP, + ConfigEntry, + ConfigValueOption, + ConfigValueType, + PlayerConfig, + create_sample_rates_config_entry, +) +from music_assistant.common.models.enums import ( + ConfigEntryType, + ContentType, + EventType, + MediaType, + PlayerFeature, + PlayerState, + PlayerType, + ProviderFeature, +) +from music_assistant.common.models.errors import ( + ProviderUnavailableError, + UnsupportedFeaturedException, +) +from music_assistant.common.models.event import MassEvent +from music_assistant.common.models.media_items import AudioFormat +from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia +from music_assistant.constants import ( + CONF_CROSSFADE, + CONF_CROSSFADE_DURATION, + CONF_ENABLE_ICY_METADATA, + CONF_ENFORCE_MP3, + CONF_FLOW_MODE, + CONF_GROUP_MEMBERS, + CONF_HTTP_PROFILE, + CONF_SAMPLE_RATES, +) +from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS +from music_assistant.server.helpers.ffmpeg import get_ffmpeg_stream +from music_assistant.server.helpers.util import TaskManager +from music_assistant.server.models.player_provider import PlayerProvider + +from .ugp_stream import UGP_FORMAT, UGPStream + +if TYPE_CHECKING: + from collections.abc import Iterable + + from music_assistant.common.models.config_entries import ProviderConfig + from music_assistant.common.models.provider import ProviderManifest + from music_assistant.server import MusicAssistant + from music_assistant.server.models import ProviderInstanceType + + +# ruff: noqa: ARG002 + +UNIVERSAL_PREFIX: Final[str] = "ugp_" +SYNCGROUP_PREFIX: Final[str] = "syncgroup_" +GROUP_TYPE_UNIVERSAL: Final[str] = "universal" +CONF_GROUP_TYPE: Final[str] = "group_type" +CONF_ENTRY_GROUP_TYPE = ConfigEntry( + key=CONF_GROUP_TYPE, + type=ConfigEntryType.STRING, + label="Group type", + default_value="universal", + hidden=True, + required=True, +) +CONF_ENTRY_GROUP_MEMBERS = ConfigEntry( + key=CONF_GROUP_MEMBERS, + type=ConfigEntryType.STRING, + label="Group members", + default_value=[], + description="Select all players you want to be part of this group", + multi_value=True, + required=True, +) +CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(44100, 16, 44100, 16, True) +CONFIG_ENTRY_UGP_NOTE = ConfigEntry( + key="ugp_note", + type=ConfigEntryType.LABEL, + label="Please note that although the Universal Group " + "allows you to group any player, it will not enable audio sync " + "between players of different ecosystems. It is advised to always use native " + "player groups or sync groups when available for your player type(s) and use " + "the Universal Group only to group players of different ecosystems.", + required=False, +) + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return PlayerGroupProvider(mass, manifest, config) + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """ + Return Config entries to setup this provider. + + instance_id: id of an existing provider instance (None if new instance setup). + action: [optional] action key called from config entries UI. + values: the (intermediate) raw values for config entries sent with the action. + """ + # nothing to configure (for now) + return () + + +class PlayerGroupProvider(PlayerProvider): + """Base/builtin provider for creating (permanent) player groups.""" + + @property + def supported_features(self) -> tuple[ProviderFeature, ...]: + """Return the features supported by this Provider.""" + return () + + def __init__( + self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig + ) -> None: + """Initialize MusicProvider.""" + super().__init__(mass, manifest, config) + self.ugp_streams: dict[str, UGPStream] = {} + self._on_unload: list[Callable[[], None]] = [ + self.mass.register_api_command("player_group/create", self.create_group), + ] + + async def loaded_in_mass(self) -> None: + """Call after the provider has been loaded.""" + # temp: migrate old config entries + # remove this after MA 2.4 release + for player_config in await self.mass.config.get_player_configs(): + if player_config.provider == self.instance_id: + # already migrated + continue + # migrate old syncgroup players to this provider + if player_config.player_id.startswith(SYNCGROUP_PREFIX): + self.mass.config.set_raw_player_config_value( + player_config.player_id, CONF_GROUP_TYPE, player_config.provider + ) + player_config.provider = self.instance_id + self.mass.config.set_raw_player_config_value( + player_config.player_id, "provider", self.instance_id + ) + # migrate old UGP players to this provider + elif player_config.player_id.startswith(UNIVERSAL_PREFIX): + self.mass.config.set_raw_player_config_value( + player_config.player_id, CONF_GROUP_TYPE, "universal" + ) + player_config.provider = self.instance_id + self.mass.config.set_raw_player_config_value( + player_config.player_id, "provider", self.instance_id + ) + + await self._register_all_players() + # listen for player added events so we can catch late joiners + # (because a group depends on its childs to be available) + self._on_unload.append( + self.mass.subscribe(self._on_mass_player_added_event, EventType.PLAYER_ADDED) + ) + + async def unload(self) -> None: + """ + Handle unload/close of the provider. + + Called when provider is deregistered (e.g. MA exiting or config reloading). + """ + for unload_cb in self._on_unload: + unload_cb() + + async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: + """Return all (provider/player specific) Config Entries for the given player (if any).""" + # default entries for player groups + base_entries = ( + *BASE_PLAYER_CONFIG_ENTRIES, + CONF_ENTRY_PLAYER_ICON_GROUP, + CONF_ENTRY_GROUP_TYPE, + CONF_ENTRY_GROUP_MEMBERS, + ) + # group type is static and can not be changed. we just grab the existing, stored value + group_type: str = self.mass.config.get_raw_player_config_value( + player_id, CONF_GROUP_TYPE, GROUP_TYPE_UNIVERSAL + ) + # handle config entries for universal group players + if group_type == GROUP_TYPE_UNIVERSAL: + group_members = CONF_ENTRY_GROUP_MEMBERS + group_members.options = tuple( + ConfigValueOption(x.display_name, x.player_id) + for x in self.mass.players.all(True, False) + if not x.player_id.startswith(UNIVERSAL_PREFIX) + ) + return ( + *base_entries, + group_members, + CONFIG_ENTRY_UGP_NOTE, + CONF_ENTRY_CROSSFADE, + CONF_ENTRY_CROSSFADE_DURATION, + CONF_ENTRY_SAMPLE_RATES_UGP, + CONF_ENTRY_FLOW_MODE_ENFORCED, + ) + # handle config entries for syncgroup players + group_members = CONF_ENTRY_GROUP_MEMBERS + group_members.options = tuple( + ConfigValueOption(x.display_name, x.player_id) + for x in self.mass.players.all(True, False) + if x.provider != self.instance_id + and (player_prov := self.mass.get_provider(x.provider)) + and ProviderFeature.SYNC_PLAYERS in player_prov.supported_features + ) + + # grab additional details from one of the provider's players + if not (player_provider := self.mass.get_provider(group_type)): + return base_entries # guard + if TYPE_CHECKING: + player_provider = cast(PlayerProvider, player_provider) + assert player_provider.lookup_key != self.lookup_key + if not (child_player := next((x for x in player_provider.players), None)): + return base_entries # guard + + # combine base group entries with (base) player entries for this player type + allowed_conf_entries = ( + CONF_HTTP_PROFILE, + CONF_ENABLE_ICY_METADATA, + CONF_CROSSFADE, + CONF_CROSSFADE_DURATION, + CONF_ENFORCE_MP3, + CONF_FLOW_MODE, + CONF_SAMPLE_RATES, + ) + child_config_entries = await player_provider.get_player_config_entries( + child_player.player_id + ) + return ( + *base_entries, + group_members, + *(entry for entry in child_config_entries if entry.key in allowed_conf_entries), + ) + + def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None: + """Call (by config manager) when the configuration of a player changes.""" + if "enabled" in changed_keys and not config.enabled: + # edge case: ensure that the player is powered off if the player gets disabled + self.mass.create_task(self.cmd_power(config.player_id, False)) + if f"values/{CONF_GROUP_MEMBERS}" in changed_keys: + members = config.get_value(CONF_GROUP_MEMBERS) + # ensure we filter invalid members + members = self._filter_members(config.get_value(CONF_GROUP_TYPE), members) + self.mass.config.set_raw_player_config_value( + config.player_id, CONF_GROUP_MEMBERS, members + ) + if player := self.mass.players.get(config.player_id): + player.group_childs = members + self.mass.players.update(config.player_id) + + def on_player_config_removed(self, player_id: str) -> None: + """Call (by config manager) when the configuration of a player is removed.""" + if not (group_player := self.mass.players.get(player_id)): + return + if group_player.powered: + # edge case: the group player is powered and being removed + for member in self.mass.players.iter_group_members(group_player, only_powered=True): + member.active_group = None + if member.state == PlayerState.IDLE: + continue + if member.synced_to: + continue + self.mass.create_task( + self.mass.players.cmd_stop(member.player_id, skip_redirect=True) + ) + self.mass.players.remove(group_player.player_id, False) + + async def cmd_stop(self, player_id: str) -> None: + """Send STOP command to given player.""" + group_player = self.mass.players.get(player_id) + if player_id.startswith(SYNCGROUP_PREFIX): + # syncgroup: forward command to sync leader + if sync_leader := self._get_sync_leader(group_player): + await self.mass.players.cmd_stop(sync_leader.player_id, skip_redirect=True) + else: + # ugp: forward command to all active members + async with TaskManager(self.mass) as tg: + for member in self.mass.players.iter_group_members(group_player, active_only=True): + if member.state not in (PlayerState.PAUSED, PlayerState.PLAYING): + continue + tg.create_task(self.mass.players.cmd_stop(member.player_id, skip_redirect=True)) + # abort the stream session + if (stream := self.ugp_streams.pop(player_id, None)) and not stream.done: + await stream.stop() + # set state optimistically + group_player.state = PlayerState.IDLE + self.mass.players.update(player_id) + + async def cmd_play(self, player_id: str) -> None: + """Send PLAY command to given player.""" + group_player = self.mass.players.get(player_id) + if not player_id.startswith(SYNCGROUP_PREFIX): + # this shouldn't happen, but just in case + raise UnsupportedFeaturedException("Command is not supported for UGP players") + # forward command to sync leader + if sync_leader := self._get_sync_leader(group_player): + await self.mass.players.cmd_play(sync_leader.player_id, skip_redirect=True) + + async def cmd_pause(self, player_id: str) -> None: + """Send PAUSE command to given player.""" + group_player = self.mass.players.get(player_id) + if not player_id.startswith(SYNCGROUP_PREFIX): + raise UnsupportedFeaturedException("Command is not supported for UGP players") + # forward command to sync leader + if sync_leader := self._get_sync_leader(group_player): + await self.mass.players.cmd_pause(sync_leader.player_id, skip_redirect=True) + + async def cmd_power(self, player_id: str, powered: bool) -> None: + """Handle POWER command to group player.""" + group_player = self.mass.players.get(player_id, raise_unavailable=True) + if TYPE_CHECKING: + group_player = cast(Player, group_player) + + # always stop at power off + if not powered and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED): + await self.cmd_stop(group_player.player_id) + + async with TaskManager(self.mass) as tg: + if powered: + # handle TURN_ON of the group player by turning on all members + for member in self.mass.players.iter_group_members( + group_player, only_powered=False, active_only=False + ): + if ( + member.state in (PlayerState.PLAYING, PlayerState.PAUSED) + and member.active_source != group_player.active_source + ): + # stop playing existing content on member if we start the group player + tg.create_task( + self.mass.players.cmd_stop(member.player_id, skip_redirect=True) + ) + if not member.powered: + tg.create_task( + self.mass.players.cmd_power(member.player_id, True, skip_redirect=True) + ) + # set active source to group player if the group (is going to be) powered + member.active_group = group_player.player_id + member.active_source = group_player.active_source + else: + # handle TURN_OFF of the group player by turning off all members + for member in self.mass.players.iter_group_members( + group_player, only_powered=True, active_only=True + ): + # reset active group on player when the group is turned off + member.active_group = None + member.active_source = None + # handle TURN_OFF of the group player by turning off all members + if member.powered: + tg.create_task( + self.mass.players.cmd_power(member.player_id, False, skip_redirect=True) + ) + if powered and player_id.startswith(SYNCGROUP_PREFIX): + await self._sync_syncgroup(group_player) + # optimistically set the group state + group_player.powered = powered + self.mass.players.update(group_player.player_id) + + async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: + """Send VOLUME_SET command to given player.""" + # group volume is already handled in the player manager + + async def play_media( + self, + player_id: str, + media: PlayerMedia, + ) -> None: + """Handle PLAY MEDIA on given player.""" + group_player = self.mass.players.get(player_id) + # power on (or resync) if needed + if not group_player.powered: + await self.cmd_power(player_id, True) + elif player_id.startswith(SYNCGROUP_PREFIX): + await self._sync_syncgroup(group_player) + + # set the state optimistically + group_player.current_media = media + group_player.elapsed_time = 0 + group_player.elapsed_time_last_updated = time() - 1 + group_player.state = PlayerState.PLAYING + self.mass.players.update(player_id) + + # handle play_media for sync group + if player_id.startswith(SYNCGROUP_PREFIX): + # simply forward the command to the sync leader + if sync_leader := self._select_sync_leader(group_player): + await self.mass.players.play_media( + sync_leader.player_id, media=media, skip_redirect=True + ) + return + + # handle play_media for UGP group + if (existing := self.ugp_streams.pop(player_id, None)) and not existing.done: + # stop any existing stream first + await existing.stop() + + # select audio source + if media.media_type == MediaType.ANNOUNCEMENT: + # special case: stream announcement + audio_source = self.mass.streams.get_announcement_stream( + media.custom_data["url"], + output_format=UGP_FORMAT, + use_pre_announce=media.custom_data["use_pre_announce"], + ) + elif media.queue_id and media.queue_item_id: + # regular queue stream request + audio_source = self.mass.streams.get_flow_stream( + queue=self.mass.player_queues.get(media.queue_id), + start_queue_item=self.mass.player_queues.get_item( + media.queue_id, media.queue_item_id + ), + pcm_format=UGP_FORMAT, + ) + else: + # assume url or some other direct path + # NOTE: this will fail if its an uri not playable by ffmpeg + audio_source = get_ffmpeg_stream( + audio_input=media.uri, + input_format=AudioFormat(ContentType.try_parse(media.uri)), + output_format=UGP_FORMAT, + ) + + # start the stream task + self.ugp_streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT) + base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.aac" + + # forward to downstream play_media commands + async with TaskManager(self.mass) as tg: + for member in self.mass.players.iter_group_members( + group_player, only_powered=True, active_only=True + ): + tg.create_task( + self.mass.players.play_media( + member.player_id, + media=PlayerMedia( + uri=f"{base_url}?player_id={member.player_id}", + media_type=MediaType.FLOW_STREAM, + title=group_player.display_name, + queue_id=group_player.player_id, + ), + ) + ) + + async def enqueue_next_media(self, player_id: str, media: PlayerMedia) -> None: + """Handle enqueuing of a next media item on the player.""" + group_player = self.mass.players.get(player_id, True) + if not player_id.startswith(SYNCGROUP_PREFIX): + # this shouldn't happen, but just in case + raise UnsupportedFeaturedException("Command is not supported for UGP players") + if sync_leader := self._get_sync_leader(group_player): + await self.enqueue_next_media( + sync_leader.player_id, + media=media, + ) + + async def poll_player(self, player_id: str) -> None: + """Poll player for state updates. + + This is called by the Player Manager; + if 'needs_poll' is set to True in the player object. + """ + if group_player := self.mass.players.get(player_id): + self._update_attributes(group_player) + + async def create_group(self, group_type: str, name: str, members: list[str]) -> Player: + """Create new Group Player.""" + # perform basic checks + if group_type == GROUP_TYPE_UNIVERSAL: + prefix = UNIVERSAL_PREFIX + else: + prefix = SYNCGROUP_PREFIX + if (player_prov := self.mass.get_provider(group_type)) is None: + msg = f"Provider {group_type} is not available!" + raise ProviderUnavailableError(msg) + if ProviderFeature.SYNC_PLAYERS not in player_prov.supported_features: + msg = f"Provider {player_prov.name} does not support creating groups" + raise UnsupportedFeaturedException(msg) + + new_group_id = f"{prefix}{shortuuid.random(8).lower()}" + # cleanup list, just in case the frontend sends some garbage + members = self._filter_members(group_type, members) + # create default config with the user chosen name + self.mass.config.create_default_player_config( + new_group_id, + player_prov.instance_id, + name=name, + enabled=True, + values={CONF_GROUP_MEMBERS: members, CONF_GROUP_TYPE: group_type}, + ) + return self._register_group_player( + group_player_id=new_group_id, group_type=group_type, name=name, members=members + ) + + async def _register_all_players(self) -> None: + """Register all (virtual/fake) group players in the Player controller.""" + player_configs = await self.mass.config.get_player_configs( + self.instance_id, include_values=True + ) + for player_config in player_configs: + if self.mass.players.get(player_config.player_id): + continue # already registered + members = player_config.get_value(CONF_GROUP_MEMBERS) + group_type = player_config.get_value(CONF_GROUP_TYPE) + self._register_group_player( + player_config.player_id, + group_type, + player_config.name or player_config.default_name, + members, + ) + + def _register_group_player( + self, group_player_id: str, group_type: str, name: str, members: Iterable[str] + ) -> Player: + """Register a syncgroup player.""" + player_features = {PlayerFeature.POWER, PlayerFeature.VOLUME_SET} + if group_type == GROUP_TYPE_UNIVERSAL: + model_name = "Universal Group" + manufacturer = self.name + # register dynamic route for the ugp stream + route_path = f"/ugp/{group_player_id}.aac" + self._on_unload.append( + self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream) + ) + elif player_provider := self.mass.get_provider(group_type): + # grab additional details from one of the provider's players + if TYPE_CHECKING: + player_provider = cast(PlayerProvider, player_provider) + model_name = "Sync Group" + manufacturer = self.mass.get_provider(group_type).name + if child_player := next((x for x in player_provider.players), None): + for feature in ( + PlayerFeature.PAUSE, + PlayerFeature.VOLUME_MUTE, + ): + if feature in child_player.supported_features: + player_features.add(feature) + else: + # this may happen if the provider is not available yet + model_name = "Sync Group" + manufacturer = self.name + + player = Player( + player_id=group_player_id, + provider=self.instance_id, + type=PlayerType.GROUP, + name=name, + available=True, + powered=False, + device_info=DeviceInfo(model=model_name, manufacturer=manufacturer), + supported_features=tuple(player_features), + group_childs=set(members), + active_source=group_player_id, + ) + + self.mass.players.register_or_update(player) + self._update_attributes(player) + return player + + def _get_sync_leader(self, group_player: Player) -> Player | None: + """Get the active sync leader player for the syncgroup.""" + if group_player.synced_to: + # should not happen but just in case... + return self.mass.players.get(group_player.synced_to) + # Return the (first/only) player that has group childs + for child_player in self.mass.players.iter_group_members( + group_player, only_powered=False, only_playing=False, active_only=False + ): + if child_player.group_childs: + return child_player + return None + + def _select_sync_leader(self, group_player: Player) -> Player | None: + """Select the active sync leader player for a syncgroup.""" + if sync_leader := self._get_sync_leader(group_player): + return sync_leader + # select new sync leader: return the first active player + for child_player in self.mass.players.iter_group_members(group_player, active_only=True): + if child_player.active_group not in (None, group_player.player_id): + continue + if ( + child_player.active_source + and child_player.active_source != group_player.active_source + ): + continue + return child_player + # fallback select new sync leader: simply return the first (available) player + for child_player in self.mass.players.iter_group_members( + group_player, only_powered=False, only_playing=False, active_only=False + ): + return child_player + # this really should not be possible + raise RuntimeError("Impossible to select sync leader for syncgroup") + + async def _sync_syncgroup(self, group_player: Player) -> None: + """Sync all (possible) players of a syncgroup.""" + sync_leader = self._select_sync_leader(group_player) + members_to_sync: list[str] = [] + for member in self.mass.players.iter_group_members(group_player, active_only=True): + if sync_leader.player_id == member.player_id: + # skip sync leader + continue + if member.synced_to == sync_leader.player_id: + # already synced + continue + if member.synced_to and member.synced_to != sync_leader.player_id: + # unsync first + await self.mass.players.cmd_unsync(member.player_id) + members_to_sync.append(member.player_id) + if members_to_sync: + await self.mass.players.cmd_sync_many(sync_leader.player_id, members_to_sync) + + async def _on_mass_player_added_event(self, event: MassEvent) -> None: + """Handle player added event from player controller.""" + await self._register_all_players() + + def _update_attributes(self, player: Player) -> None: + """Update attributes of a player.""" + for child_player in self.mass.players.iter_group_members(player, active_only=True): + # just grab the first active player + player.state = child_player.state + if player.current_media: + player.current_media = child_player.current_media + player.elapsed_time = child_player.elapsed_time + player.elapsed_time_last_updated = child_player.elapsed_time_last_updated + break + else: + player.state = PlayerState.IDLE + player.active_source = player.player_id + self.mass.players.update(player.player_id) + + async def _serve_ugp_stream(self, request: web.Request) -> web.Response: + """Serve the UGP (multi-client) flow stream audio to a player.""" + ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1] + child_player_id = request.query.get("player_id") # optional! + + if not (ugp_player := self.mass.players.get(ugp_player_id)): + raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}") + + if not (stream := self.ugp_streams.get(ugp_player_id, None)) or stream.done: + raise web.HTTPNotFound(body=f"There is no active UGP stream for {ugp_player_id}!") + + http_profile: str = await self.mass.config.get_player_config_value( + child_player_id, CONF_HTTP_PROFILE + ) + headers = { + **DEFAULT_STREAM_HEADERS, + "Content-Type": "audio/aac", + "Accept-Ranges": "none", + "Cache-Control": "no-cache", + "Connection": "close", + } + + resp = web.StreamResponse(status=200, reason="OK", headers=headers) + if http_profile == "forced_content_length": + resp.content_length = 4294967296 + elif http_profile == "chunked": + resp.enable_chunked_encoding() + + await resp.prepare(request) + + # return early if this is not a GET request + if request.method != "GET": + return resp + + # all checks passed, start streaming! + self.logger.debug( + "Start serving UGP flow audio stream for UGP-player %s to %s", + ugp_player.display_name, + child_player_id or request.remote, + ) + async for chunk in stream.subscribe(): + try: + await resp.write(chunk) + except (ConnectionError, ConnectionResetError): + break + + return resp + + def _filter_members(self, provider: str, members: list[str]) -> list[str]: + """Filter out members that are not valid players.""" + if provider != GROUP_TYPE_UNIVERSAL: + return [ + x + for x in members + if (player := self.mass.players.get(x)) and player.provider == provider + ] + # cleanup members - filter out impossible choices + syncgroup_childs: list[str] = [] + for member in members: + if not member.startswith(SYNCGROUP_PREFIX): + continue + if syncgroup := self.mass.players.get(member): + syncgroup_childs.extend(syncgroup.group_childs) + # we filter out other UGP players and syncgroup childs + # if their parent is already in the list + return [ + x + for x in members + if self.mass.players.get(x) + and x not in syncgroup_childs + and not x.startswith(UNIVERSAL_PREFIX) + ] diff --git a/music_assistant/server/providers/player_group/manifest.json b/music_assistant/server/providers/player_group/manifest.json new file mode 100644 index 00000000..8fa93574 --- /dev/null +++ b/music_assistant/server/providers/player_group/manifest.json @@ -0,0 +1,13 @@ +{ + "type": "player", + "domain": "player_group", + "name": "Playergroup", + "description": "Create (permanent) groups of your favorite players. \nSupports both syncgroups (to group speakers of the same ecocystem to play in sync) and universal groups to group speakers of different ecosystems to play the same audio (but not in sync).", + "codeowners": ["@music-assistant"], + "requirements": [], + "documentation": "https://music-assistant.io/player-support/player_group/", + "multi_instance": false, + "builtin": true, + "allow_disable": false, + "icon": "speaker-multiple" +} diff --git a/music_assistant/server/providers/player_group/ugp_stream.py b/music_assistant/server/providers/player_group/ugp_stream.py new file mode 100644 index 00000000..bf6f3042 --- /dev/null +++ b/music_assistant/server/providers/player_group/ugp_stream.py @@ -0,0 +1,91 @@ +""" +Implementation of a Stream for the Universal Group Player. + +Basically this is like a fake radio radio stream (AAC) format with multiple subscribers. +The AAC format is chosen because it is widely supported and has a good balance between +quality and bandwidth and also allows for mid-stream joining of (extra) players. +""" + +from __future__ import annotations + +import asyncio +from collections.abc import AsyncGenerator, Awaitable, Callable + +from music_assistant.common.models.enums import ContentType +from music_assistant.common.models.media_items import AudioFormat +from music_assistant.server.helpers.audio import get_ffmpeg_stream + +# ruff: noqa: ARG002 + +UGP_FORMAT = AudioFormat( + content_type=ContentType.PCM_F32LE, + sample_rate=44100, + bit_depth=32, +) + + +class UGPStream: + """ + Implementation of a Stream for the Universal Group Player. + + Basically this is like a fake radio radio stream (AAC) format with multiple subscribers. + The AAC format is chosen because it is widely supported and has a good balance between + quality and bandwidth and also allows for mid-stream joining of (extra) players. + """ + + def __init__( + self, + audio_source: AsyncGenerator[bytes, None], + audio_format: AudioFormat, + ) -> None: + """Initialize UGP Stream.""" + self.audio_source = audio_source + self.input_format = audio_format + self.output_format = AudioFormat(content_type=ContentType.AAC) + self.subscribers: list[Callable[[bytes], Awaitable]] = [] + self._task: asyncio.Task | None = None + self._done: asyncio.Event = asyncio.Event() + + @property + def done(self) -> bool: + """Return if this stream is already done.""" + return self._done.is_set() and self._task and self._task.done() + + async def stop(self) -> None: + """Stop/cancel the stream.""" + if self._done.is_set(): + return + if self._task and not self._task.done(): + self._task.cancel() + self._done.set() + + async def subscribe(self) -> AsyncGenerator[bytes, None]: + """Subscribe to the raw/unaltered audio stream.""" + # start the runner as soon as the (first) client connects + if not self._task: + self._task = asyncio.create_task(self._runner()) + queue = asyncio.Queue(1) + try: + self.subscribers.append(queue.put) + while True: + chunk = await queue.get() + if not chunk: + break + yield chunk + finally: + self.subscribers.remove(queue.put) + + async def _runner(self) -> None: + """Run the stream for the given audio source.""" + await asyncio.sleep(0.25) # small delay to allow subscribers to connect + async for chunk in get_ffmpeg_stream( + audio_input=self.audio_source, + input_format=self.input_format, + output_format=self.output_format, + # TODO: enable readrate limiting + initial burst once we have a newer ffmpeg version + # extra_input_args=["-readrate", "1.15"], + ): + await asyncio.gather(*[sub(chunk) for sub in self.subscribers], return_exceptions=True) + # empty chunk when done + await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True) + self._done.set() diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index be477c25..2bfd094c 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -8,7 +8,6 @@ import statistics import time from collections import deque from collections.abc import Iterator -from contextlib import suppress from dataclasses import dataclass from typing import TYPE_CHECKING @@ -61,7 +60,7 @@ from music_assistant.constants import ( from music_assistant.server.helpers.audio import get_ffmpeg_stream, get_player_filter_params from music_assistant.server.helpers.util import TaskManager from music_assistant.server.models.player_provider import PlayerProvider -from music_assistant.server.providers.ugp import UniversalGroupProvider +from music_assistant.server.providers.player_group import PlayerGroupProvider from .multi_client_stream import MultiClientStream @@ -274,16 +273,11 @@ class SlimprotoProvider(PlayerProvider): async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: """Return all (provider/player specific) Config Entries for the given player (if any).""" base_entries = await super().get_player_config_entries(player_id) - if not (slimclient := self.slimproto.get_player(player_id)): - # most probably a syncgroup - return ( - *base_entries, - CONF_ENTRY_CROSSFADE, - CONF_ENTRY_CROSSFADE_DURATION, - CONF_ENTRY_HTTP_PROFILE_FORCED_2, - create_sample_rates_config_entry(96000, 24, 48000, 24), - ) - + if slimclient := self.slimproto.get_player(player_id): + max_sample_rate = int(slimclient.max_sample_rate) + else: + # player not (yet) connected? use default + max_sample_rate = 48000 # create preset entries (for players that support it) preset_entries = () presets = [] @@ -305,7 +299,6 @@ class SlimprotoProvider(PlayerProvider): ) for index in range(1, preset_count + 1) ) - return ( base_entries + preset_entries @@ -321,7 +314,7 @@ class SlimprotoProvider(PlayerProvider): CONF_ENTRY_DISPLAY, CONF_ENTRY_VISUALIZATION, CONF_ENTRY_HTTP_PROFILE_FORCED_2, - create_sample_rates_config_entry(int(slimclient.max_sample_rate), 24, 48000, 24), + create_sample_rates_config_entry(max_sample_rate, 24, 48000, 24), ) ) @@ -385,9 +378,9 @@ class SlimprotoProvider(PlayerProvider): ) elif media.queue_id.startswith("ugp_"): # special case: UGP stream - ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") - ugp_stream = ugp_provider.streams[media.queue_id] - audio_source = ugp_stream.subscribe_raw() + ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group") + ugp_stream = ugp_provider.ugp_streams[media.queue_id] + audio_source = ugp_stream.subscribe() elif media.queue_id and media.queue_item_id: # regular queue stream request audio_source = self.mass.streams.get_flow_stream( @@ -557,6 +550,8 @@ class SlimprotoProvider(PlayerProvider): parent_player.group_childs.add(child_player.player_id) child_player.synced_to = parent_player.player_id # check if we should (re)start or join a stream session + # TODO: support late joining of a client into an existing stream session + # so it doesn't need to be restarted anymore. active_queue = self.mass.player_queues.get_active_queue(parent_player.player_id) if active_queue.state == PlayerState.PLAYING: # playback needs to be restarted to form a new multi client stream session @@ -571,23 +566,26 @@ class SlimprotoProvider(PlayerProvider): ) else: # make sure that the player manager gets an update - self.mass.players.update(child_player.player_id, skip_forward=True) - self.mass.players.update(parent_player.player_id, skip_forward=True) + self.mass.players.update(child_player.player_id, skip_redirect=True) + self.mass.players.update(parent_player.player_id, skip_redirect=True) async def cmd_unsync(self, player_id: str) -> None: - """Handle UNSYNC command for given player.""" - child_player = self.mass.players.get(player_id) - parent_player = self.mass.players.get(child_player.synced_to) - # make sure to send stop to the player - await self.cmd_stop(child_player.player_id) - child_player.synced_to = None - with suppress(KeyError): - parent_player.group_childs.remove(child_player.player_id) - if parent_player.group_childs == {parent_player.player_id}: - # last child vanished; the sync group is dissolved - parent_player.group_childs.remove(parent_player.player_id) - self.mass.players.update(child_player.player_id) - self.mass.players.update(parent_player.player_id) + """Handle UNSYNC command for given player. + + Remove the given player from any syncgroups it currently is synced to. + + - player_id: player_id of the player to handle the command. + """ + player = self.mass.players.get(player_id, raise_unavailable=True) + if player.synced_to: + group_leader = self.mass.players.get(player.synced_to, raise_unavailable=True) + group_leader.group_childs.remove(player_id) + player.synced_to = None + if slimclient := self.slimproto.get_player(player_id): + await slimclient.stop() + # make sure that the player manager gets an update + self.mass.players.update(player.player_id, skip_redirect=True) + self.mass.players.update(group_leader.player_id, skip_redirect=True) def _client_callback( self, @@ -649,9 +647,6 @@ class SlimprotoProvider(PlayerProvider): PlayerFeature.PAUSE, PlayerFeature.VOLUME_MUTE, ), - can_sync_with=tuple( - x.player_id for x in self.slimproto.players if x.player_id != player_id - ), ) self.mass.players.register_or_update(player) @@ -855,12 +850,6 @@ class SlimprotoProvider(PlayerProvider): await self._set_display(slimplayer) # update all attributes await self._handle_player_update(slimplayer) - # update existing players so they can update their `can_sync_with` field - for _player in self.players: - _player.can_sync_with = tuple( - x.player_id for x in self.slimproto.players if x.player_id != _player.player_id - ) - self.mass.players.update(_player.player_id) # restore volume and power state if last_state := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_STATE): init_power = last_state[0] diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 36855442..4a0d6885 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -57,7 +57,7 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType - from music_assistant.server.providers.ugp import UniversalGroupProvider + from music_assistant.server.providers.player_group import PlayerGroupProvider CONF_SERVER_HOST = "snapcast_server_host" CONF_SERVER_CONTROL_PORT = "snapcast_server_control_port" @@ -276,16 +276,6 @@ class SnapCastProvider(PlayerProvider): else: return self._get_ma_id(snap_client_id) - def _can_sync_with(self, player_id: str) -> None: - mass_player = self.mass.players.get(player_id) - mass_player.can_sync_with = tuple( - self._get_ma_id(snap_client.identifier) - for snap_client in self._snapserver.clients - if self._get_ma_id(snap_client.identifier) != player_id - ) - - self.mass.players.update(mass_player.player_id) - @property def supported_features(self) -> tuple[ProviderFeature, ...]: """Return the features supported by this Provider.""" @@ -401,7 +391,6 @@ class SnapCastProvider(PlayerProvider): PlayerFeature.VOLUME_SET, PlayerFeature.VOLUME_MUTE, ), - can_sync_with=[], group_childs=set(), synced_to=self._synced_to(player_id), ) @@ -424,7 +413,6 @@ class SnapCastProvider(PlayerProvider): player.active_source = stream.name else: player.active_source = player_id - self._can_sync_with(player_id) self._group_childs(player_id) self.mass.players.update(player_id) @@ -481,16 +469,19 @@ class SnapCastProvider(PlayerProvider): for mass_child_id in list(mass_player.group_childs): if mass_child_id != player_id: await self.cmd_unsync(mass_child_id) - else: - mass_sync_master_player = self.mass.players.get(mass_player.synced_to) - mass_sync_master_player.group_childs.remove(player_id) - mass_player.synced_to = None - snap_client_id = self._get_snapclient_id(player_id) - group = self._get_snapgroup(player_id) - await group.remove_client(snap_client_id) + return + mass_sync_master_player = self.mass.players.get(mass_player.synced_to) + mass_sync_master_player.group_childs.remove(player_id) + mass_player.synced_to = None + snap_client_id = self._get_snapclient_id(player_id) + group = self._get_snapgroup(player_id) + await group.remove_client(snap_client_id) # assign default/empty stream to the player await self._get_snapgroup(player_id).set_stream("default") await self.cmd_stop(player_id=player_id) + # make sure that the player manager gets an update + self.mass.players.update(player_id, skip_redirect=True) + self.mass.players.update(mass_player.synced_to, skip_redirect=True) async def play_media(self, player_id: str, media: PlayerMedia) -> None: """Handle PLAY MEDIA on given player.""" @@ -518,8 +509,8 @@ class SnapCastProvider(PlayerProvider): ) elif media.queue_id.startswith("ugp_"): # special case: UGP stream - ugp_provider: UniversalGroupProvider = self.mass.get_provider("ugp") - ugp_stream = ugp_provider.streams[media.queue_id] + ugp_provider: PlayerGroupProvider = self.mass.get_provider("ugp") + ugp_stream = ugp_provider.ugp_streams[media.queue_id] input_format = ugp_stream.output_format audio_source = ugp_stream.subscribe() elif media.queue_id and media.queue_item_id: diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index 8d3f7dc2..92d20951 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -38,13 +38,7 @@ from music_assistant.common.models.enums import ( from music_assistant.common.models.errors import PlayerCommandFailed from music_assistant.common.models.event import MassEvent from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia -from music_assistant.constants import ( - CONF_CROSSFADE, - MASS_LOGO_ONLINE, - SYNCGROUP_PREFIX, - VERBOSE_LOG_LEVEL, -) -from music_assistant.server.helpers.util import TaskManager +from music_assistant.constants import CONF_CROSSFADE, MASS_LOGO_ONLINE, VERBOSE_LOG_LEVEL from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: @@ -314,15 +308,6 @@ class SonosPlayer: self.mass_player.volume_level = self.client.player.volume_level or 0 self.mass_player.volume_muted = self.client.player.volume_muted - # work out 'can sync with' for this player - self.mass_player.can_sync_with = tuple( - x - for x in self.prov.sonos_players - if x != self.player_id - and x in self.prov.sonos_players - and self.prov.sonos_players[x].client.household_id == self.client.household_id - ) - group_parent = None if self.client.player.is_coordinator: # player is group coordinator @@ -343,7 +328,6 @@ class SonosPlayer: self.mass_player.group_childs = set() self.mass_player.synced_to = active_group.coordinator_id self.mass_player.active_source = active_group.coordinator_id - self.mass_player.can_sync_with = () if airplay := self.get_linked_airplay_player(True): # linked airplay player is active, update media from there @@ -531,15 +515,17 @@ class SonosPlayerProvider(PlayerProvider): player_id: str, ) -> tuple[ConfigEntry, ...]: """Return Config Entries for the given player.""" - base_entries = await super().get_player_config_entries(player_id) - if not (sonos_player := self.sonos_players.get(player_id)): - # most probably a syncgroup or the player is not yet discovered - return (*base_entries, CONF_ENTRY_CROSSFADE, CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED) - return ( - *base_entries, + base_entries = ( + *await super().get_player_config_entries(player_id), CONF_ENTRY_CROSSFADE, CONF_ENTRY_FLOW_MODE_HIDDEN_DISABLED, create_sample_rates_config_entry(48000, 24, 48000, 24, True), + ) + if not (sonos_player := self.sonos_players.get(player_id)): + # most probably the player is not yet discovered + return base_entries + return ( + *base_entries, ConfigEntry( key=CONF_AIRPLAY_MODE, type=ConfigEntryType.BOOLEAN, @@ -699,16 +685,6 @@ class SonosPlayerProvider(PlayerProvider): self, player_id: str, announcement: PlayerMedia, volume_level: int | None = None ) -> None: """Handle (provider native) playback of an announcement on given player.""" - if player_id.startswith(SYNCGROUP_PREFIX): - # handle syncgroup, unwrap to all underlying child's - async with TaskManager(self.mass) as tg: - if group_player := self.mass.players.get(player_id): - # execute on all child players - for child_player_id in group_player.group_childs: - tg.create_task( - self.play_announcement(child_player_id, announcement, volume_level) - ) - return sonos_player = self.sonos_players[player_id] self.logger.debug( "Playing announcement %s using websocket audioclip on %s", @@ -743,11 +719,6 @@ class SonosPlayerProvider(PlayerProvider): self, player_id, discovery_info=discovery_info, ip_address=address ) await sonos_player.setup() - # when we add a new player, update 'can_sync_with' for all other players - for other_player_id in self.sonos_players: - if other_player_id == player_id: - continue - self.sonos_players[other_player_id].update_attributes() async def _handle_sonos_queue_itemwindow(self, request: web.Request) -> web.Response: """ diff --git a/music_assistant/server/providers/sonos_s1/player.py b/music_assistant/server/providers/sonos_s1/player.py index 92d5016a..4f479d0f 100644 --- a/music_assistant/server/providers/sonos_s1/player.py +++ b/music_assistant/server/providers/sonos_s1/player.py @@ -648,11 +648,6 @@ class SonosPlayer: self.mass_player.elapsed_time_last_updated = self.position_updated_at.timestamp() # zone topology (syncing/grouping) details - self.mass_player.can_sync_with = tuple( - x.player_id - for x in self.sonos_prov.sonosplayers.values() - if x.player_id != self.player_id - ) if self.sync_coordinator: # player is synced to another player self.mass_player.synced_to = self.sync_coordinator.player_id diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py deleted file mode 100644 index c04dea3c..00000000 --- a/music_assistant/server/providers/ugp/__init__.py +++ /dev/null @@ -1,600 +0,0 @@ -""" -Universal Group Player provider. - -This is more like a "virtual" player provider, -allowing the user to create player groups from all players known in the system. -""" - -from __future__ import annotations - -import asyncio -from collections.abc import AsyncGenerator, Awaitable, Callable -from time import time -from typing import TYPE_CHECKING, Final, cast - -import shortuuid -from aiohttp import web - -from music_assistant.common.models.config_entries import ( - CONF_ENTRY_CROSSFADE, - CONF_ENTRY_CROSSFADE_DURATION, - CONF_ENTRY_FLOW_MODE_ENFORCED, - ConfigEntry, - ConfigValueOption, - ConfigValueType, - PlayerConfig, - create_sample_rates_config_entry, -) -from music_assistant.common.models.enums import ( - ConfigEntryType, - ContentType, - MediaType, - PlayerFeature, - PlayerState, - PlayerType, - ProviderFeature, -) -from music_assistant.common.models.media_items import AudioFormat -from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia -from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_HTTP_PROFILE, SYNCGROUP_PREFIX -from music_assistant.server.controllers.streams import DEFAULT_STREAM_HEADERS -from music_assistant.server.helpers.audio import get_ffmpeg_stream -from music_assistant.server.helpers.util import TaskManager -from music_assistant.server.models.player_provider import PlayerProvider - -if TYPE_CHECKING: - from collections.abc import Iterable - - from music_assistant.common.models.config_entries import ProviderConfig - from music_assistant.common.models.provider import ProviderManifest - from music_assistant.server import MusicAssistant - from music_assistant.server.models import ProviderInstanceType - - -# ruff: noqa: ARG002 - -UGP_FORMAT = AudioFormat( - content_type=ContentType.PCM_F32LE, - sample_rate=44100, - bit_depth=32, -) -UGP_PREFIX = "ugp_" - -CONF_ACTION_CREATE_PLAYER = "create_player" -CONF_ACTION_CREATE_PLAYER_SAVE = "create_player_save" -CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(44100, 16, 44100, 16, True) -CONF_GROUP_PLAYERS: Final[str] = "group_players" -CONF_NEW_GROUP_NAME: Final[str] = "name" -CONF_NEW_GROUP_MEMBERS: Final[list[str]] = "members" - -CONFIG_ENTRY_UGP_NOTE = ConfigEntry( - key="ugp_note", - type=ConfigEntryType.LABEL, - label="Please note that although the universal group " - "allows you to group any player, it will not enable audio sync " - "between players of different ecosystems. It is advised to always use native " - "player groups or sync groups when available for your player type(s) and use " - "the Universal Group only to group players of different ecosystems.", - required=False, -) - - -async def setup( - mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig -) -> ProviderInstanceType: - """Initialize provider(instance) with given configuration.""" - return UniversalGroupProvider(mass, manifest, config) - - -async def get_config_entries( - mass: MusicAssistant, - instance_id: str | None = None, - action: str | None = None, - values: dict[str, ConfigValueType] | None = None, -) -> tuple[ConfigEntry, ...]: - """ - Return Config entries to setup this provider. - - instance_id: id of an existing provider instance (None if new instance setup). - action: [optional] action key called from config entries UI. - values: the (intermediate) raw values for config entries sent with the action. - """ - if not (ugp_provider := mass.get_provider(instance_id)): - # UGP provider is not (yet) loaded - return () - if TYPE_CHECKING: - ugp_provider = cast(UniversalGroupProvider, ugp_provider) - if action == CONF_ACTION_CREATE_PLAYER: - # create new group player - name = values.pop(CONF_NEW_GROUP_NAME) - members: list[str] = values.pop(CONF_GROUP_MEMBERS) - members = ugp_provider._filter_members(members) - await ugp_provider.create_group(name, members) - return ( - ConfigEntry( - key="ugp_note", - type=ConfigEntryType.LABEL, - label=f"Your new Universal Group Player {name} has been created and " - "is available in the players list.", - required=False, - ), - ) - return ( - ConfigEntry( - key="ugp_new", - type=ConfigEntryType.LABEL, - label="Fill in the details below to create a new Universal Group " - "Player and click the 'Create new universal group' button.", - required=False, - ), - ConfigEntry( - key=CONF_NEW_GROUP_NAME, - type=ConfigEntryType.STRING, - label="Name", - required=False, - ), - ConfigEntry( - key=CONF_GROUP_MEMBERS, - type=ConfigEntryType.STRING, - label=CONF_NEW_GROUP_MEMBERS, - default_value=[], - options=tuple( - ConfigValueOption(x.display_name, x.player_id) - for x in mass.players.all(True, False) - ), - multi_value=True, - required=False, - ), - ConfigEntry( - key=CONF_ACTION_CREATE_PLAYER, - type=ConfigEntryType.ACTION, - label="Create new Universal Player Group", - required=False, - ), - CONFIG_ENTRY_UGP_NOTE, - ) - - -class UniversalGroupProvider(PlayerProvider): - """Base/builtin provider for universally grouping players.""" - - @property - def supported_features(self) -> tuple[ProviderFeature, ...]: - """Return the features supported by this Provider.""" - return () - - def __init__( - self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig - ) -> None: - """Initialize MusicProvider.""" - super().__init__(mass, manifest, config) - self._registered_routes: set[str] = set() - self.streams: dict[str, UGPStream] = {} - - async def loaded_in_mass(self) -> None: - """Call after the provider has been loaded.""" - await self._register_all_players() - - async def unload(self) -> None: - """ - Handle unload/close of the provider. - - Called when provider is deregistered (e.g. MA exiting or config reloading). - """ - for route_path in list(self._registered_routes): - self._registered_routes.remove(route_path) - self.mass.streams.unregister_dynamic_route(route_path) - - async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: - """Return all (provider/player specific) Config Entries for the given player (if any).""" - base_entries = await super().get_player_config_entries(player_id) - return ( - *base_entries, - CONF_ENTRY_FLOW_MODE_ENFORCED, - ConfigEntry( - key=CONF_GROUP_MEMBERS, - type=ConfigEntryType.STRING, - label="Group members", - default_value=[], - options=tuple( - ConfigValueOption(x.display_name, x.player_id) - for x in self.mass.players.all(True, False) - if x.player_id != player_id and not x.player_id.startswith(UGP_PREFIX) - ), - description="Select all players you want to be part of this universal group", - multi_value=True, - required=True, - ), - CONFIG_ENTRY_UGP_NOTE, - CONF_ENTRY_CROSSFADE, - CONF_ENTRY_CROSSFADE_DURATION, - CONF_ENTRY_SAMPLE_RATES_UGP, - ) - - def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None: - """Call (by config manager) when the configuration of a player changes.""" - if f"values/{CONF_GROUP_MEMBERS}" in changed_keys: - player = self.mass.players.get(config.player_id) - members = config.get_value(CONF_GROUP_MEMBERS) - # ensure we filter invalid members - members = self._filter_members(members) - player.group_childs = members - self.mass.config.set_raw_player_config_value( - config.player_id, CONF_GROUP_PLAYERS, members - ) - self.mass.players.update(config.player_id) - - def on_player_config_removed(self, player_id: str) -> None: - """Call (by config manager) when the configuration of a player is removed.""" - # ensure that any group players get removed - group_players = self.mass.config.get_raw_provider_config_value( - self.instance_id, CONF_GROUP_PLAYERS, {} - ) - if player_id in group_players: - del group_players[player_id] - self.mass.config.set_raw_provider_config_value( - self.instance_id, CONF_GROUP_PLAYERS, group_players - ) - - async def cmd_stop(self, player_id: str) -> None: - """Send STOP command to given player.""" - group_player = self.mass.players.get(player_id) - group_player.state = PlayerState.IDLE - self.mass.players.update(player_id) - # forward command to player and any connected sync child's - async with TaskManager(self.mass) as tg: - for member in self.mass.players.iter_group_members(group_player, only_powered=True): - if member.state == PlayerState.IDLE: - continue - tg.create_task(self.mass.players.cmd_stop(member.player_id)) - if (stream := self.streams.pop(player_id, None)) and not stream.done: - await stream.stop() - - async def cmd_play(self, player_id: str) -> None: - """Send PLAY command to given player.""" - - async def cmd_power(self, player_id: str, powered: bool) -> None: - """Send POWER command to given UGP group player.""" - group_player = self.mass.players.get(player_id, True) - - if group_player.powered == powered: - return # nothing to do - - # make sure to update the group power state - group_player.powered = powered - - any_member_powered = False - async with TaskManager(self.mass) as tg: - for member in self.mass.players.iter_group_members(group_player, only_powered=True): - any_member_powered = True - if powered: - if member.state in (PlayerState.PLAYING, PlayerState.PAUSED): - # stop playing existing content on member if we start the group player - tg.create_task(self.cmd_stop(member.player_id)) - # set active source to group player if the group (is going to be) powered - member.active_group = group_player.active_group - member.active_source = group_player.active_source - self.mass.players.update(member.player_id, skip_forward=True) - else: - # turn off child player when group turns off - tg.create_task(self.cmd_power(member.player_id, False)) - # reset active source on player - member.active_source = None - member.active_group = None - self.mass.players.update(member.player_id, skip_forward=True) - # edge case: group turned on but no members are powered, power them all! - # TODO: Do we want to make this configurable ? - if not any_member_powered and powered: - for member in self.mass.players.iter_group_members( - group_player, only_powered=False - ): - tg.create_task(self.cmd_power(member.player_id, True)) - member.active_group = group_player.player_id - member.active_source = group_player.active_source - - self.mass.players.update(player_id) - - async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: - """Send VOLUME_SET command to given player.""" - # group volume is already handled in the player manager - - async def play_media( - self, - player_id: str, - media: PlayerMedia, - ) -> None: - """Handle PLAY MEDIA on given player.""" - # power ON - await self.cmd_power(player_id, True) - group_player = self.mass.players.get(player_id) - # stop any existing stream first - if (existing := self.streams.pop(player_id, None)) and not existing.done: - await existing.stop() - - # select audio source - if media.media_type == MediaType.ANNOUNCEMENT: - # special case: stream announcement - audio_source = self.mass.streams.get_announcement_stream( - media.custom_data["url"], - output_format=UGP_FORMAT, - use_pre_announce=media.custom_data["use_pre_announce"], - ) - elif media.queue_id and media.queue_item_id: - # regular queue stream request - audio_source = self.mass.streams.get_flow_stream( - queue=self.mass.player_queues.get(media.queue_id), - start_queue_item=self.mass.player_queues.get_item( - media.queue_id, media.queue_item_id - ), - pcm_format=UGP_FORMAT, - ) - else: - # assume url or some other direct path - # NOTE: this will fail if its an uri not playable by ffmpeg - audio_source = get_ffmpeg_stream( - audio_input=media.uri, - input_format=AudioFormat(ContentType.try_parse(media.uri)), - output_format=UGP_FORMAT, - ) - - # start the stream task - self.streams[player_id] = UGPStream(audio_source=audio_source, audio_format=UGP_FORMAT) - base_url = f"{self.mass.streams.base_url}/ugp/{player_id}.aac" - - # forward to downstream play_media commands - async with TaskManager(self.mass) as tg: - for member in self.mass.players.iter_group_members(group_player, only_powered=True): - tg.create_task( - self.mass.players.play_media( - member.player_id, - media=PlayerMedia( - uri=f"{base_url}?player_id={member.player_id}", - media_type=MediaType.FLOW_STREAM, - title=group_player.display_name, - queue_id=group_player.player_id, - ), - ) - ) - # set the state optimistically - group_player.elapsed_time = 0 - group_player.elapsed_time_last_updated = time() - 1 - group_player.state = PlayerState.PLAYING - self.mass.players.update(player_id) - - async def create_group(self, name: str, members: list[str]) -> Player: - """Create new PlayerGroup on this provider. - - Create a new PlayerGroup with given name and members. - - - name: Name for the new group to create. - - members: A list of player_id's that should be part of this group. - """ - new_group_id = f"{UGP_PREFIX}{shortuuid.random(8).lower()}" - # cleanup list, filter groups (should be handled by frontend, but just in case) - members = self._filter_members(members) - # create default config with the user chosen name - self.mass.config.create_default_player_config( - new_group_id, - self.instance_id, - name=name, - enabled=True, - values={CONF_GROUP_MEMBERS: members}, - ) - return self._register_group_player(new_group_id, name=name, members=members) - - async def _register_all_players(self) -> None: - """Register all (virtual/fake) group players in the Player controller.""" - player_configs = await self.mass.config.get_player_configs( - self.instance_id, include_values=True - ) - for player_config in player_configs: - members = player_config.get_value(CONF_GROUP_MEMBERS) - self._register_group_player( - player_config.player_id, - player_config.name or player_config.default_name, - members, - ) - - def _register_group_player( - self, group_player_id: str, name: str, members: Iterable[str] - ) -> Player: - """Register a UGP group player in the Player controller.""" - player = Player( - player_id=group_player_id, - provider=self.instance_id, - type=PlayerType.GROUP, - name=name, - available=True, - powered=False, - device_info=DeviceInfo(model="Group", manufacturer=self.name), - supported_features=(PlayerFeature.VOLUME_SET, PlayerFeature.POWER), - group_childs=set(members), - ) - self.mass.players.register_or_update(player) - # register dynamic route for the ugp stream - route_path = f"/ugp/{group_player_id}.aac" - self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream) - self._registered_routes.add(route_path) - - return player - - def on_group_child_power( - self, group_player: Player, child_player: Player, new_power: bool, group_state: PlayerState - ) -> None: - """ - Call when a power command was executed on one of the child player of a PlayerGroup. - - The group state is sent with the state BEFORE the power command was executed. - """ - if not group_player.powered: - # guard, this should be caught in the player controller but just in case... - return None - - powered_childs = [ - x - for x in self.mass.players.iter_group_members(group_player, True) - if not (not new_power and x.player_id == child_player.player_id) - ] - if new_power and child_player not in powered_childs: - powered_childs.append(child_player) - - # if the last player of a group turned off, turn off the group - if len(powered_childs) == 0: - self.logger.debug( - "Group %s has no more powered members, turning off group player", - group_player.display_name, - ) - self.mass.create_task(self.cmd_power(group_player.player_id, False)) - return False - - # if a child player turned ON while the group player is already playing - # we just direct it to the existing stream (we dont care about the audio being in sync) - if new_power and group_state == PlayerState.PLAYING: - base_url = f"{self.mass.streams.base_url}/ugp/{group_player.player_id}.aac" - self.mass.create_task( - self.mass.players.play_media( - child_player.player_id, - media=PlayerMedia( - uri=f"{base_url}?player_id={child_player.player_id}", - media_type=MediaType.FLOW_STREAM, - title=group_player.display_name, - queue_id=group_player.player_id, - ), - ) - ) - - return None - - async def _serve_ugp_stream(self, request: web.Request) -> web.Response: - """Serve the UGP (multi-client) flow stream audio to a player.""" - ugp_player_id = request.path.rsplit(".")[0].rsplit("/")[-1] - child_player_id = request.query.get("player_id") # optional! - - if not (ugp_player := self.mass.players.get(ugp_player_id)): - raise web.HTTPNotFound(reason=f"Unknown UGP player: {ugp_player_id}") - - if not (stream := self.streams.get(ugp_player_id, None)) or stream.done: - raise web.HTTPNotFound(body=f"There is no active UGP stream for {ugp_player_id}!") - - http_profile: str = await self.mass.config.get_player_config_value( - child_player_id, CONF_HTTP_PROFILE - ) - headers = { - **DEFAULT_STREAM_HEADERS, - "Content-Type": "audio/aac", - "Accept-Ranges": "none", - "Cache-Control": "no-cache", - "Connection": "close", - } - - resp = web.StreamResponse(status=200, reason="OK", headers=headers) - if http_profile == "forced_content_length": - resp.content_length = 4294967296 - elif http_profile == "chunked": - resp.enable_chunked_encoding() - - await resp.prepare(request) - - # return early if this is not a GET request - if request.method != "GET": - return resp - - # all checks passed, start streaming! - self.logger.debug( - "Start serving UGP flow audio stream for UGP-player %s to %s", - ugp_player.display_name, - child_player_id or request.remote, - ) - async for chunk in stream.subscribe(): - try: - await resp.write(chunk) - except (ConnectionError, ConnectionResetError): - break - - return resp - - def _filter_members(self, members: list[str]) -> list[str]: - """Filter out members that are not valid players.""" - # cleanup members - filter out impossible choices - syncgroup_childs: list[str] = [] - for member in members: - if not member.startswith(SYNCGROUP_PREFIX): - continue - if syncgroup := self.mass.players.get(member): - syncgroup_childs.extend(syncgroup.group_childs) - # we filter out other UGP players and syncgroup childs - # if their parent is already in the list - return [ - x - for x in members - if self.mass.players.get(x) - and x not in syncgroup_childs - and not x.startswith(UGP_PREFIX) - ] - - -class UGPStream: - """ - Implementation of a Stream for the Universal Group Player. - - Basiclaly this is like a fake radio radio stream (AAC) format with multiple subscribers. - The AAC format is chosen because it is widely supported and has a good balance between - quality and bandwidth and also allows for mid-stream joining of (extra) players. - """ - - def __init__( - self, - audio_source: AsyncGenerator[bytes, None], - audio_format: AudioFormat, - ) -> None: - """Initialize UGP Stream.""" - self.audio_source = audio_source - self.input_format = audio_format - self.output_format = AudioFormat(content_type=ContentType.AAC) - self.subscribers: list[Callable[[bytes], Awaitable]] = [] - self._task: asyncio.Task | None = None - self._done: asyncio.Event = asyncio.Event() - - @property - def done(self) -> bool: - """Return if this stream is already done.""" - return self._done.is_set() and self._task and self._task.done() - - async def stop(self) -> None: - """Stop/cancel the stream.""" - if self._done.is_set(): - return - if self._task and not self._task.done(): - self._task.cancel() - self._done.set() - - async def subscribe(self) -> AsyncGenerator[bytes, None]: - """Subscribe to the raw/unaltered audio stream.""" - # start the runner as soon as the (first) client connects - if not self._task: - self._task = asyncio.create_task(self._runner()) - queue = asyncio.Queue(1) - try: - self.subscribers.append(queue.put) - while True: - chunk = await queue.get() - if not chunk: - break - yield chunk - finally: - self.subscribers.remove(queue.put) - - async def _runner(self) -> None: - """Run the stream for the given audio source.""" - await asyncio.sleep(0.25) # small delay to allow subscribers to connect - async for chunk in get_ffmpeg_stream( - audio_input=self.audio_source, - input_format=self.input_format, - output_format=self.output_format, - # TODO: enable readrate limiting + initial burst once we have a newer ffmpeg version - # extra_input_args=["-readrate", "1.15"], - ): - await asyncio.gather(*[sub(chunk) for sub in self.subscribers], return_exceptions=True) - # empty chunk when done - await asyncio.gather(*[sub(b"") for sub in self.subscribers], return_exceptions=True) - self._done.set() diff --git a/music_assistant/server/providers/ugp/manifest.json b/music_assistant/server/providers/ugp/manifest.json deleted file mode 100644 index f465b11c..00000000 --- a/music_assistant/server/providers/ugp/manifest.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "type": "player", - "domain": "ugp", - "name": "Universal Group Player", - "description": "Create Player Groups with your favorite players, regardless of type and model.", - "codeowners": [ - "@music-assistant" - ], - "requirements": [], - "documentation": "https://music-assistant.io/player-support/universal/", - "multi_instance": false, - "builtin": false, - "icon": "speaker-multiple" -} diff --git a/music_assistant/server/server.py b/music_assistant/server/server.py index aa119d3f..1d4283a4 100644 --- a/music_assistant/server/server.py +++ b/music_assistant/server/server.py @@ -404,12 +404,21 @@ class MusicAssistant: command: str, handler: Callable, ) -> None: - """Dynamically register a command on the API.""" + """ + Dynamically register a command on the API. + + Returns handle to unregister. + """ if command in self.command_handlers: msg = f"Command {command} is already registered" raise RuntimeError(msg) self.command_handlers[command] = APICommandHandler.parse(command, handler) + def unregister() -> None: + self.command_handlers.pop(command) + + return unregister + async def load_provider_config( self, prov_conf: ProviderConfig, -- 2.34.1