From f712b4978d5b8b93c5fc2615dcff44c559b2defd Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Sun, 21 Jan 2024 02:07:04 +0100 Subject: [PATCH] Better support for Player groups (#1011) --- .vscode/settings.json | 4 +- .../common/models/config_entries.py | 15 - music_assistant/common/models/enums.py | 14 +- music_assistant/common/models/player.py | 6 +- music_assistant/constants.py | 4 +- music_assistant/server/controllers/config.py | 44 +- .../server/controllers/player_queues.py | 11 +- music_assistant/server/controllers/players.py | 563 +++++++++++++----- music_assistant/server/controllers/streams.py | 44 +- .../server/models/player_provider.py | 74 ++- music_assistant/server/models/provider.py | 3 + .../server/providers/airplay/__init__.py | 19 +- .../server/providers/chromecast/__init__.py | 46 +- .../server/providers/chromecast/helpers.py | 4 +- .../server/providers/dlna/__init__.py | 31 + .../server/providers/slimproto/__init__.py | 34 +- .../server/providers/snapcast/__init__.py | 63 +- .../server/providers/snapcast/icon.svg | 49 +- .../server/providers/sonos/__init__.py | 50 +- .../server/providers/ugp/__init__.py | 509 +++++----------- 20 files changed, 969 insertions(+), 618 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index ae9f98ef..9451796b 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -2,8 +2,8 @@ "[python]": { "editor.formatOnSave": true, "editor.codeActionsOnSave": { - "source.fixAll": true, - "source.organizeImports": true + "source.fixAll": "explicit", + "source.organizeImports": "explicit" } }, "python.formatting.provider": "black", diff --git a/music_assistant/common/models/config_entries.py b/music_assistant/common/models/config_entries.py index 27a519f5..bc02e6ba 100644 --- a/music_assistant/common/models/config_entries.py +++ b/music_assistant/common/models/config_entries.py @@ -18,7 +18,6 @@ from music_assistant.constants import ( CONF_EQ_MID, CONF_EQ_TREBLE, CONF_FLOW_MODE, - CONF_HIDE_GROUP_CHILDS, CONF_LOG_LEVEL, CONF_OUTPUT_CHANNELS, CONF_VOLUME_NORMALIZATION, @@ -385,20 +384,6 @@ CONF_ENTRY_EQ_TREBLE = ConfigEntry( advanced=True, ) -CONF_ENTRY_HIDE_GROUP_MEMBERS = ConfigEntry( - key=CONF_HIDE_GROUP_CHILDS, - type=ConfigEntryType.STRING, - options=[ - ConfigValueOption("Always", "always"), - ConfigValueOption("Only if the group is active/powered", "active"), - ConfigValueOption("Never", "never"), - ], - default_value="active", - label="Hide playergroup members in UI", - description="Hide the individual player entry for the members of this group " - "in the user interface.", - advanced=False, -) CONF_ENTRY_CROSSFADE = ConfigEntry( key=CONF_CROSSFADE, diff --git a/music_assistant/common/models/enums.py b/music_assistant/common/models/enums.py index 15398b72..b46c0e11 100644 --- a/music_assistant/common/models/enums.py +++ b/music_assistant/common/models/enums.py @@ -200,13 +200,13 @@ class PlayerType(StrEnum): """Enum with possible Player Types. player: A regular player. - group: A (dedicated) group player or playergroup. - stereo_pair: Two speakers playing as one stereo pair. + group: A (dedicated) group player or (universal) playergroup. + sync_group: A group/preset of players that can be synced together. """ PLAYER = "player" GROUP = "group" - STEREO_PAIR = "stereo_pair" + SYNC_GROUP = "sync_group" class PlayerFeature(StrEnum): @@ -218,8 +218,7 @@ class PlayerFeature(StrEnum): sync: The player supports syncing with other players (of the same platform). accurate_time: The player provides millisecond accurate timing information. seek: The player supports seeking to a specific. - set_members: The PlayerGroup supports adding/removing members. - queue: The player supports (en)queuing of media items. + queue: The player supports (en)queuing of media items natively. """ POWER = "power" @@ -227,10 +226,8 @@ class PlayerFeature(StrEnum): VOLUME_MUTE = "volume_mute" PAUSE = "pause" SYNC = "sync" - ACCURATE_TIME = "accurate_time" SEEK = "seek" ENQUEUE_NEXT = "enqueue_next" - CROSSFADE = "crossfade" class EventType(StrEnum): @@ -296,7 +293,8 @@ class ProviderFeature(StrEnum): # # PLAYERPROVIDER FEATURES # - # we currently have none ;-) + PLAYER_GROUP_CREATE = "player_group_create" + SYNC_PLAYERS = "sync_players" # # METADATAPROVIDER FEATURES diff --git a/music_assistant/common/models/player.py b/music_assistant/common/models/player.py index 20d3660e..8dbeb126 100644 --- a/music_assistant/common/models/player.py +++ b/music_assistant/common/models/player.py @@ -81,11 +81,12 @@ class Player(DataClassDictMixin): # enabled: if the player is enabled # will be set by the player manager based on config # a disabled player is hidden in the UI and updates will not be processed + # nor will it be added to the HA integration enabled: bool = True # hidden_by: if the player is enabled # will be set by the player manager based on config - # a disabled player is hidden in the UI only + # a disabled player is hidden in the UI only but can still be controlled hidden_by: set = field(default_factory=set) # group_volume: if the player is a player group or syncgroup master, @@ -101,9 +102,6 @@ class Player(DataClassDictMixin): # and pass along freely extra_data: dict[str, Any] = field(default_factory=dict) - # mute_as_power: special feature from the universal group - mute_as_power: bool = False - @property def corrected_elapsed_time(self) -> float: """Return the corrected/realtime elapsed time.""" diff --git a/music_assistant/constants.py b/music_assistant/constants.py index b96d78b9..145e655a 100755 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -44,13 +44,14 @@ CONF_OUTPUT_CHANNELS: Final[str] = "output_channels" CONF_FLOW_MODE: Final[str] = "flow_mode" CONF_LOG_LEVEL: Final[str] = "log_level" CONF_HIDE_GROUP_CHILDS: Final[str] = "hide_group_childs" -CONF_GROUPED_POWER_ON: Final[str] = "grouped_power_on" CONF_CROSSFADE_DURATION: Final[str] = "crossfade_duration" CONF_BIND_IP: Final[str] = "bind_ip" CONF_BIND_PORT: Final[str] = "bind_port" CONF_PUBLISH_IP: Final[str] = "publish_ip" CONF_AUTO_PLAY: Final[str] = "auto_play" +CONF_GROUP_PLAYERS: Final[str] = "group_players" CONF_CROSSFADE: Final[str] = "crossfade" +CONF_GROUP_MEMBERS: Final[str] = "group_members" # config default values DEFAULT_HOST: Final[str] = "0.0.0.0" @@ -85,3 +86,4 @@ CONFIGURABLE_CORE_CONTROLLERS = ( "music", "player_queues", ) +SYNCGROUP_PREFIX: Final[str] = "syncgroup_" diff --git a/music_assistant/server/controllers/config.py b/music_assistant/server/controllers/config.py index 7aaa3ade..465e4e54 100644 --- a/music_assistant/server/controllers/config.py +++ b/music_assistant/server/controllers/config.py @@ -360,7 +360,10 @@ class ConfigController: Note that this only returns the stored value without any validation or default. """ - return self.get(f"{CONF_PLAYERS}/{player_id}/values/{key}", default) + return self.get( + f"{CONF_PLAYERS}/{player_id}/values/{key}", + self.get(f"{CONF_PLAYERS}/{player_id}/{key}", default), + ) @api_command("config/players/save") async def save_player_config( @@ -417,7 +420,12 @@ class ConfigController: provider.on_player_config_removed(player_id) def create_default_player_config( - self, player_id: str, provider: str, name: str, enabled: bool + self, + player_id: str, + provider: str, + name: str, + enabled: bool, + values: dict[str, ConfigValueType] | None = None, ) -> None: """ Create default/empty PlayerConfig. @@ -428,16 +436,20 @@ class ConfigController: # return early if the config already exists if self.get(f"{CONF_PLAYERS}/{player_id}"): # update default name if needed - self.set(f"{CONF_PLAYERS}/{player_id}/default_name", name) + if name: + self.set(f"{CONF_PLAYERS}/{player_id}/default_name", name) return # config does not yet exist, create a default one conf_key = f"{CONF_PLAYERS}/{player_id}" default_conf = PlayerConfig( values={}, provider=provider, player_id=player_id, enabled=enabled, default_name=name ) + default_conf_raw = default_conf.to_raw() + if values is not None: + default_conf_raw["values"] = values self.set( conf_key, - default_conf.to_raw(), + default_conf_raw, ) async def create_default_provider_config(self, provider_domain: str) -> None: @@ -556,6 +568,26 @@ class ConfigController: """ return self.get(f"{CONF_CORE}/{core_module}/{key}", default) + def get_raw_provider_config_value( + self, provider_instance: str, key: str, default: ConfigValueType = None + ) -> ConfigValueType: + """ + Return (raw) single config(entry) value for a provider. + + Note that this only returns the stored value without any validation or default. + """ + return self.get(f"{CONF_PROVIDERS}/{provider_instance}/{key}", default) + + def set_raw_provider_config_value( + self, provider_instance: str, key: str, value: ConfigValueType + ) -> None: + """ + Set (raw) single config(entry) value for a provider. + + Note that this only returns the stored value without any validation or default. + """ + return self.set(f"{CONF_PROVIDERS}/{provider_instance}/{key}", value) + def save(self, immediate: bool = False) -> None: """Schedule save of data to disk.""" self._value_cache = {} @@ -646,9 +678,7 @@ class ConfigController: await self.mass.unload_provider(config.instance_id) if config.type == ProviderType.PLAYER: # cleanup entries in player manager - for player in self.mass.players.all( - return_unavailable=True, return_hidden=True, return_disabled=True - ): + for player in self.mass.players.all(return_unavailable=True, return_disabled=True): if player.provider != instance_id: continue self.mass.players.remove(player.player_id, cleanup_config=False) diff --git a/music_assistant/server/controllers/player_queues.py b/music_assistant/server/controllers/player_queues.py index 55aa3b57..7698f724 100755 --- a/music_assistant/server/controllers/player_queues.py +++ b/music_assistant/server/controllers/player_queues.py @@ -463,11 +463,6 @@ class PlayerQueuesController(CoreController): assert queue.current_item.media_item.media_type == MediaType.TRACK assert queue.current_item.duration assert position < queue.current_item.duration - player = self.mass.players.get(queue_id) - if PlayerFeature.SEEK in player.supported_features: - player_prov = self.mass.players.get_player_provider(queue_id) - await player_prov.cmd_seek(player.player_id, position) - return await self.play_index(queue_id, queue.current_index, position) @api_command("players/queue/resume") @@ -527,8 +522,7 @@ class PlayerQueuesController(CoreController): queue.index_in_buffer = index queue.flow_mode_start_index = index queue.flow_mode = False # reset - player_prov = self.mass.players.get_player_provider(queue_id) - await player_prov.play_media( + await self.mass.players.play_media( player_id=queue_id, queue_item=queue_item, seek_position=int(seek_position), @@ -872,10 +866,9 @@ class PlayerQueuesController(CoreController): return # already enqueued async def _enqueue_next(index: int): - player_prov = self.mass.players.get_player_provider(player.player_id) with suppress(QueueEmpty): next_item = await self.preload_next_item(queue.queue_id, index) - await player_prov.enqueue_next_queue_item( + await self.mass.players.enqueue_next_queue_item( player_id=player.player_id, queue_item=next_item ) diff --git a/music_assistant/server/controllers/players.py b/music_assistant/server/controllers/players.py index 6701dff1..c2e32bbe 100755 --- a/music_assistant/server/controllers/players.py +++ b/music_assistant/server/controllers/players.py @@ -4,29 +4,35 @@ from __future__ import annotations import asyncio import functools import logging -from collections.abc import Awaitable, Callable, Coroutine, Iterator +from collections.abc import Awaitable, Callable, Coroutine, Iterable, Iterator from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast +import shortuuid + from music_assistant.common.helpers.util import get_changed_values from music_assistant.common.models.enums import ( EventType, PlayerFeature, PlayerState, PlayerType, + ProviderFeature, ProviderType, ) from music_assistant.common.models.errors import ( AlreadyRegisteredError, PlayerCommandFailed, PlayerUnavailableError, + ProviderUnavailableError, UnsupportedFeaturedException, ) -from music_assistant.common.models.player import Player +from music_assistant.common.models.player import DeviceInfo, Player +from music_assistant.common.models.queue_item import QueueItem from music_assistant.constants import ( CONF_AUTO_PLAY, - CONF_HIDE_GROUP_CHILDS, + CONF_GROUP_MEMBERS, CONF_PLAYERS, ROOT_LOGGER_NAME, + SYNCGROUP_PREFIX, ) from music_assistant.server.helpers.api import api_command from music_assistant.server.models.core_controller import CoreController @@ -108,16 +114,13 @@ class PlayerController(CoreController): def all( self, return_unavailable: bool = True, - return_hidden: bool = True, return_disabled: bool = False, ) -> tuple[Player, ...]: """Return all registered players.""" return tuple( player for player in self._players.values() - if (player.available or return_unavailable) - and (not player.hidden_by or return_hidden) - and (player.enabled or return_disabled) + if (player.available or return_unavailable) and (player.enabled or return_disabled) ) @api_command("players/get") @@ -176,6 +179,9 @@ class PlayerController(CoreController): if not player.enabled: return + # initialize sync groups as soon as a player is registered + self.mass.loop.create_task(self._register_syncgroups()) + LOGGER.info( "Player registered: %s/%s", player_id, @@ -224,7 +230,7 @@ class PlayerController(CoreController): player.active_source = self._get_active_source(player) # calculate group volume player.group_volume = self._get_group_volume_level(player) - if player.type == PlayerType.GROUP: + if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP): player.volume_level = player.group_volume # prefer any overridden name from config player.display_name = ( @@ -232,14 +238,26 @@ class PlayerController(CoreController): or player.name or player.player_id ) - # handle special mute_as_power feature - if player.mute_as_power: - player.powered = player.powered and not player.volume_muted - elif player.state == PlayerState.PLAYING and not player.powered: + if ( + not player.powered + and player.state == PlayerState.PLAYING + and PlayerFeature.POWER not in player.supported_features + and player.active_source == player_id + ): # mark player as powered if its playing # could happen for players that do not officially support power commands player.powered = True + # handle syncgroup - get attributes from first player that has this group as source + if player.player_id.startswith(SYNCGROUP_PREFIX): + if sync_leader := self.get_sync_leader(player): + player.state = sync_leader.state + player.current_item_id = sync_leader.current_item_id + player.elapsed_time = sync_leader.elapsed_time + player.elapsed_time_last_updated = sync_leader.elapsed_time_last_updated + else: + player.state = PlayerState.IDLE + # basic throttle: do not send state changed events if player did not actually change prev_state = self._prev_states.get(player_id, {}) new_state = self._players[player_id].to_dict() @@ -264,31 +282,21 @@ class PlayerController(CoreController): if skip_forward: return - if player.type == PlayerType.GROUP: - # update group player child's when parent updates - hide_group_childs = self.mass.config.get_raw_player_config_value( - player.player_id, CONF_HIDE_GROUP_CHILDS, "active" - ) - for child_player in self._get_child_players(player): + # update/signal group player(s) child's when group updates + if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP): + for child_player in self.iter_group_members(player): if child_player.player_id == player.player_id: continue - # handle 'hide group childs' feature here - if hide_group_childs == "always": # noqa: SIM114 - child_player.hidden_by.add(player.player_id) - elif player.powered and hide_group_childs == "active": - child_player.hidden_by.add(player.player_id) - elif not player.powered and player.player_id in child_player.hidden_by: - child_player.hidden_by.remove(player.player_id) self.update(child_player.player_id, skip_forward=True) - - # update group player(s) when child updates - for group_player in self._get_player_groups(player_id): - if not group_player.available: - continue + # update/signal group player(s) when child updates + for group_player in self._get_player_groups(player, powered_only=False): player_prov = self.get_player_provider(group_player.player_id) if not player_prov: continue - self.mass.create_task(player_prov.poll_player(group_player.player_id)) + if group_player.player_id.startswith(SYNCGROUP_PREFIX): + self.update(group_player.player_id, skip_forward=True) + else: + self.mass.create_task(player_prov.poll_player(group_player.player_id)) def get_player_provider(self, player_id: str) -> PlayerProvider: """Return PlayerProvider for given player.""" @@ -378,67 +386,54 @@ class PlayerController(CoreController): - player_id: player_id of the player to handle the command. - powered: bool if player should be powered on or off. """ - # TODO: Implement PlayerControl + if player_id.startswith(SYNCGROUP_PREFIX): + await self.cmd_group_power(player_id, powered) + return player = self.get(player_id, True) - cur_power = ( - (player.powered and not player.volume_muted) if player.mute_as_power else player.powered - ) - if cur_power == powered: + if player.powered == powered: return # nothing to do + # inform (active) group player if needed + # NOTE: this must be on the top to prevent race conditions + if active_group_player := self._get_active_player_group(player): + if active_group_player.player_id.startswith(SYNCGROUP_PREFIX): + self._on_syncgroup_child_power( + active_group_player.player_id, player.player_id, powered + ) + elif player_prov := self.get_player_provider(active_group_player.player_id): + player_prov.on_child_power(active_group_player.player_id, player.player_id, powered) # stop player at power off if ( not powered and player.state in (PlayerState.PLAYING, PlayerState.PAUSED) and not player.synced_to - and not player.mute_as_power + and player.powered ): await self.cmd_stop(player_id) # unsync player at power off - if not powered and not player.mute_as_power: + if not powered: if player.synced_to is not None: await self.cmd_unsync(player_id) - for child in self._get_child_players(player): + for child in self.iter_group_members(player): if not child.synced_to: continue await self.cmd_unsync(child.player_id) - if player.mute_as_power: - # handle mute as power feature - await self.cmd_volume_mute(player_id, not powered) - - # restore mute if needed on poweroff - if ( - not powered - and player.volume_muted - and not player.mute_as_power - and PlayerFeature.VOLUME_MUTE not in player.supported_features - ): - await self.cmd_volume_mute(player_id, False) - - if PlayerFeature.POWER not in player.supported_features: - # player does not support power, use fake state instead - player.powered = powered - self.update(player_id) - elif powered or not player.mute_as_power: - # regular power command + if PlayerFeature.POWER in player.supported_features: + # forward to player provider player_provider = self.get_player_provider(player_id) await player_provider.cmd_power(player_id, powered) - # handle forward to (active) group player if needed - for group_player in self._get_player_groups(player_id): - if not group_player.available: - continue - if not group_player.powered: - continue - if player_prov := self.get_player_provider(group_player.player_id): - await player_prov.on_child_power(group_player.player_id, player, powered) - break - else: - # auto play feature - if powered and self.mass.config.get_raw_player_config_value( - player_id, CONF_AUTO_PLAY, False - ): - await self.mass.player_queues.resume(player_id) + # always optimistically set the power state to update the UI + # as fast as possible and prevent race conditions + player.powered = powered + self.update(player_id) + # handle 'auto play on power on' feature + if ( + powered + and self.mass.config.get_raw_player_config_value(player_id, CONF_AUTO_PLAY, False) + and player.active_source in (None, player_id) + ): + await self.mass.player_queues.resume(player_id) @api_command("players/cmd/volume_set") @log_player_command @@ -450,18 +445,14 @@ class PlayerController(CoreController): """ # TODO: Implement PlayerControl player = self.get(player_id, True) - if player.type == PlayerType.GROUP: + if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP): # redirect to group volume control await self.cmd_group_volume(player_id, volume_level) return if PlayerFeature.VOLUME_SET not in player.supported_features: - LOGGER.warning( - "Volume set command called but player %s does not support volume", - player_id, + raise UnsupportedFeaturedException( + f"Player {player.display_name} does not support volume_set" ) - player.volume_level = volume_level - self.update(player_id) - return player_provider = self.get_player_provider(player_id) await player_provider.cmd_volume_set(player_id, volume_level) @@ -501,7 +492,7 @@ class PlayerController(CoreController): new_volume = volume_level volume_dif = new_volume - cur_volume coros = [] - for child_player in self._get_child_players(group_player, True): + for child_player in self.iter_group_members(group_player, True): cur_child_volume = child_player.volume_level new_child_volume = int(cur_child_volume + volume_dif) new_child_volume = max(0, new_child_volume) @@ -509,6 +500,35 @@ class PlayerController(CoreController): coros.append(self.cmd_volume_set(child_player.player_id, new_child_volume)) await asyncio.gather(*coros) + @api_command("players/cmd/group_power") + async def cmd_group_power(self, player_id: str, power: bool) -> None: + """Handle power command for a PlayerGroup.""" + group_player = self.get(player_id, True) + + group_player.powered = power + if not power: + group_player.state = PlayerState.IDLE + + async with asyncio.TaskGroup() as tg: + members_powered = False + for member in self.iter_group_members(group_player, only_powered=True): + members_powered = True + if power: + # set active source to group player if the group (is going to be) powered + member.active_source = group_player.player_id + elif member.active_source == group_player.player_id: + # turn off child player when group turns off + tg.create_task(self.cmd_power(member.player_id, False)) + # edge case: group turned on but no members are powered, power them all! + if not members_powered and power: + for member in self.iter_group_members(group_player, only_powered=False): + tg.create_task(self.cmd_power(member.player_id, True)) + member.active_source = group_player.player_id + + if power and group_player.player_id.startswith(SYNCGROUP_PREFIX): + await self._sync_syncgroup(group_player.player_id) + self.update(player_id) + @api_command("players/cmd/volume_mute") @log_player_command async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: @@ -520,34 +540,106 @@ class PlayerController(CoreController): player = self.get(player_id, True) assert player if PlayerFeature.VOLUME_MUTE not in player.supported_features: - LOGGER.debug("Mute command called but player %s does not support muting", player_id) - player.volume_muted = muted - # use volume to process the muting - cache_key = f"prev_volume_muting_{player_id}" - if muted: - await self.mass.cache.set(cache_key, player.volume_level) - await self.cmd_volume_set(player_id, 0) - else: - prev_volume = await self.mass.cache.get(cache_key, default=10) - await self.cmd_volume_set(player_id, prev_volume) - self.update(player_id) - return - # TODO: Implement PlayerControl + raise UnsupportedFeaturedException( + f"Player {player.display_name} does not support muting" + ) player_provider = self.get_player_provider(player_id) await player_provider.cmd_volume_mute(player_id, muted) + @api_command("players/cmd/seek") + async def cmd_seek(self, player_id: str, position: int) -> None: + """Handle SEEK command for given queue. + + - player_id: player_id of the player to handle the command. + - position: position in seconds to seek to in the current playing item. + """ + player_id = self._check_redirect(player_id) + + player = self.get(player_id, True) + if PlayerFeature.SEEK not in player.supported_features: + raise UnsupportedFeaturedException( + f"Player {player.display_name} does not support seeking" + ) + player_prov = self.mass.players.get_player_provider(player_id) + await player_prov.cmd_seek(player_id, position) + + async def play_media( + self, + player_id: str, + queue_item: QueueItem, + seek_position: int, + fade_in: bool, + ) -> None: + """Handle PLAY MEDIA on given player. + + This is called by the Queue controller to start playing a queue item on the given player. + The provider's own implementation should work out how to handle this request. + + - player_id: player_id of the player to handle the command. + - queue_item: The QueueItem that needs to be played on the player. + - seek_position: Optional seek to this position. + - fade_in: Optionally fade in the item at playback start. + """ + if player_id.startswith(SYNCGROUP_PREFIX): + # redirect to syncgroup-leader if needed + await self.cmd_group_power(player_id, True) + group_player = self.get(player_id, True) + if sync_leader := self.get_sync_leader(group_player): + await self.play_media( + sync_leader.player_id, + queue_item=queue_item, + seek_position=seek_position, + fade_in=fade_in, + ) + group_player.state = PlayerState.PLAYING + return + player_prov = self.mass.players.get_player_provider(player_id) + await player_prov.play_media( + player_id=player_id, + queue_item=queue_item, + seek_position=int(seek_position), + fade_in=fade_in, + ) + + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): + """ + Handle enqueuing of the next queue item on the player. + + If the player supports PlayerFeature.ENQUE_NEXT: + This will be called about 10 seconds before the end of the track. + If the player does NOT report support for PlayerFeature.ENQUE_NEXT: + This will be called when the end of the track is reached. + + A PlayerProvider implementation is in itself responsible for handling this + so that the queue items keep playing until its empty or the player stopped. + + This will NOT be called if the end of the queue is reached (and repeat disabled). + This will NOT be called if the player is using flow mode to playback the queue. + """ + if player_id.startswith(SYNCGROUP_PREFIX): + # redirect to syncgroup-leader if needed + group_player = self.get(player_id, True) + if sync_leader := self.get_sync_leader(group_player): + await self.enqueue_next_queue_item( + sync_leader.player_id, + queue_item=queue_item, + ) + return + player_prov = self.mass.players.get_player_provider(player_id) + await player_prov.enqueue_next_queue_item(player_id=player_id, queue_item=queue_item) + @api_command("players/cmd/sync") @log_player_command async def cmd_sync(self, player_id: str, target_player: str) -> None: """Handle SYNC command for given player. - Join/add the given player(id) to the given (master) player/sync group. + Join/add the given player(id) to the given (leader) player/sync group. If the player is already synced to another player, it will be unsynced there first. - If the target player itself is already synced to another player, this will fail. - If the player can not be synced with the given target player, this will fail. + If the target player itself is already synced to another player, this may fail. + If the player can not be synced with the given target player, this may fail. - player_id: player_id of the player to handle the command. - - target_player: player_id of the syncgroup master or group player. + - target_player: player_id of the syncgroup leader or group player. """ child_player = self.get(player_id, True) parent_player = self.get(target_player, True) @@ -561,10 +653,6 @@ class PlayerController(CoreController): raise UnsupportedFeaturedException( f"Player {parent_player.name} does not support (un)sync commands" ) - if parent_player.synced_to is not None: - raise PlayerCommandFailed( - f"Player {target_player} is already synced to another player." - ) if player_id not in parent_player.can_sync_with: raise PlayerCommandFailed(f"Player {player_id} can not be synced to {target_player}.") if child_player.synced_to: @@ -579,9 +667,9 @@ class PlayerController(CoreController): # all checks passed, forward command to the player provider player_provider = self.get_player_provider(player_id) await player_provider.cmd_sync(player_id, target_player) - child_player.hidden_by.add(target_player) # optimistically update the player to update the UI as fast as possible - self.mass.create_task(player_provider.poll_player(player_id)) + parent_player.group_childs.add(player_id) + self.update(player_id) @api_command("players/cmd/unsync") @log_player_command @@ -606,49 +694,93 @@ class PlayerController(CoreController): return # all checks passed, forward command to the player provider - if player.synced_to in player.hidden_by: - player.hidden_by.remove(player.synced_to) player_provider = self.get_player_provider(player_id) await player_provider.cmd_unsync(player_id) # optimistically update the player to update the UI as fast as possible - self.mass.create_task(player_provider.poll_player(player_id)) + player.synced_to = None + self.update(player_id) + + @api_command("players/create_group") + async def create_group(self, provider: str, name: str, members: list[str]) -> Player: + """Create new Player/Sync Group on given PlayerProvider with name and members. + + - provider: provider domain or instance id to create the new group on. + - name: Name for the new group to create. + - members: A list of player_id's that should be part of this group. + + Returns the newly created player on success. + NOTE: Fails if the given provider does not support creating new groups + or members are given that can not be handled by the provider. + """ + # perform basic checks + if (player_prov := self.mass.get_provider(provider)) is None: + raise ProviderUnavailableError(f"Provider {provider} is not available!") + if ProviderFeature.PLAYER_GROUP_CREATE in player_prov.supported_features: + # provider supports group create feature: forward request to provider + # the provider is itself responsible for + # checking if the members can be used for grouping + return await player_prov.create_group(name, members=members) + if ProviderFeature.SYNC_PLAYERS in player_prov.supported_features: + # default syncgroup implementation + return await self._create_syncgroup(provider, name, members) + raise UnsupportedFeaturedException( + f"Provider {player_prov.name} does not support creating groups" + ) def _check_redirect(self, player_id: str) -> str: """Check if playback related command should be redirected.""" player = self.get(player_id, True) + if player_id.startswith(SYNCGROUP_PREFIX) and (sync_leader := self.get_sync_leader(player)): + return sync_leader.player_id if player.synced_to: - sync_master = self.get(player.synced_to, True) + sync_leader = self.get(player.synced_to, True) LOGGER.warning( "Player %s is synced to %s and can not accept " "playback related commands itself, " "redirected the command to the sync leader.", player.name, - sync_master.name, + sync_leader.name, ) return player.synced_to return player_id - def _get_player_groups(self, player_id: str) -> tuple[Player, ...]: - """Return all (player_ids of) any groupplayers the given player belongs to.""" - return tuple(x for x in self if x.type == PlayerType.GROUP and player_id in x.group_childs) + def _get_player_groups( + self, player: Player, available_only: bool = True, powered_only: bool = False + ) -> Iterator[Player]: + """Return all groupplayers the given player belongs to.""" + for _player in self: + if _player.player_id == player.player_id: + continue + if _player.type not in (PlayerType.GROUP, PlayerType.SYNC_GROUP): + continue + if available_only and not _player.available: + continue + if powered_only and not _player.powered: + continue + if ( + player.player_id in _player.group_childs + or player.active_source == _player.player_id + ): + yield _player + + def _get_active_player_group(self, player: Player) -> Player | None: + """Return the currently active groupplayer for the given player (if any).""" + # prefer active source group + for group_player in self._get_player_groups(player, available_only=True, powered_only=True): + if player.active_source in (group_player.player_id, group_player.active_source): + return group_player + # fallback to just the first powered group + for group_player in self._get_player_groups(player, available_only=True, powered_only=True): + return group_player + return None def _get_active_source(self, player: Player) -> str: """Return the active_source id for given player.""" - # if player is synced, return master/group leader's active source + # if player is synced, return group leader's active source if player.synced_to and (parent_player := self.get(player.synced_to)): return self._get_active_source(parent_player) - # iterate player groups to find out if one is playing - if group_players := self._get_player_groups(player.player_id): - # prefer the first playing (or paused) group parent - for group_player in group_players: - # if the group player's playerid is within the current_item_id - # this group is definitely active - if player.current_item_id and group_player.player_id in player.current_item_id: - return group_player.player_id - # fallback to the first powered group player - for group_player in group_players: - if group_player.powered: - return group_player.player_id + if active_player_group := self._get_active_player_group(player): + return active_player_group.player_id # defaults to the player's own player id if not active source set return player.active_source or player.player_id @@ -660,22 +792,21 @@ class PlayerController(CoreController): # calculate group volume from all (turned on) players group_volume = 0 active_players = 0 - for child_player in self._get_child_players(player, True): + for child_player in self.iter_group_members(player, True): group_volume += child_player.volume_level active_players += 1 if active_players: group_volume = group_volume / active_players return int(group_volume) - def _get_child_players( + def iter_group_members( self, - player: Player, + group_player: Player, only_powered: bool = False, only_playing: bool = False, - ) -> list[Player]: + ) -> Iterator[Player]: """Get (child) players attached to a grouped player.""" - child_players: list[Player] = [] - for child_id in player.group_childs: + for child_id in list(group_player.group_childs): if child_player := self.get(child_id, False): if not child_player.available: continue @@ -686,8 +817,7 @@ class PlayerController(CoreController): or child_player.state in (PlayerState.PLAYING, PlayerState.PAUSED) ): continue - child_players.append(child_player) - return child_players + yield child_player async def _poll_players(self) -> None: """Background task that polls players for updates.""" @@ -708,17 +838,20 @@ class PlayerController(CoreController): # - every 30 seconds if the player is powered # - every 10 seconds if the player is playing if ( - (player.available and player.powered and count % 30 == 0) - or (player.available and player_playing and count % 10 == 0) - or count == 360 - ) and (player_prov := self.get_player_provider(player_id)): + player.available + and ( + (player.powered and count % 30 == 0) + or (player_playing and count % 10 == 0) + or count == 360 + ) + and (player_prov := self.get_player_provider(player_id)) + ): try: await player_prov.poll_player(player_id) except PlayerUnavailableError: player.available = False player.state = PlayerState.IDLE player.powered = False - self.mass.loop.call_soon(self.update, player_id) except Exception as err: # pylint: disable=broad-except LOGGER.warning( "Error while requesting latest state from player %s: %s", @@ -726,6 +859,160 @@ class PlayerController(CoreController): str(err), exc_info=err, ) + finally: + # always update player state + self.mass.loop.call_soon(self.update, player_id) if count >= 360: count = 0 await asyncio.sleep(1) + + # Syncgroup specific functions/helpers + + async def _create_syncgroup(self, provider: str, name: str, members: list[str]) -> Player: + """Create new (providers-specific) SyncGroup with given name and members.""" + new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}" + # cleanup list, filter groups (should be handled by frontend, but just in case) + members = [ + x.player_id + for x in self + if x.player_id in members + if not x.player_id.startswith(SYNCGROUP_PREFIX) + if x.provider == provider and PlayerFeature.SYNC in x.supported_features + ] + # create default config with the user chosen name + self.mass.config.create_default_player_config( + new_group_id, + provider, + name=name, + enabled=True, + values={CONF_GROUP_MEMBERS: members}, + ) + player = self._register_syncgroup( + group_player_id=new_group_id, provider=provider, name=name, members=members + ) + return player + + def get_sync_leader(self, group_player: Player) -> Player | None: + """Get the sync leader player for a syncgroup or synced player.""" + if group_player.synced_to: + # should not happen but just in case... + return group_player.synced_to + for child_player in self.iter_group_members( + group_player, only_powered=True, only_playing=False + ): + if not child_player.group_childs: + continue + return child_player + return None + + async def _sync_syncgroup(self, player_id: str) -> None: + """Sync all (possible) players of a syncgroup.""" + group_player = self.get(player_id, True) + sync_leader = self.get_sync_leader(group_player) + for member in self.iter_group_members(group_player, only_powered=True): + if not member.can_sync_with: + continue + if not sync_leader: + # elect the first member as the sync leader if we do not have one + sync_leader = member + continue + if sync_leader.player_id == member.player_id: + continue + await self.cmd_sync(member.player_id, sync_leader.player_id) + + async def _register_syncgroups(self) -> None: + """Register all (virtual/fake) syncgroup players.""" + player_configs = await self.mass.config.get_player_configs() + for player_config in player_configs: + if not player_config.player_id.startswith(SYNCGROUP_PREFIX): + continue + members = player_config.get_value(CONF_GROUP_MEMBERS) + self._register_syncgroup( + group_player_id=player_config.player_id, + provider=player_config.provider, + name=player_config.name or player_config.default_name, + members=members, + ) + + def _register_syncgroup( + self, group_player_id: str, provider: str, name: str, members: Iterable[str] + ) -> Player: + """Register a (virtual/fake) syncgroup player.""" + # extract player features from first/random player + for member in members: + if first_player := self.get(member): + supported_features = first_player.supported_features + break + else: + # edge case: no child player is (yet) available; postpone register + return + player = Player( + player_id=group_player_id, + provider=provider, + type=PlayerType.SYNC_GROUP, + name=name, + available=True, + powered=False, + device_info=DeviceInfo(model="SyncGroup", manufacturer=provider.title()), + supported_features=supported_features, + group_childs=set(members), + ) + self.mass.players.register_or_update(player) + return player + + def _on_syncgroup_child_power( + self, player_id: str, child_player_id: str, new_power: bool + ) -> None: + """ + Call when a power command was executed on one of the child player of a Player/Sync group. + + This is used to handle special actions such as (re)syncing. + """ + group_player = self.mass.players.get(player_id) + child_player = self.mass.players.get(child_player_id) + + if not group_player.powered: + # guard, this should be caught in the player controller but just in case... + return + + powered_childs = [x for x in self.iter_group_members(group_player, True)] + if not new_power and child_player in powered_childs: + powered_childs.remove(child_player) + if new_power and child_player not in powered_childs: + powered_childs.append(child_player) + + # if the last player of a group turned off, turn off the group + if len(powered_childs) == 0: + self.logger.debug( + "Group %s has no more powered members, turning off group player", + group_player.display_name, + ) + self.mass.create_task(self.cmd_power(player_id, False)) + return + + group_playing = group_player.state == PlayerState.PLAYING + is_sync_leader = ( + len(child_player.group_childs) > 0 + and child_player.active_source == group_player.player_id + ) + if group_playing and not new_power and is_sync_leader: + # the current sync leader player turned OFF while the group player + # should still be playing - we need to select a new sync leader and resume + self.logger.warning( + "Syncleader %s turned off while syncgroup is playing, " + "a forced resume for syngroup %s will be attempted in 5 seconds...", + child_player.display_name, + group_player.display_name, + ) + + async def forced_resync(): + # we need to wait a bit here to not run into massive race conditions + await asyncio.sleep(5) + await self._sync_syncgroup(group_player.player_id) + await self.mass.player_queues.resume(group_player.player_id) + + self.mass.create_task(forced_resync()) + return + if new_power: + # if a child player turned ON while the group player is on, we need to resync/resume + self.mass.create_task(self._sync_syncgroup(group_player.player_id)) diff --git a/music_assistant/server/controllers/streams.py b/music_assistant/server/controllers/streams.py index fe485d07..3ffb213a 100644 --- a/music_assistant/server/controllers/streams.py +++ b/music_assistant/server/controllers/streams.py @@ -78,6 +78,8 @@ class MultiClientStreamJob: In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created. """ + _audio_task: asyncio.Task | None = None + def __init__( self, stream_controller: StreamsController, @@ -102,15 +104,14 @@ class MultiClientStreamJob: self.bytes_streamed: int = 0 self.client_seconds_skipped: dict[str, int] = {} self._all_clients_connected = asyncio.Event() - # start running the audio task in the background - self._audio_task = asyncio.create_task(self._stream_job_runner()) self.logger = stream_controller.logger.getChild(f"streamjob_{self.job_id}") self._finished: bool = False + self._first_chunk: bytes = b"" @property def finished(self) -> bool: """Return if this StreamJob is finished.""" - return self._finished or self._audio_task.done() + return self._finished or self._audio_task and self._audio_task.done() @property def pending(self) -> bool: @@ -122,12 +123,18 @@ class MultiClientStreamJob: """Return if this Job is running.""" return not self.finished and not self.pending + def start(self) -> None: + """Start running this streamjob.""" + # start running the audio task in the background + self._audio_task = asyncio.create_task(self._stream_job_runner()) + def stop(self) -> None: """Stop running this job.""" self._finished = True - if self._audio_task.done(): + if self._audio_task and self._audio_task.done(): return - self._audio_task.cancel() + if self._audio_task: + self._audio_task.cancel() for sub_queue in self.subscribed_players.values(): with suppress(asyncio.QueueFull): sub_queue.put_nowait(b"") @@ -158,6 +165,9 @@ class MultiClientStreamJob: try: self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2) + if self._first_chunk: + yield self._first_chunk + if self._all_clients_connected.is_set(): # client subscribes while we're already started self.logger.debug( @@ -205,7 +215,8 @@ class MultiClientStreamJob: async for chunk in self.stream_controller.get_flow_stream( self.queue, self.start_queue_item, self.pcm_format, self.seek_position, self.fade_in ): - if chunk_num == 0: + chunk_num += 1 + if chunk_num == 1: # wait until all expected clients are connected try: async with asyncio.timeout(10): @@ -229,8 +240,13 @@ class MultiClientStreamJob: len(self.subscribed_players), len(self.expected_players), ) + await self._put_chunk(chunk) - chunk_num += 1 + + # keep first chunk to workaround (dlna) players that do multiple get requests + if chunk_num == 1: + self._first_chunk = chunk + await asyncio.sleep(0.1) # mark EOF with empty chunk await self._put_chunk(b"") @@ -416,6 +432,8 @@ class StreamsController(CoreController): start_queue_item: QueueItem, seek_position: int = 0, fade_in: bool = False, + pcm_bit_depth: int = 24, + pcm_sample_rate: int = 48000, ) -> MultiClientStreamJob: """Create a MultiClientStreamJob for the given queue.. @@ -427,9 +445,6 @@ class StreamsController(CoreController): if not existing_job.finished: self.logger.warning("Detected existing (running) stream job for queue %s", queue_id) existing_job.stop() - queue_player = self.mass.players.get(queue_id) - pcm_bit_depth = 24 if queue_player.supports_24bit else 16 - pcm_sample_rate = min(queue_player.max_sample_rate, 96000) self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob( self, queue_id=queue_id, @@ -464,6 +479,7 @@ class StreamsController(CoreController): reason=f"Unable to retrieve streamdetails for item: {queue_item}" ) seek_position = int(request.query.get("seek_position", 0)) + queue_item.streamdetails.seconds_skipped = seek_position fade_in = bool(request.query.get("fade_in", 0)) # work out output format/details output_format = await self._get_output_format( @@ -812,10 +828,11 @@ class StreamsController(CoreController): chunk_num += 1 # throttle buffer, do not allow more than 30 seconds in buffer - seconds_buffered = total_bytes_written / pcm_sample_size + seconds_buffered = (total_bytes_written + bytes_written) / pcm_sample_size player = self.mass.players.get(queue.queue_id) - while (seconds_buffered - player.corrected_elapsed_time) > 30: - await asyncio.sleep(1) + if seconds_buffered > 60 and player.corrected_elapsed_time > 30: + while (seconds_buffered - player.corrected_elapsed_time) > 30: + await asyncio.sleep(1) #### HANDLE FIRST PART OF TRACK @@ -874,6 +891,7 @@ class StreamsController(CoreController): queue_track.streamdetails.duration = ( seek_position + queue_track.streamdetails.seconds_streamed ) + total_bytes_written += bytes_written self.logger.debug( "Finished Streaming queue track: %s (%s) on queue %s - seconds streamed: %s", queue_track.streamdetails.uri, diff --git a/music_assistant/server/models/player_provider.py b/music_assistant/server/models/player_provider.py index 29584b4e..f6d62eaa 100644 --- a/music_assistant/server/models/player_provider.py +++ b/music_assistant/server/models/player_provider.py @@ -8,14 +8,20 @@ from music_assistant.common.models.config_entries import ( CONF_ENTRY_AUTO_PLAY, CONF_ENTRY_VOLUME_NORMALIZATION, CONF_ENTRY_VOLUME_NORMALIZATION_TARGET, + ConfigEntry, + ConfigValueOption, + PlayerConfig, ) +from music_assistant.common.models.enums import ConfigEntryType from music_assistant.common.models.player import Player +from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_GROUP_PLAYERS, SYNCGROUP_PREFIX from .provider import Provider if TYPE_CHECKING: - from music_assistant.common.models.config_entries import ConfigEntry, PlayerConfig from music_assistant.common.models.queue_item import QueueItem + from music_assistant.server.controllers.streams import MultiClientStreamJob + # ruff: noqa: ARG001, ARG002 @@ -28,17 +34,49 @@ class PlayerProvider(Provider): async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: """Return all (provider/player specific) Config Entries for the given player (if any).""" - return ( + entries = ( CONF_ENTRY_VOLUME_NORMALIZATION, CONF_ENTRY_AUTO_PLAY, CONF_ENTRY_VOLUME_NORMALIZATION_TARGET, ) + if player_id.startswith(SYNCGROUP_PREFIX): + # add default entries for syncgroups + return entries + ( + ConfigEntry( + key=CONF_GROUP_MEMBERS, + type=ConfigEntryType.STRING, + label="Group members", + default_value=[], + options=tuple( + ConfigValueOption(x.display_name, x.player_id) + for x in self.mass.players.all(True, False) + if x.player_id != player_id and x.provider == self.instance_id + ), + description="Select all players you want to be part of this group", + multi_value=True, + required=True, + ), + ) + return entries def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None: """Call (by config manager) when the configuration of a player changes.""" + if f"values/{CONF_GROUP_MEMBERS}" in changed_keys: + player = self.mass.players.get(config.player_id) + player.group_childs = config.get_value(CONF_GROUP_MEMBERS) + self.mass.players.update(config.player_id) def on_player_config_removed(self, player_id: str) -> None: """Call (by config manager) when the configuration of a player is removed.""" + # ensure that any group players get removed + group_players = self.mass.config.get_raw_provider_config_value( + self.instance_id, CONF_GROUP_PLAYERS, {} + ) + if player_id in group_players: + del group_players[player_id] + self.mass.config.set_raw_provider_config_value( + self.instance_id, CONF_GROUP_PLAYERS, group_players + ) @abstractmethod async def cmd_stop(self, player_id: str) -> None: @@ -60,6 +98,7 @@ class PlayerProvider(Provider): - player_id: player_id of the player to handle the command. """ # will only be called for players with Pause feature set. + raise NotImplementedError() async def play_media( self, @@ -78,6 +117,14 @@ class PlayerProvider(Provider): - seek_position: Optional seek to this position. - fade_in: Optionally fade in the item at playback start. """ + raise NotImplementedError() + + async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: + """Handle PLAY STREAM on given player. + + This is a special feature from the Universal Group provider. + """ + raise NotImplementedError() async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): """ @@ -108,6 +155,7 @@ class PlayerProvider(Provider): - powered: bool if player should be powered on or off. """ # will only be called for players with Power feature set. + raise NotImplementedError() async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player. @@ -116,6 +164,7 @@ class PlayerProvider(Provider): - volume_level: volume level (0..100) to set on the player. """ # will only be called for players with Volume feature set. + raise NotImplementedError() async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: """Send VOLUME MUTE command to given player. @@ -124,6 +173,7 @@ class PlayerProvider(Provider): - muted: bool if player should be muted. """ # will only be called for players with Mute feature set. + raise NotImplementedError() async def cmd_seek(self, player_id: str, position: int) -> None: """Handle SEEK command for given queue. @@ -132,6 +182,7 @@ class PlayerProvider(Provider): - position: position in seconds to seek to in the current playing item. """ # will only be called for players with Seek feature set. + raise NotImplementedError() async def cmd_sync(self, player_id: str, target_player: str) -> None: """Handle SYNC command for given player. @@ -142,6 +193,7 @@ class PlayerProvider(Provider): - target_player: player_id of the syncgroup master or group player. """ # will only be called for players with SYNC feature set. + raise NotImplementedError() async def cmd_unsync(self, player_id: str) -> None: """Handle UNSYNC command for given player. @@ -151,6 +203,18 @@ class PlayerProvider(Provider): - player_id: player_id of the player to handle the command. """ # will only be called for players with SYNC feature set. + raise NotImplementedError() + + async def create_group(self, name: str, members: list[str]) -> Player: + """Create new PlayerGroup on this provider. + + Create a new SyncGroup (or PlayerGroup) with given name and members. + + - name: Name for the new group to create. + - members: A list of player_id's that should be part of this group. + """ + # will only be called for players with PLAYER_GROUP_CREATE feature set. + raise NotImplementedError() async def poll_player(self, player_id: str) -> None: """Poll player for state updates. @@ -168,11 +232,11 @@ class PlayerProvider(Provider): If the player does not need any polling, simply do not override this method. """ - async def on_child_power(self, player_id: str, child_player: Player, new_power: bool) -> None: + def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None: """ - Call when a power command was executed on one of the child players. + Call when a power command was executed on one of the child player of a Player/Sync group. - This is used to handle special actions such as muting as power or (re)syncing. + This is used to handle special actions such as (re)syncing. """ # DO NOT OVERRIDE BELOW diff --git a/music_assistant/server/models/provider.py b/music_assistant/server/models/provider.py index d16b8dc9..c649af1e 100644 --- a/music_assistant/server/models/provider.py +++ b/music_assistant/server/models/provider.py @@ -52,6 +52,9 @@ class Provider: """Return the features supported by this Provider.""" return tuple() + async def handle_setup(self) -> None: + """Handle async initialization of the provider.""" + async def unload(self) -> None: """ Handle unload/close of the provider. diff --git a/music_assistant/server/providers/airplay/__init__.py b/music_assistant/server/providers/airplay/__init__.py index b56f4373..c5808589 100644 --- a/music_assistant/server/providers/airplay/__init__.py +++ b/music_assistant/server/providers/airplay/__init__.py @@ -16,7 +16,7 @@ from typing import TYPE_CHECKING import aiofiles from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType -from music_assistant.common.models.enums import ConfigEntryType +from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS from music_assistant.server.models.player_provider import PlayerProvider @@ -26,6 +26,7 @@ if TYPE_CHECKING: from music_assistant.common.models.provider import ProviderManifest from music_assistant.common.models.queue_item import QueueItem from music_assistant.server import MusicAssistant + from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType from music_assistant.server.providers.slimproto import SlimprotoProvider @@ -130,6 +131,11 @@ class AirplayProvider(PlayerProvider): _log_reader_task: asyncio.Task | None = None _removed_players: set[str] | None = None + @property + def supported_features(self) -> tuple[ProviderFeature, ...]: + """Return the features supported by this Provider.""" + return (ProviderFeature.SYNC_PLAYERS,) + async def handle_setup(self) -> None: """Handle async initialization of the provider.""" self._removed_players = set() @@ -164,6 +170,7 @@ class AirplayProvider(PlayerProvider): def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None: """Call (by config manager) when the configuration of a player changes.""" + super().on_player_config_changed(config, changed_keys) # forward to slimproto too slimproto_prov = self.mass.get_provider("slimproto") slimproto_prov.on_player_config_changed(config, changed_keys) @@ -177,6 +184,7 @@ class AirplayProvider(PlayerProvider): def on_player_config_removed(self, player_id: str) -> None: """Call (by config manager) when the configuration of a player is removed.""" + super().on_player_config_removed() self._removed_players.add(player_id) self.restart_bridge() @@ -218,6 +226,15 @@ class AirplayProvider(PlayerProvider): fade_in=fade_in, ) + async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: + """Handle PLAY STREAM on given player. + + This is a special feature from the Universal Group provider. + """ + # simply forward to underlying slimproto player + slimproto_prov = self.mass.get_provider("slimproto") + await slimproto_prov.play_stream(player_id, stream_job) + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): """Handle enqueuing of the next queue item on the player.""" # simply forward to underlying slimproto player diff --git a/music_assistant/server/providers/chromecast/__init__.py b/music_assistant/server/providers/chromecast/__init__.py index 4088bce0..e5f4a3b7 100644 --- a/music_assistant/server/providers/chromecast/__init__.py +++ b/music_assistant/server/providers/chromecast/__init__.py @@ -21,7 +21,6 @@ from pychromecast.socket_client import CONNECTION_STATUS_CONNECTED, CONNECTION_S from music_assistant.common.models.config_entries import ( CONF_ENTRY_CROSSFADE_DURATION, - CONF_ENTRY_HIDE_GROUP_MEMBERS, ConfigEntry, ConfigValueType, ) @@ -49,6 +48,7 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant + from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType @@ -172,21 +172,14 @@ class ChromecastProvider(PlayerProvider): async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]: """Return all (provider/player specific) Config Entries for the given player (if any).""" - cast_player = self.castplayers.get(player_id) base_entries = await super().get_player_config_entries(player_id) - entries = base_entries + PLAYER_CONFIG_ENTRIES - if ( - cast_player - and cast_player.cast_info.is_audio_group - and not cast_player.cast_info.is_multichannel_group - ): - entries = entries + (CONF_ENTRY_HIDE_GROUP_MEMBERS,) - return entries + return base_entries + PLAYER_CONFIG_ENTRIES def on_player_config_changed( self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002 ) -> None: """Call (by config manager) when the configuration of a player changes.""" + super().on_player_config_changed(config, changed_keys) if "enabled" in changed_keys and config.player_id not in self.castplayers: self.mass.create_task(self.mass.config.reload_provider, self.instance_id) @@ -208,11 +201,6 @@ class ChromecastProvider(PlayerProvider): async def cmd_power(self, player_id: str, powered: bool) -> None: """Send POWER command to given player.""" castplayer = self.castplayers[player_id] - # set mute_as_power feature for group members - if castplayer.player.type == PlayerType.GROUP: - for child_player_id in castplayer.player.group_childs: - if child_player := self.mass.players.get(child_player_id): - child_player.mute_as_power = powered if powered: await self._launch_app(castplayer) else: @@ -282,6 +270,21 @@ class ChromecastProvider(PlayerProvider): media_controller = castplayer.cc.media_controller await asyncio.to_thread(media_controller.send_message, queuedata, True) + async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: + """Handle PLAY STREAM on given player. + + This is a special feature from the Universal Group provider. + """ + url = stream_job.resolve_stream_url(player_id, ContentType.FLAC) + castplayer = self.castplayers[player_id] + await asyncio.to_thread( + castplayer.cc.play_media, + url, + content_type="audio/flac", + title="Music Assistant", + thumb=MASS_LOGO_ONLINE, + ) + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): """Handle enqueuing of the next queue item on the player.""" castplayer = self.castplayers[player_id] @@ -440,17 +443,12 @@ class ChromecastProvider(PlayerProvider): castplayer.player.name = castplayer.cast_info.friendly_name castplayer.player.volume_level = int(status.volume_level * 100) castplayer.player.volume_muted = status.volume_muted - if castplayer.active_group: - # use mute as power when group is active - castplayer.player.powered = not status.volume_muted - else: - castplayer.player.powered = ( - castplayer.cc.app_id is not None - and castplayer.cc.app_id != pychromecast.IDLE_APP_ID - ) + castplayer.player.powered = ( + castplayer.cc.app_id is not None and castplayer.cc.app_id != pychromecast.IDLE_APP_ID + ) # handle stereo pairs if castplayer.cast_info.is_multichannel_group: - castplayer.player.type = PlayerType.STEREO_PAIR + castplayer.player.type = PlayerType.PLAYER castplayer.player.group_childs = set() # handle cast groups if castplayer.cast_info.is_audio_group and not castplayer.cast_info.is_multichannel_group: diff --git a/music_assistant/server/providers/chromecast/helpers.py b/music_assistant/server/providers/chromecast/helpers.py index 483602f9..925471dd 100644 --- a/music_assistant/server/providers/chromecast/helpers.py +++ b/music_assistant/server/providers/chromecast/helpers.py @@ -176,8 +176,8 @@ class CastStatusListener: """Handle the cast removed from a group.""" if not self._valid: return - if group_uuid in self.castplayer.player.hidden_by: - self.castplayer.player.hidden_by.remove(group_uuid) + if group_uuid == self.castplayer.player.active_source: + self.castplayer.player.active_source = "" self.prov.logger.debug( "%s is removed from multizone: %s", self.castplayer.player.display_name, group_uuid ) diff --git a/music_assistant/server/providers/dlna/__init__.py b/music_assistant/server/providers/dlna/__init__.py index 94f827fa..e9d0845a 100644 --- a/music_assistant/server/providers/dlna/__init__.py +++ b/music_assistant/server/providers/dlna/__init__.py @@ -50,6 +50,7 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant + from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType BASE_PLAYER_FEATURES = ( @@ -303,6 +304,7 @@ class DLNAPlayerProvider(PlayerProvider): self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002 ) -> None: """Call (by config manager) when the configuration of a player changes.""" + super().on_player_config_changed(config, changed_keys) # run discovery to catch any re-enabled players self.mass.create_task(self._run_discovery()) # reset player features based on config values @@ -377,6 +379,35 @@ class DLNAPlayerProvider(PlayerProvider): dlna_player.force_poll = True await self.poll_player(dlna_player.udn) + @catch_request_errors + async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: + """Handle PLAY STREAM on given player. + + This is a special feature from the Universal Group provider. + """ + enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3) + output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC + url = stream_job.resolve_stream_url(player_id, output_codec) + dlna_player = self.dlnaplayers[player_id] + # always clear queue (by sending stop) first + if dlna_player.device.can_stop: + await self.cmd_stop(player_id) + didl_metadata = create_didl_metadata(self.mass, url, None) + await dlna_player.device.async_set_transport_uri(url, "Music Assistant", didl_metadata) + # Play it + await dlna_player.device.async_wait_for_can_play(10) + # optimistically set this timestamp to help in case of a player + # that does not report the progress + now = time.time() + dlna_player.player.elapsed_time = 0 + dlna_player.player.elapsed_time_last_updated = now + await dlna_player.device.async_play() + # force poll the device + for sleep in (1, 2): + await asyncio.sleep(sleep) + dlna_player.force_poll = True + await self.poll_player(dlna_player.udn) + @catch_request_errors async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): """ diff --git a/music_assistant/server/providers/slimproto/__init__.py b/music_assistant/server/providers/slimproto/__init__.py index a0e85a97..949fa92f 100644 --- a/music_assistant/server/providers/slimproto/__init__.py +++ b/music_assistant/server/providers/slimproto/__init__.py @@ -32,6 +32,7 @@ from music_assistant.common.models.enums import ( PlayerFeature, PlayerState, PlayerType, + ProviderFeature, ) from music_assistant.common.models.errors import QueueEmpty, SetupFailedError from music_assistant.common.models.player import DeviceInfo, Player @@ -45,6 +46,7 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant + from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType @@ -190,6 +192,11 @@ class SlimprotoProvider(PlayerProvider): _cli: LmsCli port: int = DEFAULT_SLIMPROTO_PORT + @property + def supported_features(self) -> tuple[ProviderFeature, ...]: + """Return the features supported by this Provider.""" + return (ProviderFeature.SYNC_PLAYERS,) + async def handle_setup(self) -> None: """Handle async initialization of the provider.""" self._socket_clients = {} @@ -390,6 +397,10 @@ class SlimprotoProvider(PlayerProvider): - seek_position: Optional seek to this position. - fade_in: Optionally fade in the item at playback start. """ + # fix race condition where resync and play media are called at more or less the same time + if self._resync_handle: + self._resync_handle.cancel() + self._resync_handle = None player = self.mass.players.get(player_id) if player.synced_to: raise RuntimeError("A synced player cannot receive play commands directly") @@ -418,6 +429,7 @@ class SlimprotoProvider(PlayerProvider): auto_play=False, ) ) + stream_job.start() else: # regular, single player playback client = self._socket_clients[player_id] @@ -438,6 +450,27 @@ class SlimprotoProvider(PlayerProvider): auto_play=True, ) + async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: + """Handle PLAY STREAM on given player. + + This is a special feature from the Universal Group provider. + """ + # forward command to player and any connected sync members + sync_clients = self._get_sync_clients(player_id) + async with asyncio.TaskGroup() as tg: + for client in sync_clients: + tg.create_task( + self._handle_play_url( + client, + url=stream_job.resolve_stream_url( + client.player_id, output_codec=ContentType.FLAC + ), + queue_item=None, + send_flush=True, + auto_play=False, + ) + ) + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): """Handle enqueuing of the next queue item on the player.""" # we don't have to do anything, @@ -590,7 +623,6 @@ class SlimprotoProvider(PlayerProvider): manufacturer=client.device_type, ), supported_features=( - PlayerFeature.ACCURATE_TIME, PlayerFeature.POWER, PlayerFeature.SYNC, PlayerFeature.VOLUME_SET, diff --git a/music_assistant/server/providers/snapcast/__init__.py b/music_assistant/server/providers/snapcast/__init__.py index 2660f223..b71a3ace 100644 --- a/music_assistant/server/providers/snapcast/__init__.py +++ b/music_assistant/server/providers/snapcast/__init__.py @@ -24,6 +24,7 @@ from music_assistant.common.models.enums import ( PlayerFeature, PlayerState, PlayerType, + ProviderFeature, ) from music_assistant.common.models.errors import SetupFailedError from music_assistant.common.models.media_items import AudioFormat @@ -37,6 +38,7 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant + from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType CONF_SNAPCAST_SERVER_HOST = "snapcast_server_host" @@ -98,6 +100,11 @@ class SnapCastProvider(PlayerProvider): snapcast_server_control_port: int _stream_tasks: dict[str, asyncio.Task] + @property + def supported_features(self) -> tuple[ProviderFeature, ...]: + """Return the features supported by this Provider.""" + return (ProviderFeature.SYNC_PLAYERS,) + async def handle_setup(self) -> None: """Handle async initialization of the provider.""" self.snapcast_server_host = self.config.get_value(CONF_SNAPCAST_SERVER_HOST) @@ -198,7 +205,7 @@ class SnapCastProvider(PlayerProvider): async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: """Send VOLUME_SET command to given player.""" await self._snapserver.client_volume( - player_id, {"percent": volume_level, "muted": volume_level != 0} + player_id, {"percent": volume_level, "muted": volume_level == 0} ) async def cmd_stop(self, player_id: str) -> None: @@ -257,7 +264,7 @@ class SnapCastProvider(PlayerProvider): snap_group = self._get_snapgroup(player_id) await snap_group.set_stream(stream.identifier) - async def queue_streamer(): + async def _streamer(): host = self.snapcast_server_host _, writer = await asyncio.open_connection(host, port) self.logger.debug("Opened connection to %s:%s", host, port) @@ -294,7 +301,54 @@ class SnapCastProvider(PlayerProvider): self.logger.debug("Closed connection to %s:%s", host, port) # start streaming the queue (pcm) audio in a background task - self._stream_tasks[player_id] = asyncio.create_task(queue_streamer()) + self._stream_tasks[player_id] = asyncio.create_task(_streamer()) + + async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: + """Handle PLAY STREAM on given player. + + This is a special feature from the Universal Group provider. + """ + player = self.mass.players.get(player_id) + if player.synced_to: + raise RuntimeError("A synced player cannot receive play commands directly") + # stop any existing streams first + await self.cmd_stop(player_id) + # TEMP - TODO - WARNING - ACHTUNG - HACK + # override pcm format of streamjob due to issue with snapcast + # that seems to only accept a 48000/16 stream somehow ?! + stream_job.pcm_format.content_type = ContentType.PCM_S16LE + stream_job.pcm_format.sample_rate = 48000 + stream_job.pcm_format.bit_depth = 16 + # end of hack + stream, port = await self._create_stream() + stream_job.expected_players.add(player_id) + snap_group = self._get_snapgroup(player_id) + await snap_group.set_stream(stream.identifier) + + async def _streamer(): + host = self.snapcast_server_host + _, writer = await asyncio.open_connection(host, port) + self.logger.debug("Opened connection to %s:%s", host, port) + player.current_item_id = f"flow/{stream_job.queue_id}" + player.elapsed_time = 0 + player.elapsed_time_last_updated = time.time() + player.state = PlayerState.PLAYING + self._set_childs_state(player_id, PlayerState.PLAYING) + self.mass.players.register_or_update(player) + try: + async for pcm_chunk in stream_job.subscribe(player_id): + writer.write(pcm_chunk) + await writer.drain() + finally: + await self._snapserver.stream_remove_stream(stream.identifier) + if writer.can_write_eof(): + writer.close() + if not writer.is_closing(): + writer.close() + self.logger.debug("Closed connection to %s:%s", host, port) + + # start streaming the queue (pcm) audio in a background task + self._stream_tasks[player_id] = asyncio.create_task(_streamer()) def _get_snapgroup(self, player_id: str) -> Snapgroup: """Get snapcast group for given player_id.""" @@ -328,7 +382,8 @@ class SnapCastProvider(PlayerProvider): port = random.randint(4953, 4953 + 200) name = f"MusicAssistant--{port}" result = await self._snapserver.stream_add_stream( - # TODO: can we handle 24 bits bit depth ? + # NOTE: setting the sampleformat to something else + # (like 24 bits bit depth) does not seem to work at all! f"tcp://0.0.0.0:{port}?name={name}&sampleformat=48000:16:2", ) if "id" not in result: diff --git a/music_assistant/server/providers/snapcast/icon.svg b/music_assistant/server/providers/snapcast/icon.svg index 03069571..853f3659 100644 --- a/music_assistant/server/providers/snapcast/icon.svg +++ b/music_assistant/server/providers/snapcast/icon.svg @@ -1,26 +1,27 @@ - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/music_assistant/server/providers/sonos/__init__.py b/music_assistant/server/providers/sonos/__init__.py index e35d6523..36f2d2d3 100644 --- a/music_assistant/server/providers/sonos/__init__.py +++ b/music_assistant/server/providers/sonos/__init__.py @@ -25,6 +25,7 @@ from music_assistant.common.models.enums import ( PlayerFeature, PlayerState, PlayerType, + ProviderFeature, ) from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError from music_assistant.common.models.player import DeviceInfo, Player @@ -37,6 +38,7 @@ if TYPE_CHECKING: from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig from music_assistant.common.models.provider import ProviderManifest from music_assistant.server import MusicAssistant + from music_assistant.server.controllers.streams import MultiClientStreamJob from music_assistant.server.models import ProviderInstanceType @@ -212,7 +214,7 @@ class SonosPlayer: group_members = {x.uid for x in self.group_info.members if x.is_visible} if not group_members: # not sure about this ?! - self.player.type = PlayerType.STEREO_PAIR + self.player.type = PlayerType.PLAYER elif group_members == {self.player_id}: self.player.group_childs = set() else: @@ -262,6 +264,11 @@ class SonosPlayerProvider(PlayerProvider): _discovery_running: bool = False _discovery_reschedule_timer: asyncio.TimerHandle | None = None + @property + def supported_features(self) -> tuple[ProviderFeature, ...]: + """Return the features supported by this Provider.""" + return (ProviderFeature.SYNC_PLAYERS,) + async def handle_setup(self) -> None: """Handle async initialization of the provider.""" self.sonosplayers = {} @@ -332,6 +339,7 @@ class SonosPlayerProvider(PlayerProvider): self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002 ) -> None: """Call (by config manager) when the configuration of a player changes.""" + super().on_player_config_changed(config, changed_keys) if "enabled" in changed_keys: # run discovery to catch any re-enabled players self.mass.create_task(self._run_discovery()) @@ -424,7 +432,18 @@ class SonosPlayerProvider(PlayerProvider): - target_player: player_id of the syncgroup master or group player. """ sonos_player = self.sonosplayers[player_id] - await asyncio.to_thread(sonos_player.soco.join, self.sonosplayers[target_player].soco) + retries = 0 + while True: + try: + await asyncio.to_thread( + sonos_player.soco.join, self.sonosplayers[target_player].soco + ) + break + except soco.exceptions.SoCoUPnPException as err: + if retries >= 3: + raise err + retries += 1 + await asyncio.sleep(1) await asyncio.to_thread( sonos_player.update_info, update_group_info=True, @@ -486,6 +505,29 @@ class SonosPlayerProvider(PlayerProvider): sonos_player.player.elapsed_time = 0 sonos_player.player.elapsed_time_last_updated = now + async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None: + """Handle PLAY STREAM on given player. + + This is a special feature from the Universal Group provider. + """ + url = stream_job.resolve_stream_url(player_id, ContentType.MP3) + sonos_player = self.sonosplayers[player_id] + if not sonos_player.soco.is_coordinator: + # this should be already handled by the player manager, but just in case... + raise PlayerCommandFailed( + f"Player {sonos_player.player.display_name} can not " + "accept play_stream command, it is synced to another player." + ) + # always stop and clear queue first + await asyncio.to_thread(sonos_player.soco.stop) + await asyncio.to_thread(sonos_player.soco.clear_queue) + await asyncio.to_thread(sonos_player.soco.play_uri, url, force_radio=True) + # optimistically set this timestamp to help figure out elapsed time later + now = time.time() + sonos_player.playback_started = now + sonos_player.player.elapsed_time = 0 + sonos_player.player.elapsed_time_last_updated = now + async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): """ Handle enqueuing of the next queue item on the player. @@ -533,6 +575,8 @@ class SonosPlayerProvider(PlayerProvider): the next successful poll or event where it becomes available again. If the player does not need any polling, simply do not override this method. """ + if player_id not in self.sonosplayers: + return sonos_player = self.sonosplayers[player_id] try: # the check_poll logic will work out what endpoints need polling now @@ -602,7 +646,7 @@ class SonosPlayerProvider(PlayerProvider): type=PlayerType.PLAYER, name=soco_device.player_name, available=True, - powered=True, + powered=False, supported_features=PLAYER_FEATURES, device_info=DeviceInfo( model=speaker_info["model_name"], diff --git a/music_assistant/server/providers/ugp/__init__.py b/music_assistant/server/providers/ugp/__init__.py index 1d2b0378..054eb4e2 100644 --- a/music_assistant/server/providers/ugp/__init__.py +++ b/music_assistant/server/providers/ugp/__init__.py @@ -7,15 +7,13 @@ allowing the user to create player groups from all players known in the system. from __future__ import annotations import asyncio +from collections.abc import Iterable from typing import TYPE_CHECKING +import shortuuid + from music_assistant.common.models.config_entries import ( - CONF_ENTRY_EQ_BASS, - CONF_ENTRY_EQ_MID, - CONF_ENTRY_EQ_TREBLE, - CONF_ENTRY_FLOW_MODE, - CONF_ENTRY_HIDE_GROUP_MEMBERS, - CONF_ENTRY_OUTPUT_CHANNELS, + CONF_ENTRY_CROSSFADE_DURATION, ConfigEntry, ConfigValueOption, ConfigValueType, @@ -25,10 +23,11 @@ from music_assistant.common.models.enums import ( PlayerFeature, PlayerState, PlayerType, + ProviderFeature, ) from music_assistant.common.models.player import DeviceInfo, Player from music_assistant.common.models.queue_item import QueueItem -from music_assistant.constants import CONF_GROUPED_POWER_ON +from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX from music_assistant.server.models.player_provider import PlayerProvider if TYPE_CHECKING: @@ -37,39 +36,9 @@ if TYPE_CHECKING: from music_assistant.server import MusicAssistant from music_assistant.server.models import ProviderInstanceType +UGP_PREFIX = "ugp_" -CONF_GROUP_MEMBERS = "group_members" -CONF_MUTE_CHILDS = "mute_childs" -CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO = ConfigEntry.from_dict( - { - **CONF_ENTRY_OUTPUT_CHANNELS.to_dict(), - "hidden": True, - "default_value": "stereo", - "value": "stereo", - } -) -CONF_ENTRY_FORCED_FLOW_MODE = ConfigEntry.from_dict( - {**CONF_ENTRY_FLOW_MODE.to_dict(), "default_value": True, "value": True, "hidden": True} -) -CONF_ENTRY_EQ_BASS_HIDDEN = ConfigEntry.from_dict({**CONF_ENTRY_EQ_BASS.to_dict(), "hidden": True}) -CONF_ENTRY_EQ_MID_HIDDEN = ConfigEntry.from_dict({**CONF_ENTRY_EQ_MID.to_dict(), "hidden": True}) -CONF_ENTRY_EQ_TREBLE_HIDDEN = ConfigEntry.from_dict( - {**CONF_ENTRY_EQ_TREBLE.to_dict(), "hidden": True} -) -CONF_ENTRY_GROUPED_POWER_ON = ConfigEntry( - key=CONF_GROUPED_POWER_ON, - type=ConfigEntryType.BOOLEAN, - default_value=False, - label="Forced Power ON of all group members", - description="Power ON all child players when the group player is powered on " - "(or playback started). \n" - "If this setting is disabled, playback will only start on players that " - "are already powered ON at the time of playback start.\n" - "When turning OFF the group player, all group members are turned off, " - "regardless of this setting.", - advanced=False, -) # ruff: noqa: ARG002 @@ -83,10 +52,10 @@ async def setup( async def get_config_entries( - mass: MusicAssistant, - instance_id: str | None = None, - action: str | None = None, - values: dict[str, ConfigValueType] | None = None, + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 ) -> tuple[ConfigEntry, ...]: """ Return Config entries to setup this provider. @@ -95,34 +64,7 @@ async def get_config_entries( action: [optional] action key called from config entries UI. values: the (intermediate) raw values for config entries sent with the action. """ - # ruff: noqa: ARG001 - # dynamically extend the amount of entries when needed - if values.get("ugp_15"): - player_count = 20 - elif values.get("ugp_10"): - player_count = 15 - elif values.get("ugp_5"): - player_count = 10 - else: - player_count = 5 - player_entries = tuple( - ConfigEntry( - key=f"ugp_{index}", - type=ConfigEntryType.STRING, - label=f"Group player {index}: Group members", - default_value=[], - options=tuple( - ConfigValueOption(x.display_name, x.player_id) - for x in mass.players.all(True, True, False) - if x.player_id != f"ugp_{index}" - ), - description="Select all players you want to be part of this group", - multi_value=True, - required=False, - ) - for index in range(1, player_count + 1) - ) - return player_entries + return tuple() class UniversalGroupProvider(PlayerProvider): @@ -131,94 +73,81 @@ class UniversalGroupProvider(PlayerProvider): prev_sync_leaders: dict[str, tuple[str]] | None = None debounce_id: str | None = None + @property + def supported_features(self) -> tuple[ProviderFeature, ...]: + """Return the features supported by this Provider.""" + return (ProviderFeature.PLAYER_GROUP_CREATE,) + async def handle_setup(self) -> None: """Handle async initialization of the provider.""" self.prev_sync_leaders = {} - self.muted_clients = set() - - for index in range(1, 100): - conf_key = f"ugp_{index}" - try: - player_conf = self.config.get_value(conf_key) - except KeyError: - break - if player_conf == []: - # cleanup player config if player config is removed/reset - self.mass.players.remove(conf_key) - continue - elif not player_conf: - continue - - player = Player( - player_id=conf_key, - provider=self.domain, - type=PlayerType.GROUP, - name=f"{self.name}: {index}", - available=True, - powered=False, - device_info=DeviceInfo(model=self.manifest.name, manufacturer="Music Assistant"), - # TODO: derive playerfeatures from (all) underlying child players? - supported_features=( - PlayerFeature.POWER, - PlayerFeature.PAUSE, - PlayerFeature.VOLUME_SET, - PlayerFeature.VOLUME_MUTE, - ), - max_sample_rate=48000, - supports_24bit=True, - active_source=conf_key, - group_childs=player_conf, - ) - player.extra_data["optimistic_state"] = PlayerState.IDLE - self.prev_sync_leaders[conf_key] = None - self.mass.players.register_or_update(player) + self.mass.loop.create_task(self._register_all_players()) async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: # noqa: ARG002 """Return all (provider/player specific) Config Entries for the given player (if any).""" - return ( - CONF_ENTRY_HIDE_GROUP_MEMBERS, - CONF_ENTRY_GROUPED_POWER_ON, + base_entries = await super().get_player_config_entries(player_id) + return base_entries + ( ConfigEntry( - key=CONF_MUTE_CHILDS, + key=CONF_GROUP_MEMBERS, type=ConfigEntryType.STRING, - label="Use muting for power commands", - multi_value=True, - options=( + label="Group members", + default_value=[], + options=tuple( ConfigValueOption(x.display_name, x.player_id) - for x in self._get_active_members(player_id, False, False) + for x in self.mass.players.all(True, False) + if x.player_id != player_id ), - default_value=[], - description="To prevent a restart of the stream, when a child player " - "turns on while the group is already playing, you can enable a workaround " - "where Music Assistant uses muting to control the group players. \n\n" - "This means that while the group player is playing, power actions to these " - "child players will be treated as (un)mute commands to prevent the small " - "interruption of music when the stream is restarted.", + description="Select all players you want to be part of this universal group", + multi_value=True, + required=True, + ), + ConfigEntry( + key="ugp_note", + type=ConfigEntryType.LABEL, + label="Please note that although the universal group " + "allows you to group any player, it will not enable audio sync " + "between players of different ecosystems.", ), + ConfigEntry( + key=CONF_CROSSFADE, + type=ConfigEntryType.BOOLEAN, + label="Enable crossfade", + default_value=False, + description="Enable a crossfade transition between (queue) tracks. \n\n" + "Note that DLNA does not natively support crossfading so you need to enable " + "the 'flow mode' workaround to use crossfading with DLNA players.", + advanced=False, + ), + CONF_ENTRY_CROSSFADE_DURATION, ) async def cmd_stop(self, player_id: str) -> None: """Send STOP command to given player.""" group_player = self.mass.players.get(player_id) - group_player.extra_data["optimistic_state"] = PlayerState.IDLE + group_player.state = PlayerState.IDLE # forward command to player and any connected sync child's async with asyncio.TaskGroup() as tg: - for member in self._get_active_members( - player_id, only_powered=True, skip_sync_childs=True - ): + for member in self.mass.players.iter_group_members(group_player, only_powered=True): if member.state == PlayerState.IDLE: continue tg.create_task(self.mass.players.cmd_stop(member.player_id)) async def cmd_play(self, player_id: str) -> None: """Send PLAY command to given player.""" - group_player = self.mass.players.get(player_id) - group_player.extra_data["optimistic_state"] = PlayerState.PLAYING - async with asyncio.TaskGroup() as tg: - for member in self._get_active_members( - player_id, only_powered=False, skip_sync_childs=True - ): - tg.create_task(self.mass.players.cmd_play(member.player_id)) + + async def cmd_pause(self, player_id: str) -> None: + """Send PAUSE command to given player.""" + + async def cmd_power(self, player_id: str, powered: bool) -> None: + """Send POWER command to given player.""" + await self.mass.players.cmd_group_power(player_id, powered) + + async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: + """Send VOLUME_SET command to given player.""" + # group volume is already handled in the player manager + + async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: + """Send VOLUME MUTE command to given player.""" async def play_media( self, @@ -240,270 +169,136 @@ class UniversalGroupProvider(PlayerProvider): # power ON await self.cmd_power(player_id, True) group_player = self.mass.players.get(player_id) - active_members = self._get_active_members( - player_id, only_powered=True, skip_sync_childs=True - ) - if len(active_members) == 0: - self.logger.warning( - "Play media requested for player %s but no member players are powered, " - "the request will be ignored", - group_player.display_name, - ) - return - group_player.extra_data["optimistic_state"] = PlayerState.PLAYING + # create multi-client stream job + stream_job = await self.mass.streams.create_multi_client_stream_job( + player_id, start_queue_item=queue_item, seek_position=seek_position, fade_in=fade_in + ) - # forward the command to all (sync master) group child's + # forward the stream job to all group members async with asyncio.TaskGroup() as tg: - for member in active_members: + for member in self.mass.players.iter_group_members(group_player, only_powered=True): player_prov = self.mass.players.get_player_provider(member.player_id) - tg.create_task( - player_prov.play_media( - member.player_id, - queue_item=queue_item, - seek_position=seek_position, - fade_in=fade_in, - ) - ) - - async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem): - """ - Handle enqueuing of the next queue item on the player. + if member.player_id.startswith(SYNCGROUP_PREFIX): + member = self.mass.players.get_sync_leader(member) # noqa: PLW2901 + if member is None: + continue + tg.create_task(player_prov.play_stream(member.player_id, stream_job)) + stream_job.start() - If the player supports PlayerFeature.ENQUE_NEXT: - This will be called about 10 seconds before the end of the track. - If the player does NOT report support for PlayerFeature.ENQUE_NEXT: - This will be called when the end of the track is reached. - - A PlayerProvider implementation is in itself responsible for handling this - so that the queue items keep playing until its empty or the player stopped. + async def poll_player(self, player_id: str) -> None: + """Poll player for state updates.""" + self.update_attributes(player_id) + self.mass.players.update(player_id, skip_forward=True) - This will NOT be called if the end of the queue is reached (and repeat disabled). - This will NOT be called if the player is using flow mode to playback the queue. - """ - # forward the command to all (sync master) group child's - async with asyncio.TaskGroup() as tg: - for member in self._get_active_members( - player_id, only_powered=False, skip_sync_childs=True - ): - player_prov = self.mass.players.get_player_provider(member.player_id) - tg.create_task(player_prov.enqueue_next_queue_item(member.player_id, queue_item)) + async def create_group(self, name: str, members: list[str]) -> Player: + """Create new PlayerGroup on this provider. - async def cmd_pause(self, player_id: str) -> None: - """Send PAUSE command to given player.""" - group_player = self.mass.players.get(player_id) - group_player.extra_data["optimistic_state"] = PlayerState.PAUSED - async with asyncio.TaskGroup() as tg: - for member in self._get_active_members( - player_id, only_powered=True, skip_sync_childs=True - ): - tg.create_task(self.mass.players.cmd_pause(member.player_id)) + Create a new PlayerGroup with given name and members. - async def cmd_power(self, player_id: str, powered: bool) -> None: - """Send POWER command to given player.""" - group_power_on = await self.mass.config.get_player_config_value( - player_id, CONF_GROUPED_POWER_ON + - name: Name for the new group to create. + - members: A list of player_id's that should be part of this group. + """ + new_group_id = f"{UGP_PREFIX}{shortuuid.random(8).lower()}" + # cleanup list, filter groups (should be handled by frontend, but just in case) + members = [ + x.player_id + for x in self.mass.players + if x.player_id in members + if x.provider != self.instance_id + ] + # create default config with the user chosen name + self.mass.config.create_default_player_config( + new_group_id, + self.instance_id, + name=name, + enabled=True, + values={CONF_GROUP_MEMBERS: members}, ) - mute_childs = self.mass.config.get_raw_player_config_value(player_id, CONF_MUTE_CHILDS, []) - group_player = self.mass.players.get(player_id) - - async def set_child_power(child_player: Player) -> None: - # do not turn on the player if not explicitly requested - # so either the group player turns off OR - # it turns ON and we have the group_power_on config option enabled - if not (not powered or group_power_on): - return - # make sure to disable the mute as power workaround, - # otherwise the player keeps on playing "invisible" - if not powered and child_player.player_id in mute_childs: - child_player.mute_as_power = False - if child_player.volume_muted: - await self.mass.players.cmd_volume_mute(child_player.player_id, False) - # send actual power command to child player - await self.mass.players.cmd_power(child_player.player_id, powered) - - # set optimistic state on child player to prevent race conditions in other actions - child_player.powered = powered - - # turn on/off child players if needed - async with asyncio.TaskGroup() as tg: - for member in self._get_active_members( - player_id, only_powered=False, skip_sync_childs=False - ): - tg.create_task(set_child_power(member)) - - # (re)set mute_as_power feature for group members - for child_player_id in mute_childs: - if child_player := self.mass.players.get(child_player_id): - child_player.mute_as_power = powered - - group_player.powered = powered - if not powered: - group_player.extra_data["optimistic_state"] = PlayerState.IDLE - self.mass.players.update(player_id) - if powered: - # sync all players on power on - await self._sync_players(player_id) - - async def cmd_volume_set(self, player_id: str, volume_level: int) -> None: - """Send VOLUME_SET command to given player.""" - # group volume is already handled in the player manager - - async def cmd_volume_mute(self, player_id: str, muted: bool) -> None: - """Send VOLUME MUTE command to given player.""" + player = self._register_group_player(new_group_id, name=name, members=members) + return player + + async def _register_all_players(self) -> None: + """Register all (virtual/fake) group players in the Player controller.""" + player_configs = await self.mass.config.get_player_configs(self.instance_id) + for player_config in player_configs: + members = player_config.get_value(CONF_GROUP_MEMBERS) + self._register_group_player( + player_config.player_id, player_config.name or player_config.default_name, members + ) - async def poll_player(self, player_id: str) -> None: - """Poll player for state updates.""" - self.update_attributes(player_id) - self.mass.players.update(player_id, skip_forward=True) + def _register_group_player( + self, group_player_id: str, name: str, members: Iterable[str] + ) -> Player: + """Register a UGP group player in the Player controller.""" + player = Player( + player_id=group_player_id, + provider=self.instance_id, + type=PlayerType.SYNC_GROUP, + name=name, + available=True, + powered=False, + device_info=DeviceInfo(model="Group", manufacturer=self.name), + supported_features=(PlayerFeature.VOLUME_SET, PlayerFeature.POWER), + group_childs=set(members), + ) + self.mass.players.register_or_update(player) + return player def update_attributes(self, player_id: str) -> None: """Update player attributes.""" group_player = self.mass.players.get(player_id) if not group_player.powered: group_player.state = PlayerState.IDLE - group_player.active_source = None return - all_members = self._get_active_members( - player_id, only_powered=False, skip_sync_childs=False - ) - group_player.group_childs = list(x.player_id for x in all_members) - group_player.active_source = player_id # read the state from the first active group member - for member in all_members: - if member.synced_to: - continue - if member.mute_as_power: - player_powered = member.powered and not member.volume_muted - else: - player_powered = member.powered - if not player_powered: - continue + for member in self.mass.players.iter_group_members(group_player, only_powered=True): group_player.current_item_id = member.current_item_id group_player.elapsed_time = member.elapsed_time group_player.elapsed_time_last_updated = member.elapsed_time_last_updated group_player.state = member.state break - async def on_child_power(self, player_id: str, child_player: Player, new_power: bool) -> None: + def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None: """ - Call when a power command was executed on one of the child players. + Call when a power command was executed on one of the child player of a PlayerGroup. - This is used to handle special actions such as mute-as-power or (re)syncing. + This is used to handle special actions such as (re)syncing. """ group_player = self.mass.players.get(player_id) + child_player = self.mass.players.get(child_player_id) if not group_player.powered: # guard, this should be caught in the player controller but just in case... return - powered_childs = self._get_active_members(player_id, True, False) - if not new_power and child_player in powered_childs: - powered_childs.remove(child_player) + powered_childs = [ + x + for x in self.mass.players.iter_group_members(group_player, True) + if not (not new_power and x.player_id == child_player_id) + ] + if new_power and child_player not in powered_childs: + powered_childs.append(child_player) # if the last player of a group turned off, turn off the group if len(powered_childs) == 0: self.logger.debug( - "Group %s has no more powered members, turning off group player", player_id + "Group %s has no more powered members, turning off group player", + group_player.display_name, ) self.mass.create_task(self.cmd_power(player_id, False)) return False - group_playing = group_player.extra_data["optimistic_state"] == PlayerState.PLAYING # if a child player turned ON while the group player is already playing # we need to resync/resume - if new_power and group_playing: - if sync_leader := next( - (x for x in child_player.can_sync_with if x in self.prev_sync_leaders[player_id]), - None, - ): - # prevent resume when player platform supports sync - # and one of its players is already playing - self.logger.debug( - "Groupplayer %s forced resync due to groupmember change", player_id - ) - self.mass.create_task( - self.mass.players.cmd_sync(child_player.player_id, sync_leader) - ) - else: - # send active source because the group may be within another group - self.logger.debug( - "Groupplayer %s forced resume due to groupmember change", player_id - ) - self.mass.create_task(self.mass.player_queues.resume(group_player.active_source)) - elif ( - not new_power - and group_playing - and child_player.player_id in self.prev_sync_leaders[player_id] - and not child_player.mute_as_power - ): - # a sync master player turned OFF while the group player - # should still be playing - we need to resync/resume - # send atcive source because the group may be within another group - self.logger.debug("Groupplayer %s forced resume due to groupmember change", player_id) - self.mass.create_task(self.mass.player_queues.resume, group_player.active_source) - - def _get_active_members( - self, - player_id: str, - only_powered: bool = False, - skip_sync_childs: bool = True, - ) -> list[Player]: - """Get (child) players attached to a grouped player.""" - child_players: list[Player] = [] - conf_members: list[str] = self.config.get_value(player_id) - ignore_ids = set() - if group_player := self.mass.players.get(player_id): - parent_source = group_player.active_source - else: - parent_source = player_id - for child_id in conf_members: - if child_player := self.mass.players.get(child_id, False): - if not child_player.available: - continue - # work out power state - if child_player.mute_as_power: - player_powered = child_player.powered and not child_player.volume_muted - else: - player_powered = child_player.powered - if not (not only_powered or player_powered): - continue - if child_player.synced_to and skip_sync_childs: - continue - allowed_sources = [child_player.player_id, player_id, parent_source] + conf_members - if child_player.active_source not in allowed_sources: - # edge case: the child player has another group already active! - continue - if child_player.synced_to and child_player.synced_to not in allowed_sources: - # edge case: the child player is already synced to another player - continue - child_players.append(child_player) - # handle edge case where a group is in the group and both the group - # and (one of its) child's are added to this universal group. - if child_player.type == PlayerType.GROUP: - ignore_ids.update( - x for x in child_player.group_childs if x != child_player.player_id - ) - return [x for x in child_players if x.player_id not in ignore_ids] - - async def _sync_players(self, player_id: str) -> None: - """Sync all (possible) players.""" - sync_leaders = set() - # TODO: sort members on sync master priority attribute ? - for member in self._get_active_members(player_id, only_powered=True): - if member.synced_to is not None: - continue - if not member.can_sync_with: - continue - # check if we can join this player to an already chosen sync leader - if existing_leader := next( - (x for x in member.can_sync_with if x in sync_leaders), None - ): - await self.mass.players.cmd_sync(member.player_id, existing_leader) - # set optimistic state to prevent race condition in play media - member.synced_to = existing_leader - continue - # pick this member as new sync leader - sync_leaders.add(member.player_id) - self.prev_sync_leaders[player_id] = tuple(sync_leaders) + if new_power and group_player.state == PlayerState.PLAYING: + self.logger.warning( + "Player %s turned on while syncgroup is playing, " + "a forced resume for %s will be performed...", + child_player.display_name, + group_player.display_name, + ) + self.mass.loop.call_later( + 1, self.mass.create_task, self.mass.player_queues.resume(group_player.player_id) + ) -- 2.34.1