"[python]": {
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
- "source.fixAll": true,
- "source.organizeImports": true
+ "source.fixAll": "explicit",
+ "source.organizeImports": "explicit"
}
},
"python.formatting.provider": "black",
CONF_EQ_MID,
CONF_EQ_TREBLE,
CONF_FLOW_MODE,
- CONF_HIDE_GROUP_CHILDS,
CONF_LOG_LEVEL,
CONF_OUTPUT_CHANNELS,
CONF_VOLUME_NORMALIZATION,
advanced=True,
)
-CONF_ENTRY_HIDE_GROUP_MEMBERS = ConfigEntry(
- key=CONF_HIDE_GROUP_CHILDS,
- type=ConfigEntryType.STRING,
- options=[
- ConfigValueOption("Always", "always"),
- ConfigValueOption("Only if the group is active/powered", "active"),
- ConfigValueOption("Never", "never"),
- ],
- default_value="active",
- label="Hide playergroup members in UI",
- description="Hide the individual player entry for the members of this group "
- "in the user interface.",
- advanced=False,
-)
CONF_ENTRY_CROSSFADE = ConfigEntry(
key=CONF_CROSSFADE,
"""Enum with possible Player Types.
player: A regular player.
- group: A (dedicated) group player or playergroup.
- stereo_pair: Two speakers playing as one stereo pair.
+ group: A (dedicated) group player or (universal) playergroup.
+ sync_group: A group/preset of players that can be synced together.
"""
PLAYER = "player"
GROUP = "group"
- STEREO_PAIR = "stereo_pair"
+ SYNC_GROUP = "sync_group"
class PlayerFeature(StrEnum):
sync: The player supports syncing with other players (of the same platform).
accurate_time: The player provides millisecond accurate timing information.
- seek: The player supports seeking to a specific.
+ seek: The player supports seeking to a specific position in the current media item.
- set_members: The PlayerGroup supports adding/removing members.
- queue: The player supports (en)queuing of media items.
+ queue: The player supports (en)queuing of media items natively.
"""
POWER = "power"
VOLUME_MUTE = "volume_mute"
PAUSE = "pause"
SYNC = "sync"
- ACCURATE_TIME = "accurate_time"
SEEK = "seek"
ENQUEUE_NEXT = "enqueue_next"
- CROSSFADE = "crossfade"
class EventType(StrEnum):
#
# PLAYERPROVIDER FEATURES
#
- # we currently have none ;-)
+ PLAYER_GROUP_CREATE = "player_group_create"
+ SYNC_PLAYERS = "sync_players"
#
# METADATAPROVIDER FEATURES
# enabled: if the player is enabled
# will be set by the player manager based on config
# a disabled player is hidden in the UI and updates will not be processed
+ # nor will it be added to the HA integration
enabled: bool = True
- # hidden_by: if the player is enabled
+ # hidden_by: set of ids that caused this player to be hidden in the UI
# will be set by the player manager based on config
- # a disabled player is hidden in the UI only
+ # a hidden player is hidden in the UI only but can still be controlled
hidden_by: set = field(default_factory=set)
# group_volume: if the player is a player group or syncgroup master,
# and pass along freely
extra_data: dict[str, Any] = field(default_factory=dict)
- # mute_as_power: special feature from the universal group
- mute_as_power: bool = False
-
@property
def corrected_elapsed_time(self) -> float:
"""Return the corrected/realtime elapsed time."""
CONF_FLOW_MODE: Final[str] = "flow_mode"
CONF_LOG_LEVEL: Final[str] = "log_level"
CONF_HIDE_GROUP_CHILDS: Final[str] = "hide_group_childs"
-CONF_GROUPED_POWER_ON: Final[str] = "grouped_power_on"
CONF_CROSSFADE_DURATION: Final[str] = "crossfade_duration"
CONF_BIND_IP: Final[str] = "bind_ip"
CONF_BIND_PORT: Final[str] = "bind_port"
CONF_PUBLISH_IP: Final[str] = "publish_ip"
CONF_AUTO_PLAY: Final[str] = "auto_play"
+CONF_GROUP_PLAYERS: Final[str] = "group_players"
CONF_CROSSFADE: Final[str] = "crossfade"
+CONF_GROUP_MEMBERS: Final[str] = "group_members"
# config default values
DEFAULT_HOST: Final[str] = "0.0.0.0"
"music",
"player_queues",
)
+SYNCGROUP_PREFIX: Final[str] = "syncgroup_"
Note that this only returns the stored value without any validation or default.
"""
- return self.get(f"{CONF_PLAYERS}/{player_id}/values/{key}", default)
+ return self.get(
+ f"{CONF_PLAYERS}/{player_id}/values/{key}",
+ self.get(f"{CONF_PLAYERS}/{player_id}/{key}", default),
+ )
@api_command("config/players/save")
async def save_player_config(
provider.on_player_config_removed(player_id)
def create_default_player_config(
- self, player_id: str, provider: str, name: str, enabled: bool
+ self,
+ player_id: str,
+ provider: str,
+ name: str,
+ enabled: bool,
+ values: dict[str, ConfigValueType] | None = None,
) -> None:
"""
Create default/empty PlayerConfig.
# return early if the config already exists
if self.get(f"{CONF_PLAYERS}/{player_id}"):
# update default name if needed
- self.set(f"{CONF_PLAYERS}/{player_id}/default_name", name)
+ if name:
+ self.set(f"{CONF_PLAYERS}/{player_id}/default_name", name)
return
# config does not yet exist, create a default one
conf_key = f"{CONF_PLAYERS}/{player_id}"
default_conf = PlayerConfig(
values={}, provider=provider, player_id=player_id, enabled=enabled, default_name=name
)
+ default_conf_raw = default_conf.to_raw()
+ if values is not None:
+ default_conf_raw["values"] = values
self.set(
conf_key,
- default_conf.to_raw(),
+ default_conf_raw,
)
async def create_default_provider_config(self, provider_domain: str) -> None:
"""
return self.get(f"{CONF_CORE}/{core_module}/{key}", default)
+ def get_raw_provider_config_value(
+ self, provider_instance: str, key: str, default: ConfigValueType = None
+ ) -> ConfigValueType:
+ """
+ Return (raw) single config(entry) value for a provider.
+
+ Note that this only returns the stored value without any validation or default.
+ """
+ return self.get(f"{CONF_PROVIDERS}/{provider_instance}/{key}", default)
+
+ def set_raw_provider_config_value(
+ self, provider_instance: str, key: str, value: ConfigValueType
+ ) -> None:
+ """
+ Set (raw) single config(entry) value for a provider.
+
+ Note that this stores the raw value as-is, without any validation.
+ """
+ return self.set(f"{CONF_PROVIDERS}/{provider_instance}/{key}", value)
+
def save(self, immediate: bool = False) -> None:
"""Schedule save of data to disk."""
self._value_cache = {}
await self.mass.unload_provider(config.instance_id)
if config.type == ProviderType.PLAYER:
# cleanup entries in player manager
- for player in self.mass.players.all(
- return_unavailable=True, return_hidden=True, return_disabled=True
- ):
+ for player in self.mass.players.all(return_unavailable=True, return_disabled=True):
if player.provider != instance_id:
continue
self.mass.players.remove(player.player_id, cleanup_config=False)
assert queue.current_item.media_item.media_type == MediaType.TRACK
assert queue.current_item.duration
assert position < queue.current_item.duration
- player = self.mass.players.get(queue_id)
- if PlayerFeature.SEEK in player.supported_features:
- player_prov = self.mass.players.get_player_provider(queue_id)
- await player_prov.cmd_seek(player.player_id, position)
- return
await self.play_index(queue_id, queue.current_index, position)
@api_command("players/queue/resume")
queue.index_in_buffer = index
queue.flow_mode_start_index = index
queue.flow_mode = False # reset
- player_prov = self.mass.players.get_player_provider(queue_id)
- await player_prov.play_media(
+ await self.mass.players.play_media(
player_id=queue_id,
queue_item=queue_item,
seek_position=int(seek_position),
return # already enqueued
async def _enqueue_next(index: int):
- player_prov = self.mass.players.get_player_provider(player.player_id)
with suppress(QueueEmpty):
next_item = await self.preload_next_item(queue.queue_id, index)
- await player_prov.enqueue_next_queue_item(
+ await self.mass.players.enqueue_next_queue_item(
player_id=player.player_id, queue_item=next_item
)
import asyncio
import functools
import logging
-from collections.abc import Awaitable, Callable, Coroutine, Iterator
+from collections.abc import Awaitable, Callable, Coroutine, Iterable, Iterator
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar, cast
+import shortuuid
+
from music_assistant.common.helpers.util import get_changed_values
from music_assistant.common.models.enums import (
EventType,
PlayerFeature,
PlayerState,
PlayerType,
+ ProviderFeature,
ProviderType,
)
from music_assistant.common.models.errors import (
AlreadyRegisteredError,
PlayerCommandFailed,
PlayerUnavailableError,
+ ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.player import Player
+from music_assistant.common.models.player import DeviceInfo, Player
+from music_assistant.common.models.queue_item import QueueItem
from music_assistant.constants import (
CONF_AUTO_PLAY,
- CONF_HIDE_GROUP_CHILDS,
+ CONF_GROUP_MEMBERS,
CONF_PLAYERS,
ROOT_LOGGER_NAME,
+ SYNCGROUP_PREFIX,
)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.models.core_controller import CoreController
def all(
self,
return_unavailable: bool = True,
- return_hidden: bool = True,
return_disabled: bool = False,
) -> tuple[Player, ...]:
"""Return all registered players."""
return tuple(
player
for player in self._players.values()
- if (player.available or return_unavailable)
- and (not player.hidden_by or return_hidden)
- and (player.enabled or return_disabled)
+ if (player.available or return_unavailable) and (player.enabled or return_disabled)
)
@api_command("players/get")
if not player.enabled:
return
+ # initialize sync groups as soon as a player is registered
+ self.mass.loop.create_task(self._register_syncgroups())
+
LOGGER.info(
"Player registered: %s/%s",
player_id,
player.active_source = self._get_active_source(player)
# calculate group volume
player.group_volume = self._get_group_volume_level(player)
- if player.type == PlayerType.GROUP:
+ if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
player.volume_level = player.group_volume
# prefer any overridden name from config
player.display_name = (
or player.name
or player.player_id
)
- # handle special mute_as_power feature
- if player.mute_as_power:
- player.powered = player.powered and not player.volume_muted
- elif player.state == PlayerState.PLAYING and not player.powered:
+ if (
+ not player.powered
+ and player.state == PlayerState.PLAYING
+ and PlayerFeature.POWER not in player.supported_features
+ and player.active_source == player_id
+ ):
# mark player as powered if its playing
# could happen for players that do not officially support power commands
player.powered = True
+ # handle syncgroup - get attributes from first player that has this group as source
+ if player.player_id.startswith(SYNCGROUP_PREFIX):
+ if sync_leader := self.get_sync_leader(player):
+ player.state = sync_leader.state
+ player.current_item_id = sync_leader.current_item_id
+ player.elapsed_time = sync_leader.elapsed_time
+ player.elapsed_time_last_updated = sync_leader.elapsed_time_last_updated
+ else:
+ player.state = PlayerState.IDLE
+
# basic throttle: do not send state changed events if player did not actually change
prev_state = self._prev_states.get(player_id, {})
new_state = self._players[player_id].to_dict()
if skip_forward:
return
- if player.type == PlayerType.GROUP:
- # update group player child's when parent updates
- hide_group_childs = self.mass.config.get_raw_player_config_value(
- player.player_id, CONF_HIDE_GROUP_CHILDS, "active"
- )
- for child_player in self._get_child_players(player):
+ # update/signal group player(s) child's when group updates
+ if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
+ for child_player in self.iter_group_members(player):
if child_player.player_id == player.player_id:
continue
- # handle 'hide group childs' feature here
- if hide_group_childs == "always": # noqa: SIM114
- child_player.hidden_by.add(player.player_id)
- elif player.powered and hide_group_childs == "active":
- child_player.hidden_by.add(player.player_id)
- elif not player.powered and player.player_id in child_player.hidden_by:
- child_player.hidden_by.remove(player.player_id)
self.update(child_player.player_id, skip_forward=True)
-
- # update group player(s) when child updates
- for group_player in self._get_player_groups(player_id):
- if not group_player.available:
- continue
+ # update/signal group player(s) when child updates
+ for group_player in self._get_player_groups(player, powered_only=False):
player_prov = self.get_player_provider(group_player.player_id)
if not player_prov:
continue
- self.mass.create_task(player_prov.poll_player(group_player.player_id))
+ if group_player.player_id.startswith(SYNCGROUP_PREFIX):
+ self.update(group_player.player_id, skip_forward=True)
+ else:
+ self.mass.create_task(player_prov.poll_player(group_player.player_id))
def get_player_provider(self, player_id: str) -> PlayerProvider:
"""Return PlayerProvider for given player."""
- player_id: player_id of the player to handle the command.
- powered: bool if player should be powered on or off.
"""
- # TODO: Implement PlayerControl
+ if player_id.startswith(SYNCGROUP_PREFIX):
+ await self.cmd_group_power(player_id, powered)
+ return
player = self.get(player_id, True)
- cur_power = (
- (player.powered and not player.volume_muted) if player.mute_as_power else player.powered
- )
- if cur_power == powered:
+ if player.powered == powered:
return # nothing to do
+ # inform (active) group player if needed
+ # NOTE: this must be on the top to prevent race conditions
+ if active_group_player := self._get_active_player_group(player):
+ if active_group_player.player_id.startswith(SYNCGROUP_PREFIX):
+ self._on_syncgroup_child_power(
+ active_group_player.player_id, player.player_id, powered
+ )
+ elif player_prov := self.get_player_provider(active_group_player.player_id):
+ player_prov.on_child_power(active_group_player.player_id, player.player_id, powered)
# stop player at power off
if (
not powered
and player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
and not player.synced_to
- and not player.mute_as_power
+ and player.powered
):
await self.cmd_stop(player_id)
# unsync player at power off
- if not powered and not player.mute_as_power:
+ if not powered:
if player.synced_to is not None:
await self.cmd_unsync(player_id)
- for child in self._get_child_players(player):
+ for child in self.iter_group_members(player):
if not child.synced_to:
continue
await self.cmd_unsync(child.player_id)
- if player.mute_as_power:
- # handle mute as power feature
- await self.cmd_volume_mute(player_id, not powered)
-
- # restore mute if needed on poweroff
- if (
- not powered
- and player.volume_muted
- and not player.mute_as_power
- and PlayerFeature.VOLUME_MUTE not in player.supported_features
- ):
- await self.cmd_volume_mute(player_id, False)
-
- if PlayerFeature.POWER not in player.supported_features:
- # player does not support power, use fake state instead
- player.powered = powered
- self.update(player_id)
- elif powered or not player.mute_as_power:
- # regular power command
+ if PlayerFeature.POWER in player.supported_features:
+ # forward to player provider
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_power(player_id, powered)
- # handle forward to (active) group player if needed
- for group_player in self._get_player_groups(player_id):
- if not group_player.available:
- continue
- if not group_player.powered:
- continue
- if player_prov := self.get_player_provider(group_player.player_id):
- await player_prov.on_child_power(group_player.player_id, player, powered)
- break
- else:
- # auto play feature
- if powered and self.mass.config.get_raw_player_config_value(
- player_id, CONF_AUTO_PLAY, False
- ):
- await self.mass.player_queues.resume(player_id)
+ # always optimistically set the power state to update the UI
+ # as fast as possible and prevent race conditions
+ player.powered = powered
+ self.update(player_id)
+ # handle 'auto play on power on' feature
+ if (
+ powered
+ and self.mass.config.get_raw_player_config_value(player_id, CONF_AUTO_PLAY, False)
+ and player.active_source in (None, player_id)
+ ):
+ await self.mass.player_queues.resume(player_id)
@api_command("players/cmd/volume_set")
@log_player_command
"""
# TODO: Implement PlayerControl
player = self.get(player_id, True)
- if player.type == PlayerType.GROUP:
+ if player.type in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
# redirect to group volume control
await self.cmd_group_volume(player_id, volume_level)
return
if PlayerFeature.VOLUME_SET not in player.supported_features:
- LOGGER.warning(
- "Volume set command called but player %s does not support volume",
- player_id,
+ raise UnsupportedFeaturedException(
+ f"Player {player.display_name} does not support volume_set"
)
- player.volume_level = volume_level
- self.update(player_id)
- return
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_volume_set(player_id, volume_level)
new_volume = volume_level
volume_dif = new_volume - cur_volume
coros = []
- for child_player in self._get_child_players(group_player, True):
+ for child_player in self.iter_group_members(group_player, True):
cur_child_volume = child_player.volume_level
new_child_volume = int(cur_child_volume + volume_dif)
new_child_volume = max(0, new_child_volume)
coros.append(self.cmd_volume_set(child_player.player_id, new_child_volume))
await asyncio.gather(*coros)
+ @api_command("players/cmd/group_power")
+ async def cmd_group_power(self, player_id: str, power: bool) -> None:
+ """Handle power command for a PlayerGroup."""
+ group_player = self.get(player_id, True)
+
+ group_player.powered = power
+ if not power:
+ group_player.state = PlayerState.IDLE
+
+ async with asyncio.TaskGroup() as tg:
+ members_powered = False
+ for member in self.iter_group_members(group_player, only_powered=True):
+ members_powered = True
+ if power:
+ # set active source to group player if the group (is going to be) powered
+ member.active_source = group_player.player_id
+ elif member.active_source == group_player.player_id:
+ # turn off child player when group turns off
+ tg.create_task(self.cmd_power(member.player_id, False))
+ # edge case: group turned on but no members are powered, power them all!
+ if not members_powered and power:
+ for member in self.iter_group_members(group_player, only_powered=False):
+ tg.create_task(self.cmd_power(member.player_id, True))
+ member.active_source = group_player.player_id
+
+ if power and group_player.player_id.startswith(SYNCGROUP_PREFIX):
+ await self._sync_syncgroup(group_player.player_id)
+ self.update(player_id)
+
@api_command("players/cmd/volume_mute")
@log_player_command
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
player = self.get(player_id, True)
assert player
if PlayerFeature.VOLUME_MUTE not in player.supported_features:
- LOGGER.debug("Mute command called but player %s does not support muting", player_id)
- player.volume_muted = muted
- # use volume to process the muting
- cache_key = f"prev_volume_muting_{player_id}"
- if muted:
- await self.mass.cache.set(cache_key, player.volume_level)
- await self.cmd_volume_set(player_id, 0)
- else:
- prev_volume = await self.mass.cache.get(cache_key, default=10)
- await self.cmd_volume_set(player_id, prev_volume)
- self.update(player_id)
- return
- # TODO: Implement PlayerControl
+ raise UnsupportedFeaturedException(
+ f"Player {player.display_name} does not support muting"
+ )
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_volume_mute(player_id, muted)
+ @api_command("players/cmd/seek")
+ async def cmd_seek(self, player_id: str, position: int) -> None:
+ """Handle SEEK command for given player.
+
+ - player_id: player_id of the player to handle the command.
+ - position: position in seconds to seek to in the current playing item.
+ """
+ player_id = self._check_redirect(player_id)
+
+ player = self.get(player_id, True)
+ if PlayerFeature.SEEK not in player.supported_features:
+ raise UnsupportedFeaturedException(
+ f"Player {player.display_name} does not support seeking"
+ )
+ player_prov = self.mass.players.get_player_provider(player_id)
+ await player_prov.cmd_seek(player_id, position)
+
+ async def play_media(
+ self,
+ player_id: str,
+ queue_item: QueueItem,
+ seek_position: int,
+ fade_in: bool,
+ ) -> None:
+ """Handle PLAY MEDIA on given player.
+
+ This is called by the Queue controller to start playing a queue item on the given player.
+ The provider's own implementation should work out how to handle this request.
+
+ - player_id: player_id of the player to handle the command.
+ - queue_item: The QueueItem that needs to be played on the player.
+ - seek_position: Optional seek to this position.
+ - fade_in: Optionally fade in the item at playback start.
+ """
+ if player_id.startswith(SYNCGROUP_PREFIX):
+ # redirect to syncgroup-leader if needed
+ await self.cmd_group_power(player_id, True)
+ group_player = self.get(player_id, True)
+ if sync_leader := self.get_sync_leader(group_player):
+ await self.play_media(
+ sync_leader.player_id,
+ queue_item=queue_item,
+ seek_position=seek_position,
+ fade_in=fade_in,
+ )
+ group_player.state = PlayerState.PLAYING
+ return
+ player_prov = self.mass.players.get_player_provider(player_id)
+ await player_prov.play_media(
+ player_id=player_id,
+ queue_item=queue_item,
+ seek_position=int(seek_position),
+ fade_in=fade_in,
+ )
+
+ async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
+ """
+ Handle enqueuing of the next queue item on the player.
+
+ If the player supports PlayerFeature.ENQUEUE_NEXT:
+ This will be called about 10 seconds before the end of the track.
+ If the player does NOT report support for PlayerFeature.ENQUEUE_NEXT:
+ This will be called when the end of the track is reached.
+
+ A PlayerProvider implementation is in itself responsible for handling this
+ so that the queue items keep playing until its empty or the player stopped.
+
+ This will NOT be called if the end of the queue is reached (and repeat disabled).
+ This will NOT be called if the player is using flow mode to playback the queue.
+ """
+ if player_id.startswith(SYNCGROUP_PREFIX):
+ # redirect to syncgroup-leader if needed
+ group_player = self.get(player_id, True)
+ if sync_leader := self.get_sync_leader(group_player):
+ await self.enqueue_next_queue_item(
+ sync_leader.player_id,
+ queue_item=queue_item,
+ )
+ return
+ player_prov = self.mass.players.get_player_provider(player_id)
+ await player_prov.enqueue_next_queue_item(player_id=player_id, queue_item=queue_item)
+
@api_command("players/cmd/sync")
@log_player_command
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
- Join/add the given player(id) to the given (master) player/sync group.
+ Join/add the given player(id) to the given (leader) player/sync group.
If the player is already synced to another player, it will be unsynced there first.
- If the target player itself is already synced to another player, this will fail.
- If the player can not be synced with the given target player, this will fail.
+ If the target player itself is already synced to another player, this may fail.
+ If the player can not be synced with the given target player, this may fail.
- player_id: player_id of the player to handle the command.
- - target_player: player_id of the syncgroup master or group player.
+ - target_player: player_id of the syncgroup leader or group player.
"""
child_player = self.get(player_id, True)
parent_player = self.get(target_player, True)
raise UnsupportedFeaturedException(
f"Player {parent_player.name} does not support (un)sync commands"
)
- if parent_player.synced_to is not None:
- raise PlayerCommandFailed(
- f"Player {target_player} is already synced to another player."
- )
if player_id not in parent_player.can_sync_with:
raise PlayerCommandFailed(f"Player {player_id} can not be synced to {target_player}.")
if child_player.synced_to:
# all checks passed, forward command to the player provider
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_sync(player_id, target_player)
- child_player.hidden_by.add(target_player)
# optimistically update the player to update the UI as fast as possible
- self.mass.create_task(player_provider.poll_player(player_id))
+ parent_player.group_childs.add(player_id)
+ self.update(player_id)
@api_command("players/cmd/unsync")
@log_player_command
return
# all checks passed, forward command to the player provider
- if player.synced_to in player.hidden_by:
- player.hidden_by.remove(player.synced_to)
player_provider = self.get_player_provider(player_id)
await player_provider.cmd_unsync(player_id)
# optimistically update the player to update the UI as fast as possible
- self.mass.create_task(player_provider.poll_player(player_id))
+ player.synced_to = None
+ self.update(player_id)
+
+ @api_command("players/create_group")
+ async def create_group(self, provider: str, name: str, members: list[str]) -> Player:
+ """Create new Player/Sync Group on given PlayerProvider with name and members.
+
+ - provider: provider domain or instance id to create the new group on.
+ - name: Name for the new group to create.
+ - members: A list of player_id's that should be part of this group.
+
+ Returns the newly created player on success.
+ NOTE: Fails if the given provider does not support creating new groups
+ or members are given that can not be handled by the provider.
+ """
+ # perform basic checks
+ if (player_prov := self.mass.get_provider(provider)) is None:
+ raise ProviderUnavailableError(f"Provider {provider} is not available!")
+ if ProviderFeature.PLAYER_GROUP_CREATE in player_prov.supported_features:
+ # provider supports group create feature: forward request to provider
+ # the provider is itself responsible for
+ # checking if the members can be used for grouping
+ return await player_prov.create_group(name, members=members)
+ if ProviderFeature.SYNC_PLAYERS in player_prov.supported_features:
+ # default syncgroup implementation
+ return await self._create_syncgroup(provider, name, members)
+ raise UnsupportedFeaturedException(
+ f"Provider {player_prov.name} does not support creating groups"
+ )
def _check_redirect(self, player_id: str) -> str:
"""Check if playback related command should be redirected."""
player = self.get(player_id, True)
+ if player_id.startswith(SYNCGROUP_PREFIX) and (sync_leader := self.get_sync_leader(player)):
+ return sync_leader.player_id
if player.synced_to:
- sync_master = self.get(player.synced_to, True)
+ sync_leader = self.get(player.synced_to, True)
LOGGER.warning(
"Player %s is synced to %s and can not accept "
"playback related commands itself, "
"redirected the command to the sync leader.",
player.name,
- sync_master.name,
+ sync_leader.name,
)
return player.synced_to
return player_id
- def _get_player_groups(self, player_id: str) -> tuple[Player, ...]:
- """Return all (player_ids of) any groupplayers the given player belongs to."""
- return tuple(x for x in self if x.type == PlayerType.GROUP and player_id in x.group_childs)
+ def _get_player_groups(
+ self, player: Player, available_only: bool = True, powered_only: bool = False
+ ) -> Iterator[Player]:
+ """Return all groupplayers the given player belongs to."""
+ for _player in self:
+ if _player.player_id == player.player_id:
+ continue
+ if _player.type not in (PlayerType.GROUP, PlayerType.SYNC_GROUP):
+ continue
+ if available_only and not _player.available:
+ continue
+ if powered_only and not _player.powered:
+ continue
+ if (
+ player.player_id in _player.group_childs
+ or player.active_source == _player.player_id
+ ):
+ yield _player
+
+ def _get_active_player_group(self, player: Player) -> Player | None:
+ """Return the currently active groupplayer for the given player (if any)."""
+ # prefer active source group
+ for group_player in self._get_player_groups(player, available_only=True, powered_only=True):
+ if player.active_source in (group_player.player_id, group_player.active_source):
+ return group_player
+ # fallback to just the first powered group
+ for group_player in self._get_player_groups(player, available_only=True, powered_only=True):
+ return group_player
+ return None
def _get_active_source(self, player: Player) -> str:
"""Return the active_source id for given player."""
- # if player is synced, return master/group leader's active source
+ # if player is synced, return group leader's active source
if player.synced_to and (parent_player := self.get(player.synced_to)):
return self._get_active_source(parent_player)
- # iterate player groups to find out if one is playing
- if group_players := self._get_player_groups(player.player_id):
- # prefer the first playing (or paused) group parent
- for group_player in group_players:
- # if the group player's playerid is within the current_item_id
- # this group is definitely active
- if player.current_item_id and group_player.player_id in player.current_item_id:
- return group_player.player_id
- # fallback to the first powered group player
- for group_player in group_players:
- if group_player.powered:
- return group_player.player_id
+ if active_player_group := self._get_active_player_group(player):
+ return active_player_group.player_id
# defaults to the player's own player id if not active source set
return player.active_source or player.player_id
# calculate group volume from all (turned on) players
group_volume = 0
active_players = 0
- for child_player in self._get_child_players(player, True):
+ for child_player in self.iter_group_members(player, True):
group_volume += child_player.volume_level
active_players += 1
if active_players:
group_volume = group_volume / active_players
return int(group_volume)
- def _get_child_players(
+ def iter_group_members(
self,
- player: Player,
+ group_player: Player,
only_powered: bool = False,
only_playing: bool = False,
- ) -> list[Player]:
+ ) -> Iterator[Player]:
"""Get (child) players attached to a grouped player."""
- child_players: list[Player] = []
- for child_id in player.group_childs:
+ for child_id in list(group_player.group_childs):
if child_player := self.get(child_id, False):
if not child_player.available:
continue
or child_player.state in (PlayerState.PLAYING, PlayerState.PAUSED)
):
continue
- child_players.append(child_player)
- return child_players
+ yield child_player
async def _poll_players(self) -> None:
"""Background task that polls players for updates."""
# - every 30 seconds if the player is powered
# - every 10 seconds if the player is playing
if (
- (player.available and player.powered and count % 30 == 0)
- or (player.available and player_playing and count % 10 == 0)
- or count == 360
- ) and (player_prov := self.get_player_provider(player_id)):
+ player.available
+ and (
+ (player.powered and count % 30 == 0)
+ or (player_playing and count % 10 == 0)
+ or count == 360
+ )
+ and (player_prov := self.get_player_provider(player_id))
+ ):
try:
await player_prov.poll_player(player_id)
except PlayerUnavailableError:
player.available = False
player.state = PlayerState.IDLE
player.powered = False
- self.mass.loop.call_soon(self.update, player_id)
except Exception as err: # pylint: disable=broad-except
LOGGER.warning(
"Error while requesting latest state from player %s: %s",
str(err),
exc_info=err,
)
+ finally:
+ # always update player state
+ self.mass.loop.call_soon(self.update, player_id)
if count >= 360:
count = 0
await asyncio.sleep(1)
+
+ # Syncgroup specific functions/helpers
+
+ async def _create_syncgroup(self, provider: str, name: str, members: list[str]) -> Player:
+ """Create new (providers-specific) SyncGroup with given name and members."""
+ new_group_id = f"{SYNCGROUP_PREFIX}{shortuuid.random(8).lower()}"
+ # cleanup list, filter groups (should be handled by frontend, but just in case)
+ members = [
+ x.player_id
+ for x in self
+ if x.player_id in members
+ if not x.player_id.startswith(SYNCGROUP_PREFIX)
+ if x.provider == provider and PlayerFeature.SYNC in x.supported_features
+ ]
+ # create default config with the user chosen name
+ self.mass.config.create_default_player_config(
+ new_group_id,
+ provider,
+ name=name,
+ enabled=True,
+ values={CONF_GROUP_MEMBERS: members},
+ )
+ player = self._register_syncgroup(
+ group_player_id=new_group_id, provider=provider, name=name, members=members
+ )
+ return player
+
+ def get_sync_leader(self, group_player: Player) -> Player | None:
+ """Get the sync leader player for a syncgroup or synced player."""
+ if group_player.synced_to:
+ # should not happen but just in case...
+ return group_player.synced_to
+ for child_player in self.iter_group_members(
+ group_player, only_powered=True, only_playing=False
+ ):
+ if not child_player.group_childs:
+ continue
+ return child_player
+ return None
+
+ async def _sync_syncgroup(self, player_id: str) -> None:
+ """Sync all (possible) players of a syncgroup."""
+ group_player = self.get(player_id, True)
+ sync_leader = self.get_sync_leader(group_player)
+ for member in self.iter_group_members(group_player, only_powered=True):
+ if not member.can_sync_with:
+ continue
+ if not sync_leader:
+ # elect the first member as the sync leader if we do not have one
+ sync_leader = member
+ continue
+ if sync_leader.player_id == member.player_id:
+ continue
+ await self.cmd_sync(member.player_id, sync_leader.player_id)
+
async def _register_syncgroups(self) -> None:
    """(Re)register all (virtual/fake) syncgroup players found in the player configs."""
    for player_config in await self.mass.config.get_player_configs():
        if not player_config.player_id.startswith(SYNCGROUP_PREFIX):
            # not a syncgroup config entry
            continue
        self._register_syncgroup(
            group_player_id=player_config.player_id,
            provider=player_config.provider,
            name=player_config.name or player_config.default_name,
            members=player_config.get_value(CONF_GROUP_MEMBERS),
        )
+
def _register_syncgroup(
    self, group_player_id: str, provider: str, name: str, members: Iterable[str]
) -> Player | None:
    """Register a (virtual/fake) syncgroup player.

    Returns the registered Player, or None when no member is (yet) available
    to derive the supported features from (registration is then postponed).
    """
    # extract player features from the first available member
    for member in members:
        if first_player := self.get(member):
            supported_features = first_player.supported_features
            break
    else:
        # edge case: no child player is (yet) available; postpone register
        # NOTE: explicit None to match the declared return type
        return None
    player = Player(
        player_id=group_player_id,
        provider=provider,
        type=PlayerType.SYNC_GROUP,
        name=name,
        available=True,
        powered=False,
        device_info=DeviceInfo(model="SyncGroup", manufacturer=provider.title()),
        supported_features=supported_features,
        group_childs=set(members),
    )
    self.mass.players.register_or_update(player)
    return player
+
def _on_syncgroup_child_power(
    self, player_id: str, child_player_id: str, new_power: bool
) -> None:
    """
    Call when a power command was executed on one of the child players of a syncgroup.

    This is used to handle special actions such as (re)syncing.

    - player_id: player_id of the syncgroup player.
    - child_player_id: player_id of the group member that changed power state.
    - new_power: the new power state of the child player.
    """
    group_player = self.mass.players.get(player_id)
    child_player = self.mass.players.get(child_player_id)
    if group_player is None or child_player is None:
        # guard against (no longer) unknown players
        return

    if not group_player.powered:
        # guard, this should be caught in the player controller but just in case...
        return

    powered_childs = list(self.iter_group_members(group_player, True))
    if not new_power and child_player in powered_childs:
        powered_childs.remove(child_player)
    if new_power and child_player not in powered_childs:
        powered_childs.append(child_player)

    # if the last player of a group turned off, turn off the group
    if len(powered_childs) == 0:
        self.logger.debug(
            "Group %s has no more powered members, turning off group player",
            group_player.display_name,
        )
        self.mass.create_task(self.cmd_power(player_id, False))
        return

    group_playing = group_player.state == PlayerState.PLAYING
    is_sync_leader = (
        len(child_player.group_childs) > 0
        and child_player.active_source == group_player.player_id
    )
    if group_playing and not new_power and is_sync_leader:
        # the current sync leader player turned OFF while the group player
        # should still be playing - we need to select a new sync leader and resume
        self.logger.warning(
            "Syncleader %s turned off while syncgroup is playing, "
            "a forced resume for syncgroup %s will be attempted in 5 seconds...",
            child_player.display_name,
            group_player.display_name,
        )

        async def forced_resync():
            # we need to wait a bit here to not run into massive race conditions
            await asyncio.sleep(5)
            await self._sync_syncgroup(group_player.player_id)
            await self.mass.player_queues.resume(group_player.player_id)

        self.mass.create_task(forced_resync())
        return
    if new_power:
        # if a child player turned ON while the group player is on, we need to resync/resume
        self.mass.create_task(self._sync_syncgroup(group_player.player_id))
In case a stream is restarted (e.g. when seeking), a new MultiClientStreamJob will be created.
"""
+ _audio_task: asyncio.Task | None = None
+
def __init__(
self,
stream_controller: StreamsController,
self.bytes_streamed: int = 0
self.client_seconds_skipped: dict[str, int] = {}
self._all_clients_connected = asyncio.Event()
- # start running the audio task in the background
- self._audio_task = asyncio.create_task(self._stream_job_runner())
self.logger = stream_controller.logger.getChild(f"streamjob_{self.job_id}")
self._finished: bool = False
+ self._first_chunk: bytes = b""
@property
def finished(self) -> bool:
    """Return if this StreamJob is finished.

    The job counts as finished when it was explicitly stopped or
    when its (background) audio task has completed.
    """
    if self._finished:
        return True
    # the audio task is only created once start() is called, so it may be None;
    # coerce to a strict bool instead of leaking None/Task out of the property
    return self._audio_task is not None and self._audio_task.done()
@property
def pending(self) -> bool:
    """Return if this Job is pending start (not yet finished, not yet running)."""
    # NOTE(review): the previous expression referenced self.pending itself,
    # causing infinite recursion on access. A job is pending while it is not
    # finished and not all expected clients have connected yet.
    return not self.finished and not self._all_clients_connected.is_set()
def start(self) -> None:
    """Start running this streamjob.

    Spawns the background task that produces the audio stream; the task
    reference is kept on the instance so stop() can cancel it later.
    """
    # start running the audio task in the background
    self._audio_task = asyncio.create_task(self._stream_job_runner())
+
def stop(self) -> None:
    """Stop running this job."""
    self._finished = True
    audio_task = self._audio_task
    if audio_task and audio_task.done():
        # the job already ran to completion; nothing to clean up
        return
    if audio_task:
        audio_task.cancel()
    # unblock any subscribed players by feeding them an empty (EOF) chunk
    for sub_queue in self.subscribed_players.values():
        with suppress(asyncio.QueueFull):
            sub_queue.put_nowait(b"")
try:
self.subscribed_players[player_id] = sub_queue = asyncio.Queue(2)
+ if self._first_chunk:
+ yield self._first_chunk
+
if self._all_clients_connected.is_set():
# client subscribes while we're already started
self.logger.debug(
async for chunk in self.stream_controller.get_flow_stream(
self.queue, self.start_queue_item, self.pcm_format, self.seek_position, self.fade_in
):
- if chunk_num == 0:
+ chunk_num += 1
+ if chunk_num == 1:
# wait until all expected clients are connected
try:
async with asyncio.timeout(10):
len(self.subscribed_players),
len(self.expected_players),
)
+
await self._put_chunk(chunk)
- chunk_num += 1
+
+ # keep first chunk to workaround (dlna) players that do multiple get requests
+ if chunk_num == 1:
+ self._first_chunk = chunk
+ await asyncio.sleep(0.1)
# mark EOF with empty chunk
await self._put_chunk(b"")
start_queue_item: QueueItem,
seek_position: int = 0,
fade_in: bool = False,
+ pcm_bit_depth: int = 24,
+ pcm_sample_rate: int = 48000,
) -> MultiClientStreamJob:
"""Create a MultiClientStreamJob for the given queue..
if not existing_job.finished:
self.logger.warning("Detected existing (running) stream job for queue %s", queue_id)
existing_job.stop()
- queue_player = self.mass.players.get(queue_id)
- pcm_bit_depth = 24 if queue_player.supports_24bit else 16
- pcm_sample_rate = min(queue_player.max_sample_rate, 96000)
self.multi_client_jobs[queue_id] = stream_job = MultiClientStreamJob(
self,
queue_id=queue_id,
reason=f"Unable to retrieve streamdetails for item: {queue_item}"
)
seek_position = int(request.query.get("seek_position", 0))
+ queue_item.streamdetails.seconds_skipped = seek_position
fade_in = bool(request.query.get("fade_in", 0))
# work out output format/details
output_format = await self._get_output_format(
chunk_num += 1
# throttle buffer, do not allow more than 30 seconds in buffer
- seconds_buffered = total_bytes_written / pcm_sample_size
+ seconds_buffered = (total_bytes_written + bytes_written) / pcm_sample_size
player = self.mass.players.get(queue.queue_id)
- while (seconds_buffered - player.corrected_elapsed_time) > 30:
- await asyncio.sleep(1)
+ if seconds_buffered > 60 and player.corrected_elapsed_time > 30:
+ while (seconds_buffered - player.corrected_elapsed_time) > 30:
+ await asyncio.sleep(1)
#### HANDLE FIRST PART OF TRACK
queue_track.streamdetails.duration = (
seek_position + queue_track.streamdetails.seconds_streamed
)
+ total_bytes_written += bytes_written
self.logger.debug(
"Finished Streaming queue track: %s (%s) on queue %s - seconds streamed: %s",
queue_track.streamdetails.uri,
CONF_ENTRY_AUTO_PLAY,
CONF_ENTRY_VOLUME_NORMALIZATION,
CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
+ ConfigEntry,
+ ConfigValueOption,
+ PlayerConfig,
)
+from music_assistant.common.models.enums import ConfigEntryType
from music_assistant.common.models.player import Player
+from music_assistant.constants import CONF_GROUP_MEMBERS, CONF_GROUP_PLAYERS, SYNCGROUP_PREFIX
from .provider import Provider
if TYPE_CHECKING:
- from music_assistant.common.models.config_entries import ConfigEntry, PlayerConfig
from music_assistant.common.models.queue_item import QueueItem
+ from music_assistant.server.controllers.streams import MultiClientStreamJob
+
# ruff: noqa: ARG001, ARG002
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
    """Return all (provider/player specific) Config Entries for the given player (if any)."""
    entries = (
        CONF_ENTRY_VOLUME_NORMALIZATION,
        CONF_ENTRY_AUTO_PLAY,
        CONF_ENTRY_VOLUME_NORMALIZATION_TARGET,
    )
    if not player_id.startswith(SYNCGROUP_PREFIX):
        return entries
    # syncgroup player: append the (required) group members selector,
    # offering all other players of this provider as candidate members
    member_options = tuple(
        ConfigValueOption(x.display_name, x.player_id)
        for x in self.mass.players.all(True, False)
        if x.player_id != player_id and x.provider == self.instance_id
    )
    members_entry = ConfigEntry(
        key=CONF_GROUP_MEMBERS,
        type=ConfigEntryType.STRING,
        label="Group members",
        default_value=[],
        options=member_options,
        description="Select all players you want to be part of this group",
        multi_value=True,
        required=True,
    )
    return (*entries, members_entry)
def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
    """Call (by config manager) when the configuration of a player changes."""
    if f"values/{CONF_GROUP_MEMBERS}" not in changed_keys:
        return
    # group members changed: update the (virtual) group player accordingly
    if not (player := self.mass.players.get(config.player_id)):
        # guard: the player may not (yet/anymore) be registered
        return
    player.group_childs = config.get_value(CONF_GROUP_MEMBERS)
    self.mass.players.update(config.player_id)
def on_player_config_removed(self, player_id: str) -> None:
    """Call (by config manager) when the configuration of a player is removed."""
    # ensure that the (raw) group players config does not keep a stale entry
    group_players = self.mass.config.get_raw_provider_config_value(
        self.instance_id, CONF_GROUP_PLAYERS, {}
    )
    if player_id not in group_players:
        return
    del group_players[player_id]
    self.mass.config.set_raw_provider_config_value(
        self.instance_id, CONF_GROUP_PLAYERS, group_players
    )
@abstractmethod
async def cmd_stop(self, player_id: str) -> None:
- player_id: player_id of the player to handle the command.
"""
# will only be called for players with Pause feature set.
+ raise NotImplementedError()
async def play_media(
self,
- seek_position: Optional seek to this position.
- fade_in: Optionally fade in the item at playback start.
"""
+ raise NotImplementedError()
+
+ async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
+ """Handle PLAY STREAM on given player.
+
+ This is a special feature from the Universal Group provider.
+ """
+ raise NotImplementedError()
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
"""
- powered: bool if player should be powered on or off.
"""
# will only be called for players with Power feature set.
+ raise NotImplementedError()
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
- volume_level: volume level (0..100) to set on the player.
"""
# will only be called for players with Volume feature set.
+ raise NotImplementedError()
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
"""Send VOLUME MUTE command to given player.
- muted: bool if player should be muted.
"""
# will only be called for players with Mute feature set.
+ raise NotImplementedError()
async def cmd_seek(self, player_id: str, position: int) -> None:
"""Handle SEEK command for given queue.
- position: position in seconds to seek to in the current playing item.
"""
# will only be called for players with Seek feature set.
+ raise NotImplementedError()
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
- target_player: player_id of the syncgroup master or group player.
"""
# will only be called for players with SYNC feature set.
+ raise NotImplementedError()
async def cmd_unsync(self, player_id: str) -> None:
"""Handle UNSYNC command for given player.
- player_id: player_id of the player to handle the command.
"""
# will only be called for players with SYNC feature set.
+ raise NotImplementedError()
+
async def create_group(self, name: str, members: list[str]) -> Player:
    """Create new PlayerGroup on this provider.

    Create a new SyncGroup (or PlayerGroup) with given name and members.

    - name: Name for the new group to create.
    - members: A list of player_id's that should be part of this group.

    Returns the newly created group player.
    """
    # will only be called for players with PLAYER_GROUP_CREATE feature set.
    raise NotImplementedError()
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
If the player does not need any polling, simply do not override this method.
"""
- async def on_child_power(self, player_id: str, child_player: Player, new_power: bool) -> None:
+ def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
"""
- Call when a power command was executed on one of the child players.
+ Call when a power command was executed on one of the child player of a Player/Sync group.
- This is used to handle special actions such as muting as power or (re)syncing.
+ This is used to handle special actions such as (re)syncing.
"""
# DO NOT OVERRIDE BELOW
"""Return the features supported by this Provider."""
return tuple()
async def handle_setup(self) -> None:
    """Handle async initialization of the provider."""
    # default implementation is a no-op; providers override to do setup work
+
async def unload(self) -> None:
"""
Handle unload/close of the provider.
import aiofiles
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType
+from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.constants import CONF_LOG_LEVEL, CONF_PLAYERS
from music_assistant.server.models.player_provider import PlayerProvider
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.common.models.queue_item import QueueItem
from music_assistant.server import MusicAssistant
+ from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
from music_assistant.server.providers.slimproto import SlimprotoProvider
_log_reader_task: asyncio.Task | None = None
_removed_players: set[str] | None = None
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
    """Return the features supported by this Provider."""
    # this provider can sync players together
    return (ProviderFeature.SYNC_PLAYERS,)
+
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
self._removed_players = set()
def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
    """Call (by config manager) when the configuration of a player changes."""
    # let the base class handle its (group) config bookkeeping first
    super().on_player_config_changed(config, changed_keys)
    # the actual player handling is done by the slimproto provider: forward the event
    slimproto_prov = self.mass.get_provider("slimproto")
    slimproto_prov.on_player_config_changed(config, changed_keys)
def on_player_config_removed(self, player_id: str) -> None:
    """Call (by config manager) when the configuration of a player is removed."""
    # forward to the base implementation (which requires the player_id argument)
    super().on_player_config_removed(player_id)
    self._removed_players.add(player_id)
    self.restart_bridge()
fade_in=fade_in,
)
async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
    """Handle PLAY STREAM on given player.

    This is a special feature from the Universal Group provider.
    """
    # the underlying slimproto player does all the heavy lifting
    await self.mass.get_provider("slimproto").play_stream(player_id, stream_job)
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
"""Handle enqueuing of the next queue item on the player."""
# simply forward to underlying slimproto player
from music_assistant.common.models.config_entries import (
CONF_ENTRY_CROSSFADE_DURATION,
- CONF_ENTRY_HIDE_GROUP_MEMBERS,
ConfigEntry,
ConfigValueType,
)
from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
+ from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
    """Return all (provider/player specific) Config Entries for the given player (if any)."""
    # combine the default entries from the base class with our own static entries
    base_entries = await super().get_player_config_entries(player_id)
    return (*base_entries, *PLAYER_CONFIG_ENTRIES)
def on_player_config_changed(
    self, config: PlayerConfig, changed_keys: set[str]  # noqa: ARG002
) -> None:
    """Call (by config manager) when the configuration of a player changes."""
    super().on_player_config_changed(config, changed_keys)
    if "enabled" not in changed_keys:
        return
    if config.player_id in self.castplayers:
        return
    # a player got (re)enabled that we do not know yet: reload the provider to pick it up
    self.mass.create_task(self.mass.config.reload_provider, self.instance_id)
async def cmd_power(self, player_id: str, powered: bool) -> None:
"""Send POWER command to given player."""
castplayer = self.castplayers[player_id]
- # set mute_as_power feature for group members
- if castplayer.player.type == PlayerType.GROUP:
- for child_player_id in castplayer.player.group_childs:
- if child_player := self.mass.players.get(child_player_id):
- child_player.mute_as_power = powered
if powered:
await self._launch_app(castplayer)
else:
media_controller = castplayer.cc.media_controller
await asyncio.to_thread(media_controller.send_message, queuedata, True)
async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
    """Handle PLAY STREAM on given player.

    This is a special feature from the Universal Group provider.
    """
    castplayer = self.castplayers[player_id]
    stream_url = stream_job.resolve_stream_url(player_id, ContentType.FLAC)
    # pychromecast is blocking, so run the play command in a thread
    await asyncio.to_thread(
        castplayer.cc.play_media,
        stream_url,
        content_type="audio/flac",
        title="Music Assistant",
        thumb=MASS_LOGO_ONLINE,
    )
+
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
"""Handle enqueuing of the next queue item on the player."""
castplayer = self.castplayers[player_id]
castplayer.player.name = castplayer.cast_info.friendly_name
castplayer.player.volume_level = int(status.volume_level * 100)
castplayer.player.volume_muted = status.volume_muted
- if castplayer.active_group:
- # use mute as power when group is active
- castplayer.player.powered = not status.volume_muted
- else:
- castplayer.player.powered = (
- castplayer.cc.app_id is not None
- and castplayer.cc.app_id != pychromecast.IDLE_APP_ID
- )
+ castplayer.player.powered = (
+ castplayer.cc.app_id is not None and castplayer.cc.app_id != pychromecast.IDLE_APP_ID
+ )
# handle stereo pairs
if castplayer.cast_info.is_multichannel_group:
- castplayer.player.type = PlayerType.STEREO_PAIR
+ castplayer.player.type = PlayerType.PLAYER
castplayer.player.group_childs = set()
# handle cast groups
if castplayer.cast_info.is_audio_group and not castplayer.cast_info.is_multichannel_group:
"""Handle the cast removed from a group."""
if not self._valid:
return
- if group_uuid in self.castplayer.player.hidden_by:
- self.castplayer.player.hidden_by.remove(group_uuid)
+ if group_uuid == self.castplayer.player.active_source:
+ self.castplayer.player.active_source = ""
self.prov.logger.debug(
"%s is removed from multizone: %s", self.castplayer.player.display_name, group_uuid
)
from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
+ from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
BASE_PLAYER_FEATURES = (
self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002
) -> None:
"""Call (by config manager) when the configuration of a player changes."""
+ super().on_player_config_changed(config, changed_keys)
# run discovery to catch any re-enabled players
self.mass.create_task(self._run_discovery())
# reset player features based on config values
dlna_player.force_poll = True
await self.poll_player(dlna_player.udn)
@catch_request_errors
async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
    """Handle PLAY STREAM on given player.

    This is a special feature from the Universal Group provider.

    - player_id: player_id of the player to handle the command.
    - stream_job: the MultiClientStreamJob to stream to this player.
    """
    # allow the user to enforce mp3 output, otherwise use flac
    enforce_mp3 = await self.mass.config.get_player_config_value(player_id, CONF_ENFORCE_MP3)
    output_codec = ContentType.MP3 if enforce_mp3 else ContentType.FLAC
    url = stream_job.resolve_stream_url(player_id, output_codec)
    dlna_player = self.dlnaplayers[player_id]
    # always clear queue (by sending stop) first
    if dlna_player.device.can_stop:
        await self.cmd_stop(player_id)
    didl_metadata = create_didl_metadata(self.mass, url, None)
    await dlna_player.device.async_set_transport_uri(url, "Music Assistant", didl_metadata)
    # Play it
    await dlna_player.device.async_wait_for_can_play(10)
    # optimistically set this timestamp to help in case of a player
    # that does not report the progress
    now = time.time()
    dlna_player.player.elapsed_time = 0
    dlna_player.player.elapsed_time_last_updated = now
    await dlna_player.device.async_play()
    # force poll the device (twice) so the new state gets picked up quickly
    for sleep in (1, 2):
        await asyncio.sleep(sleep)
        dlna_player.force_poll = True
        await self.poll_player(dlna_player.udn)
+
@catch_request_errors
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
"""
PlayerFeature,
PlayerState,
PlayerType,
+ ProviderFeature,
)
from music_assistant.common.models.errors import QueueEmpty, SetupFailedError
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
+ from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
_cli: LmsCli
port: int = DEFAULT_SLIMPROTO_PORT
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
    """Return the features supported by this Provider."""
    # this provider can sync players together
    return (ProviderFeature.SYNC_PLAYERS,)
+
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
self._socket_clients = {}
- seek_position: Optional seek to this position.
- fade_in: Optionally fade in the item at playback start.
"""
+ # fix race condition where resync and play media are called at more or less the same time
+ if self._resync_handle:
+ self._resync_handle.cancel()
+ self._resync_handle = None
player = self.mass.players.get(player_id)
if player.synced_to:
raise RuntimeError("A synced player cannot receive play commands directly")
auto_play=False,
)
)
+ stream_job.start()
else:
# regular, single player playback
client = self._socket_clients[player_id]
auto_play=True,
)
async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
    """Handle PLAY STREAM on given player.

    This is a special feature from the Universal Group provider.
    """
    # fan out the play command to the player and all its connected sync members
    async with asyncio.TaskGroup() as task_group:
        for sync_client in self._get_sync_clients(player_id):
            stream_url = stream_job.resolve_stream_url(
                sync_client.player_id, output_codec=ContentType.FLAC
            )
            task_group.create_task(
                self._handle_play_url(
                    sync_client,
                    url=stream_url,
                    queue_item=None,
                    send_flush=True,
                    auto_play=False,
                )
            )
+
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
"""Handle enqueuing of the next queue item on the player."""
# we don't have to do anything,
manufacturer=client.device_type,
),
supported_features=(
- PlayerFeature.ACCURATE_TIME,
PlayerFeature.POWER,
PlayerFeature.SYNC,
PlayerFeature.VOLUME_SET,
PlayerFeature,
PlayerState,
PlayerType,
+ ProviderFeature,
)
from music_assistant.common.models.errors import SetupFailedError
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
+ from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
CONF_SNAPCAST_SERVER_HOST = "snapcast_server_host"
snapcast_server_control_port: int
_stream_tasks: dict[str, asyncio.Task]
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
    """Return the features supported by this Provider."""
    # this provider can sync players together (via snapcast groups)
    return (ProviderFeature.SYNC_PLAYERS,)
+
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
self.snapcast_server_host = self.config.get_value(CONF_SNAPCAST_SERVER_HOST)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
    """Send VOLUME_SET command to given player."""
    # a volume of 0 is treated as muted by snapcast
    payload = {"percent": volume_level, "muted": volume_level == 0}
    await self._snapserver.client_volume(player_id, payload)
async def cmd_stop(self, player_id: str) -> None:
snap_group = self._get_snapgroup(player_id)
await snap_group.set_stream(stream.identifier)
- async def queue_streamer():
+ async def _streamer():
host = self.snapcast_server_host
_, writer = await asyncio.open_connection(host, port)
self.logger.debug("Opened connection to %s:%s", host, port)
self.logger.debug("Closed connection to %s:%s", host, port)
# start streaming the queue (pcm) audio in a background task
- self._stream_tasks[player_id] = asyncio.create_task(queue_streamer())
+ self._stream_tasks[player_id] = asyncio.create_task(_streamer())
+
async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
    """Handle PLAY STREAM on given player.

    This is a special feature from the Universal Group provider.
    """
    player = self.mass.players.get(player_id)
    if player.synced_to:
        raise RuntimeError("A synced player cannot receive play commands directly")
    # stop any existing streams first
    await self.cmd_stop(player_id)
    # TEMP - TODO - WARNING - ACHTUNG - HACK
    # override pcm format of streamjob due to issue with snapcast
    # that seems to only accept a 48000/16 stream somehow ?!
    stream_job.pcm_format.content_type = ContentType.PCM_S16LE
    stream_job.pcm_format.sample_rate = 48000
    stream_job.pcm_format.bit_depth = 16
    # end of hack
    stream, port = await self._create_stream()
    stream_job.expected_players.add(player_id)
    snap_group = self._get_snapgroup(player_id)
    await snap_group.set_stream(stream.identifier)

    async def _streamer():
        # stream the (pcm) audio chunks to the snapcast server over tcp
        host = self.snapcast_server_host
        _, writer = await asyncio.open_connection(host, port)
        self.logger.debug("Opened connection to %s:%s", host, port)
        player.current_item_id = f"flow/{stream_job.queue_id}"
        player.elapsed_time = 0
        player.elapsed_time_last_updated = time.time()
        player.state = PlayerState.PLAYING
        self._set_childs_state(player_id, PlayerState.PLAYING)
        self.mass.players.register_or_update(player)
        try:
            async for pcm_chunk in stream_job.subscribe(player_id):
                writer.write(pcm_chunk)
                await writer.drain()
        finally:
            await self._snapserver.stream_remove_stream(stream.identifier)
            # gracefully shutdown the connection: signal EOF when supported,
            # then close the transport exactly once
            # (previously can_write_eof() was paired with close(), which could
            # attempt to close the writer twice and never sent the EOF)
            if writer.can_write_eof():
                writer.write_eof()
            if not writer.is_closing():
                writer.close()
            self.logger.debug("Closed connection to %s:%s", host, port)

    # start streaming the queue (pcm) audio in a background task
    self._stream_tasks[player_id] = asyncio.create_task(_streamer())
def _get_snapgroup(self, player_id: str) -> Snapgroup:
"""Get snapcast group for given player_id."""
port = random.randint(4953, 4953 + 200)
name = f"MusicAssistant--{port}"
result = await self._snapserver.stream_add_stream(
- # TODO: can we handle 24 bits bit depth ?
+ # NOTE: setting the sampleformat to something else
+ # (like 24 bits bit depth) does not seem to work at all!
f"tcp://0.0.0.0:{port}?name={name}&sampleformat=48000:16:2",
)
if "id" not in result:
<?xml version="1.0" encoding="UTF-8"?>
-<svg xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink" width="40px" height="40px" viewBox="0 0 40 40" version="1.1">
-<g id="surface1">
-<path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,81.568627%,0%);fill-opacity:1;" d="M 20 0 C 31.046875 0 40 8.953125 40 20 C 40 31.046875 31.046875 40 20 40 C 8.953125 40 0 31.046875 0 20 C 0 8.953125 8.953125 0 20 0 Z M 20 0 "/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 28.021875 15.689062 C 27.848437 15.689062 27.679687 15.75 27.54375 15.857812 L 22.3125 20.189062 L 17.709375 20.189062 C 17.296875 20.189062 16.959375 20.521875 16.959375 20.939062 L 16.959375 26.657812 C 16.964062 27.070312 17.296875 27.403125 17.709375 27.407812 L 17.728125 27.407812 L 22.678125 27.31875 L 27.520312 31.715625 C 27.65625 31.8375 27.839062 31.907812 28.021875 31.907812 C 28.129687 31.907812 28.232812 31.884375 28.33125 31.842187 C 28.598437 31.720312 28.776562 31.453125 28.771875 31.157812 L 28.771875 16.439062 C 28.771875 16.148437 28.607812 15.88125 28.340625 15.759375 C 28.242187 15.7125 28.134375 15.689062 28.021875 15.689062 Z M 28.021875 15.689062 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 7.884375 15.15 C 10.9875 9.80625 16.6875 6.515625 22.865625 6.501562 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 23.142187 41.49375 C 16.964062 41.484375 11.259375 38.19375 8.160937 32.85 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 37.865625 15.15 C 40.954687 20.503125 40.954687 27.09375 37.865625 32.446875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 10.354687 16.528125 C 12.960937 12.0375 17.751562 9.271875 22.940625 9.2625 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 22.940625 38.329687 C 17.751562 38.320312 12.960937 35.554687 10.359375 31.06875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 35.3625 16.528125 C 37.959375 21.023437 37.959375 26.56875 35.3625 31.064062 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 12.815625 17.925 C 14.896875 14.339062 18.721875 12.13125 22.865625 12.121875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 22.865625 35.470312 C 18.721875 35.460937 14.896875 33.253125 12.815625 29.671875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 33.042187 17.995312 C 35.109375 21.585937 35.109375 26.00625 33.042187 29.596875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 28.021875 15.689062 C 27.848437 15.689062 27.679687 15.75 27.54375 15.857812 L 22.3125 20.189062 L 17.709375 20.189062 C 17.296875 20.189062 16.959375 20.521875 16.959375 20.939062 L 16.959375 26.657812 C 16.964062 27.070312 17.296875 27.403125 17.709375 27.407812 L 17.728125 27.407812 L 22.678125 27.31875 L 27.520312 31.715625 C 27.65625 31.8375 27.839062 31.907812 28.021875 31.907812 C 28.129687 31.907812 28.232812 31.884375 28.33125 31.842187 C 28.598437 31.720312 28.776562 31.453125 28.771875 31.157812 L 28.771875 16.439062 C 28.771875 16.148437 28.607812 15.88125 28.340625 15.759375 C 28.242187 15.7125 28.134375 15.689062 28.021875 15.689062 Z M 28.021875 15.689062 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 7.884375 15.15 C 10.9875 9.80625 16.6875 6.515625 22.865625 6.501562 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 23.142187 41.49375 C 16.964062 41.484375 11.259375 38.19375 8.160937 32.85 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 37.865625 15.15 C 40.954687 20.503125 40.954687 27.09375 37.865625 32.446875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 10.354687 16.528125 C 12.960937 12.0375 17.751562 9.271875 22.940625 9.2625 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 22.940625 38.329687 C 17.751562 38.320312 12.960937 35.554687 10.359375 31.06875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 35.3625 16.528125 C 37.959375 21.023437 37.959375 26.56875 35.3625 31.064062 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 12.815625 17.925 C 14.896875 14.339062 18.721875 12.13125 22.865625 12.121875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 22.865625 35.470312 C 18.721875 35.460937 14.896875 33.253125 12.815625 29.671875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-<path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 33.042187 17.995312 C 35.109375 21.585937 35.109375 26.00625 33.042187 29.596875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
-</g>
+<svg xmlns="http://www.w3.org/2000/svg"
+ xmlns:xlink="http://www.w3.org/1999/xlink" viewBox="0 0 40 40" version="1.1">
+ <g id="surface1">
+ <path style=" stroke:none;fill-rule:nonzero;fill:rgb(100%,81.568627%,0%);fill-opacity:1;" d="M 20 0 C 31.046875 0 40 8.953125 40 20 C 40 31.046875 31.046875 40 20 40 C 8.953125 40 0 31.046875 0 20 C 0 8.953125 8.953125 0 20 0 Z M 20 0 "/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 28.021875 15.689062 C 27.848437 15.689062 27.679687 15.75 27.54375 15.857812 L 22.3125 20.189062 L 17.709375 20.189062 C 17.296875 20.189062 16.959375 20.521875 16.959375 20.939062 L 16.959375 26.657812 C 16.964062 27.070312 17.296875 27.403125 17.709375 27.407812 L 17.728125 27.407812 L 22.678125 27.31875 L 27.520312 31.715625 C 27.65625 31.8375 27.839062 31.907812 28.021875 31.907812 C 28.129687 31.907812 28.232812 31.884375 28.33125 31.842187 C 28.598437 31.720312 28.776562 31.453125 28.771875 31.157812 L 28.771875 16.439062 C 28.771875 16.148437 28.607812 15.88125 28.340625 15.759375 C 28.242187 15.7125 28.134375 15.689062 28.021875 15.689062 Z M 28.021875 15.689062 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 7.884375 15.15 C 10.9875 9.80625 16.6875 6.515625 22.865625 6.501562 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 23.142187 41.49375 C 16.964062 41.484375 11.259375 38.19375 8.160937 32.85 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 37.865625 15.15 C 40.954687 20.503125 40.954687 27.09375 37.865625 32.446875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 10.354687 16.528125 C 12.960937 12.0375 17.751562 9.271875 22.940625 9.2625 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 22.940625 38.329687 C 17.751562 38.320312 12.960937 35.554687 10.359375 31.06875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 35.3625 16.528125 C 37.959375 21.023437 37.959375 26.56875 35.3625 31.064062 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 12.815625 17.925 C 14.896875 14.339062 18.721875 12.13125 22.865625 12.121875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 22.865625 35.470312 C 18.721875 35.460937 14.896875 33.253125 12.815625 29.671875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:0.864;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 33.042187 17.995312 C 35.109375 21.585937 35.109375 26.00625 33.042187 29.596875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 28.021875 15.689062 C 27.848437 15.689062 27.679687 15.75 27.54375 15.857812 L 22.3125 20.189062 L 17.709375 20.189062 C 17.296875 20.189062 16.959375 20.521875 16.959375 20.939062 L 16.959375 26.657812 C 16.964062 27.070312 17.296875 27.403125 17.709375 27.407812 L 17.728125 27.407812 L 22.678125 27.31875 L 27.520312 31.715625 C 27.65625 31.8375 27.839062 31.907812 28.021875 31.907812 C 28.129687 31.907812 28.232812 31.884375 28.33125 31.842187 C 28.598437 31.720312 28.776562 31.453125 28.771875 31.157812 L 28.771875 16.439062 C 28.771875 16.148437 28.607812 15.88125 28.340625 15.759375 C 28.242187 15.7125 28.134375 15.689062 28.021875 15.689062 Z M 28.021875 15.689062 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 7.884375 15.15 C 10.9875 9.80625 16.6875 6.515625 22.865625 6.501562 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 23.142187 41.49375 C 16.964062 41.484375 11.259375 38.19375 8.160937 32.85 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 37.865625 15.15 C 40.954687 20.503125 40.954687 27.09375 37.865625 32.446875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 10.354687 16.528125 C 12.960937 12.0375 17.751562 9.271875 22.940625 9.2625 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 22.940625 38.329687 C 17.751562 38.320312 12.960937 35.554687 10.359375 31.06875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 35.3625 16.528125 C 37.959375 21.023437 37.959375 26.56875 35.3625 31.064062 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 12.815625 17.925 C 14.896875 14.339062 18.721875 12.13125 22.865625 12.121875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 22.865625 35.470312 C 18.721875 35.460937 14.896875 33.253125 12.815625 29.671875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ <path style="fill:none;stroke-width:1.2;stroke-linecap:round;stroke-linejoin:round;stroke:rgb(100%,100%,100%);stroke-opacity:1;stroke-miterlimit:4;" d="M 33.042187 17.995312 C 35.109375 21.585937 35.109375 26.00625 33.042187 29.596875 " transform="matrix(0.833333,0,0,0.833333,0,0)"/>
+ </g>
</svg>
PlayerFeature,
PlayerState,
PlayerType,
+ ProviderFeature,
)
from music_assistant.common.models.errors import PlayerCommandFailed, PlayerUnavailableError
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
+ from music_assistant.server.controllers.streams import MultiClientStreamJob
from music_assistant.server.models import ProviderInstanceType
group_members = {x.uid for x in self.group_info.members if x.is_visible}
if not group_members:
# not sure about this ?!
- self.player.type = PlayerType.STEREO_PAIR
+ self.player.type = PlayerType.PLAYER
elif group_members == {self.player_id}:
self.player.group_childs = set()
else:
_discovery_running: bool = False
_discovery_reschedule_timer: asyncio.TimerHandle | None = None
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return the features supported by this Provider."""
+ return (ProviderFeature.SYNC_PLAYERS,)
+
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
self.sonosplayers = {}
self, config: PlayerConfig, changed_keys: set[str] # noqa: ARG002
) -> None:
"""Call (by config manager) when the configuration of a player changes."""
+ super().on_player_config_changed(config, changed_keys)
if "enabled" in changed_keys:
# run discovery to catch any re-enabled players
self.mass.create_task(self._run_discovery())
- target_player: player_id of the syncgroup master or group player.
"""
sonos_player = self.sonosplayers[player_id]
- await asyncio.to_thread(sonos_player.soco.join, self.sonosplayers[target_player].soco)
+ retries = 0
+ while True:
+ try:
+ await asyncio.to_thread(
+ sonos_player.soco.join, self.sonosplayers[target_player].soco
+ )
+ break
+ except soco.exceptions.SoCoUPnPException as err:
+ if retries >= 3:
+ raise err
+ retries += 1
+ await asyncio.sleep(1)
await asyncio.to_thread(
sonos_player.update_info,
update_group_info=True,
sonos_player.player.elapsed_time = 0
sonos_player.player.elapsed_time_last_updated = now
+ async def play_stream(self, player_id: str, stream_job: MultiClientStreamJob) -> None:
+ """Handle PLAY STREAM on given player.
+
+ This is a special feature from the Universal Group provider.
+ """
+ url = stream_job.resolve_stream_url(player_id, ContentType.MP3)
+ sonos_player = self.sonosplayers[player_id]
+ if not sonos_player.soco.is_coordinator:
+ # this should be already handled by the player manager, but just in case...
+ raise PlayerCommandFailed(
+ f"Player {sonos_player.player.display_name} can not "
+ "accept play_stream command, it is synced to another player."
+ )
+ # always stop and clear queue first
+ await asyncio.to_thread(sonos_player.soco.stop)
+ await asyncio.to_thread(sonos_player.soco.clear_queue)
+ await asyncio.to_thread(sonos_player.soco.play_uri, url, force_radio=True)
+ # optimistically set this timestamp to help figure out elapsed time later
+ now = time.time()
+ sonos_player.playback_started = now
+ sonos_player.player.elapsed_time = 0
+ sonos_player.player.elapsed_time_last_updated = now
+
async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
"""
Handle enqueuing of the next queue item on the player.
the next successful poll or event where it becomes available again.
If the player does not need any polling, simply do not override this method.
"""
+ if player_id not in self.sonosplayers:
+ return
sonos_player = self.sonosplayers[player_id]
try:
# the check_poll logic will work out what endpoints need polling now
type=PlayerType.PLAYER,
name=soco_device.player_name,
available=True,
- powered=True,
+ powered=False,
supported_features=PLAYER_FEATURES,
device_info=DeviceInfo(
model=speaker_info["model_name"],
from __future__ import annotations
import asyncio
+from collections.abc import Iterable
from typing import TYPE_CHECKING
+import shortuuid
+
from music_assistant.common.models.config_entries import (
- CONF_ENTRY_EQ_BASS,
- CONF_ENTRY_EQ_MID,
- CONF_ENTRY_EQ_TREBLE,
- CONF_ENTRY_FLOW_MODE,
- CONF_ENTRY_HIDE_GROUP_MEMBERS,
- CONF_ENTRY_OUTPUT_CHANNELS,
+ CONF_ENTRY_CROSSFADE_DURATION,
ConfigEntry,
ConfigValueOption,
ConfigValueType,
PlayerFeature,
PlayerState,
PlayerType,
+ ProviderFeature,
)
from music_assistant.common.models.player import DeviceInfo, Player
from music_assistant.common.models.queue_item import QueueItem
-from music_assistant.constants import CONF_GROUPED_POWER_ON
+from music_assistant.constants import CONF_CROSSFADE, CONF_GROUP_MEMBERS, SYNCGROUP_PREFIX
from music_assistant.server.models.player_provider import PlayerProvider
if TYPE_CHECKING:
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
+UGP_PREFIX = "ugp_"
-CONF_GROUP_MEMBERS = "group_members"
-CONF_MUTE_CHILDS = "mute_childs"
-CONF_ENTRY_OUTPUT_CHANNELS_FORCED_STEREO = ConfigEntry.from_dict(
- {
- **CONF_ENTRY_OUTPUT_CHANNELS.to_dict(),
- "hidden": True,
- "default_value": "stereo",
- "value": "stereo",
- }
-)
-CONF_ENTRY_FORCED_FLOW_MODE = ConfigEntry.from_dict(
- {**CONF_ENTRY_FLOW_MODE.to_dict(), "default_value": True, "value": True, "hidden": True}
-)
-CONF_ENTRY_EQ_BASS_HIDDEN = ConfigEntry.from_dict({**CONF_ENTRY_EQ_BASS.to_dict(), "hidden": True})
-CONF_ENTRY_EQ_MID_HIDDEN = ConfigEntry.from_dict({**CONF_ENTRY_EQ_MID.to_dict(), "hidden": True})
-CONF_ENTRY_EQ_TREBLE_HIDDEN = ConfigEntry.from_dict(
- {**CONF_ENTRY_EQ_TREBLE.to_dict(), "hidden": True}
-)
-CONF_ENTRY_GROUPED_POWER_ON = ConfigEntry(
- key=CONF_GROUPED_POWER_ON,
- type=ConfigEntryType.BOOLEAN,
- default_value=False,
- label="Forced Power ON of all group members",
- description="Power ON all child players when the group player is powered on "
- "(or playback started). \n"
- "If this setting is disabled, playback will only start on players that "
- "are already powered ON at the time of playback start.\n"
- "When turning OFF the group player, all group members are turned off, "
- "regardless of this setting.",
- advanced=False,
-)
# ruff: noqa: ARG002
async def get_config_entries(
- mass: MusicAssistant,
- instance_id: str | None = None,
- action: str | None = None,
- values: dict[str, ConfigValueType] | None = None,
+ mass: MusicAssistant, # noqa: ARG001
+ instance_id: str | None = None, # noqa: ARG001
+ action: str | None = None, # noqa: ARG001
+ values: dict[str, ConfigValueType] | None = None, # noqa: ARG001
) -> tuple[ConfigEntry, ...]:
"""
Return Config entries to setup this provider.
action: [optional] action key called from config entries UI.
values: the (intermediate) raw values for config entries sent with the action.
"""
- # ruff: noqa: ARG001
- # dynamically extend the amount of entries when needed
- if values.get("ugp_15"):
- player_count = 20
- elif values.get("ugp_10"):
- player_count = 15
- elif values.get("ugp_5"):
- player_count = 10
- else:
- player_count = 5
- player_entries = tuple(
- ConfigEntry(
- key=f"ugp_{index}",
- type=ConfigEntryType.STRING,
- label=f"Group player {index}: Group members",
- default_value=[],
- options=tuple(
- ConfigValueOption(x.display_name, x.player_id)
- for x in mass.players.all(True, True, False)
- if x.player_id != f"ugp_{index}"
- ),
- description="Select all players you want to be part of this group",
- multi_value=True,
- required=False,
- )
- for index in range(1, player_count + 1)
- )
- return player_entries
+ return tuple()
class UniversalGroupProvider(PlayerProvider):
prev_sync_leaders: dict[str, tuple[str]] | None = None
debounce_id: str | None = None
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return the features supported by this Provider."""
+ return (ProviderFeature.PLAYER_GROUP_CREATE,)
+
async def handle_setup(self) -> None:
"""Handle async initialization of the provider."""
self.prev_sync_leaders = {}
- self.muted_clients = set()
-
- for index in range(1, 100):
- conf_key = f"ugp_{index}"
- try:
- player_conf = self.config.get_value(conf_key)
- except KeyError:
- break
- if player_conf == []:
- # cleanup player config if player config is removed/reset
- self.mass.players.remove(conf_key)
- continue
- elif not player_conf:
- continue
-
- player = Player(
- player_id=conf_key,
- provider=self.domain,
- type=PlayerType.GROUP,
- name=f"{self.name}: {index}",
- available=True,
- powered=False,
- device_info=DeviceInfo(model=self.manifest.name, manufacturer="Music Assistant"),
- # TODO: derive playerfeatures from (all) underlying child players?
- supported_features=(
- PlayerFeature.POWER,
- PlayerFeature.PAUSE,
- PlayerFeature.VOLUME_SET,
- PlayerFeature.VOLUME_MUTE,
- ),
- max_sample_rate=48000,
- supports_24bit=True,
- active_source=conf_key,
- group_childs=player_conf,
- )
- player.extra_data["optimistic_state"] = PlayerState.IDLE
- self.prev_sync_leaders[conf_key] = None
- self.mass.players.register_or_update(player)
+ self.mass.loop.create_task(self._register_all_players())
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry]: # noqa: ARG002
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
- return (
- CONF_ENTRY_HIDE_GROUP_MEMBERS,
- CONF_ENTRY_GROUPED_POWER_ON,
+ base_entries = await super().get_player_config_entries(player_id)
+ return base_entries + (
ConfigEntry(
- key=CONF_MUTE_CHILDS,
+ key=CONF_GROUP_MEMBERS,
type=ConfigEntryType.STRING,
- label="Use muting for power commands",
- multi_value=True,
- options=(
+ label="Group members",
+ default_value=[],
+ options=tuple(
ConfigValueOption(x.display_name, x.player_id)
- for x in self._get_active_members(player_id, False, False)
+ for x in self.mass.players.all(True, False)
+ if x.player_id != player_id
),
- default_value=[],
- description="To prevent a restart of the stream, when a child player "
- "turns on while the group is already playing, you can enable a workaround "
- "where Music Assistant uses muting to control the group players. \n\n"
- "This means that while the group player is playing, power actions to these "
- "child players will be treated as (un)mute commands to prevent the small "
- "interruption of music when the stream is restarted.",
+ description="Select all players you want to be part of this universal group",
+ multi_value=True,
+ required=True,
+ ),
+ ConfigEntry(
+ key="ugp_note",
+ type=ConfigEntryType.LABEL,
+ label="Please note that although the universal group "
+ "allows you to group any player, it will not enable audio sync "
+ "between players of different ecosystems.",
),
+ ConfigEntry(
+ key=CONF_CROSSFADE,
+ type=ConfigEntryType.BOOLEAN,
+ label="Enable crossfade",
+ default_value=False,
+ description="Enable a crossfade transition between (queue) tracks. \n\n"
+ "Note that DLNA does not natively support crossfading so you need to enable "
+ "the 'flow mode' workaround to use crossfading with DLNA players.",
+ advanced=False,
+ ),
+ CONF_ENTRY_CROSSFADE_DURATION,
)
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
group_player = self.mass.players.get(player_id)
- group_player.extra_data["optimistic_state"] = PlayerState.IDLE
+ group_player.state = PlayerState.IDLE
# forward command to player and any connected sync child's
async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(
- player_id, only_powered=True, skip_sync_childs=True
- ):
+ for member in self.mass.players.iter_group_members(group_player, only_powered=True):
if member.state == PlayerState.IDLE:
continue
tg.create_task(self.mass.players.cmd_stop(member.player_id))
async def cmd_play(self, player_id: str) -> None:
"""Send PLAY command to given player."""
- group_player = self.mass.players.get(player_id)
- group_player.extra_data["optimistic_state"] = PlayerState.PLAYING
- async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(
- player_id, only_powered=False, skip_sync_childs=True
- ):
- tg.create_task(self.mass.players.cmd_play(member.player_id))
+
+ async def cmd_pause(self, player_id: str) -> None:
+ """Send PAUSE command to given player."""
+
+ async def cmd_power(self, player_id: str, powered: bool) -> None:
+ """Send POWER command to given player."""
+ await self.mass.players.cmd_group_power(player_id, powered)
+
+ async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
+ """Send VOLUME_SET command to given player."""
+ # group volume is already handled in the player manager
+
+ async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
+ """Send VOLUME MUTE command to given player."""
async def play_media(
self,
# power ON
await self.cmd_power(player_id, True)
group_player = self.mass.players.get(player_id)
- active_members = self._get_active_members(
- player_id, only_powered=True, skip_sync_childs=True
- )
- if len(active_members) == 0:
- self.logger.warning(
- "Play media requested for player %s but no member players are powered, "
- "the request will be ignored",
- group_player.display_name,
- )
- return
- group_player.extra_data["optimistic_state"] = PlayerState.PLAYING
+ # create multi-client stream job
+ stream_job = await self.mass.streams.create_multi_client_stream_job(
+ player_id, start_queue_item=queue_item, seek_position=seek_position, fade_in=fade_in
+ )
- # forward the command to all (sync master) group child's
+ # forward the stream job to all group members
async with asyncio.TaskGroup() as tg:
- for member in active_members:
+ for member in self.mass.players.iter_group_members(group_player, only_powered=True):
player_prov = self.mass.players.get_player_provider(member.player_id)
- tg.create_task(
- player_prov.play_media(
- member.player_id,
- queue_item=queue_item,
- seek_position=seek_position,
- fade_in=fade_in,
- )
- )
-
- async def enqueue_next_queue_item(self, player_id: str, queue_item: QueueItem):
- """
- Handle enqueuing of the next queue item on the player.
+ if member.player_id.startswith(SYNCGROUP_PREFIX):
+ member = self.mass.players.get_sync_leader(member) # noqa: PLW2901
+ if member is None:
+ continue
+ tg.create_task(player_prov.play_stream(member.player_id, stream_job))
+ stream_job.start()
- If the player supports PlayerFeature.ENQUE_NEXT:
- This will be called about 10 seconds before the end of the track.
- If the player does NOT report support for PlayerFeature.ENQUE_NEXT:
- This will be called when the end of the track is reached.
-
- A PlayerProvider implementation is in itself responsible for handling this
- so that the queue items keep playing until its empty or the player stopped.
+ async def poll_player(self, player_id: str) -> None:
+ """Poll player for state updates."""
+ self.update_attributes(player_id)
+ self.mass.players.update(player_id, skip_forward=True)
- This will NOT be called if the end of the queue is reached (and repeat disabled).
- This will NOT be called if the player is using flow mode to playback the queue.
- """
- # forward the command to all (sync master) group child's
- async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(
- player_id, only_powered=False, skip_sync_childs=True
- ):
- player_prov = self.mass.players.get_player_provider(member.player_id)
- tg.create_task(player_prov.enqueue_next_queue_item(member.player_id, queue_item))
+ async def create_group(self, name: str, members: list[str]) -> Player:
+ """Create new PlayerGroup on this provider.
- async def cmd_pause(self, player_id: str) -> None:
- """Send PAUSE command to given player."""
- group_player = self.mass.players.get(player_id)
- group_player.extra_data["optimistic_state"] = PlayerState.PAUSED
- async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(
- player_id, only_powered=True, skip_sync_childs=True
- ):
- tg.create_task(self.mass.players.cmd_pause(member.player_id))
+ Create a new PlayerGroup with given name and members.
- async def cmd_power(self, player_id: str, powered: bool) -> None:
- """Send POWER command to given player."""
- group_power_on = await self.mass.config.get_player_config_value(
- player_id, CONF_GROUPED_POWER_ON
+ - name: Name for the new group to create.
+ - members: A list of player_id's that should be part of this group.
+ """
+ new_group_id = f"{UGP_PREFIX}{shortuuid.random(8).lower()}"
+ # cleanup list, filter groups (should be handled by frontend, but just in case)
+ members = [
+ x.player_id
+ for x in self.mass.players
+ if x.player_id in members
+ if x.provider != self.instance_id
+ ]
+ # create default config with the user chosen name
+ self.mass.config.create_default_player_config(
+ new_group_id,
+ self.instance_id,
+ name=name,
+ enabled=True,
+ values={CONF_GROUP_MEMBERS: members},
)
- mute_childs = self.mass.config.get_raw_player_config_value(player_id, CONF_MUTE_CHILDS, [])
- group_player = self.mass.players.get(player_id)
-
- async def set_child_power(child_player: Player) -> None:
- # do not turn on the player if not explicitly requested
- # so either the group player turns off OR
- # it turns ON and we have the group_power_on config option enabled
- if not (not powered or group_power_on):
- return
- # make sure to disable the mute as power workaround,
- # otherwise the player keeps on playing "invisible"
- if not powered and child_player.player_id in mute_childs:
- child_player.mute_as_power = False
- if child_player.volume_muted:
- await self.mass.players.cmd_volume_mute(child_player.player_id, False)
- # send actual power command to child player
- await self.mass.players.cmd_power(child_player.player_id, powered)
-
- # set optimistic state on child player to prevent race conditions in other actions
- child_player.powered = powered
-
- # turn on/off child players if needed
- async with asyncio.TaskGroup() as tg:
- for member in self._get_active_members(
- player_id, only_powered=False, skip_sync_childs=False
- ):
- tg.create_task(set_child_power(member))
-
- # (re)set mute_as_power feature for group members
- for child_player_id in mute_childs:
- if child_player := self.mass.players.get(child_player_id):
- child_player.mute_as_power = powered
-
- group_player.powered = powered
- if not powered:
- group_player.extra_data["optimistic_state"] = PlayerState.IDLE
- self.mass.players.update(player_id)
- if powered:
- # sync all players on power on
- await self._sync_players(player_id)
-
- async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
- """Send VOLUME_SET command to given player."""
- # group volume is already handled in the player manager
-
- async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
- """Send VOLUME MUTE command to given player."""
+ player = self._register_group_player(new_group_id, name=name, members=members)
+ return player
+
+ async def _register_all_players(self) -> None:
+ """Register all (virtual/fake) group players in the Player controller."""
+ player_configs = await self.mass.config.get_player_configs(self.instance_id)
+ for player_config in player_configs:
+ members = player_config.get_value(CONF_GROUP_MEMBERS)
+ self._register_group_player(
+ player_config.player_id, player_config.name or player_config.default_name, members
+ )
- async def poll_player(self, player_id: str) -> None:
- """Poll player for state updates."""
- self.update_attributes(player_id)
- self.mass.players.update(player_id, skip_forward=True)
+ def _register_group_player(
+ self, group_player_id: str, name: str, members: Iterable[str]
+ ) -> Player:
+ """Register a UGP group player in the Player controller."""
+ player = Player(
+ player_id=group_player_id,
+ provider=self.instance_id,
+ type=PlayerType.SYNC_GROUP,
+ name=name,
+ available=True,
+ powered=False,
+ device_info=DeviceInfo(model="Group", manufacturer=self.name),
+ supported_features=(PlayerFeature.VOLUME_SET, PlayerFeature.POWER),
+ group_childs=set(members),
+ )
+ self.mass.players.register_or_update(player)
+ return player
def update_attributes(self, player_id: str) -> None:
"""Update player attributes."""
group_player = self.mass.players.get(player_id)
if not group_player.powered:
group_player.state = PlayerState.IDLE
- group_player.active_source = None
return
- all_members = self._get_active_members(
- player_id, only_powered=False, skip_sync_childs=False
- )
- group_player.group_childs = list(x.player_id for x in all_members)
- group_player.active_source = player_id
# read the state from the first active group member
- for member in all_members:
- if member.synced_to:
- continue
- if member.mute_as_power:
- player_powered = member.powered and not member.volume_muted
- else:
- player_powered = member.powered
- if not player_powered:
- continue
+ for member in self.mass.players.iter_group_members(group_player, only_powered=True):
group_player.current_item_id = member.current_item_id
group_player.elapsed_time = member.elapsed_time
group_player.elapsed_time_last_updated = member.elapsed_time_last_updated
group_player.state = member.state
break
- async def on_child_power(self, player_id: str, child_player: Player, new_power: bool) -> None:
+ def on_child_power(self, player_id: str, child_player_id: str, new_power: bool) -> None:
"""
- Call when a power command was executed on one of the child players.
+ Call when a power command was executed on one of the child players of a PlayerGroup.
- This is used to handle special actions such as mute-as-power or (re)syncing.
+ This is used to handle special actions such as (re)syncing.
"""
group_player = self.mass.players.get(player_id)
+ child_player = self.mass.players.get(child_player_id)
if not group_player.powered:
# guard, this should be caught in the player controller but just in case...
return
- powered_childs = self._get_active_members(player_id, True, False)
- if not new_power and child_player in powered_childs:
- powered_childs.remove(child_player)
+ powered_childs = [
+ x
+ for x in self.mass.players.iter_group_members(group_player, True)
+ if not (not new_power and x.player_id == child_player_id)
+ ]
+ if new_power and child_player not in powered_childs:
+ powered_childs.append(child_player)
# if the last player of a group turned off, turn off the group
if len(powered_childs) == 0:
self.logger.debug(
- "Group %s has no more powered members, turning off group player", player_id
+ "Group %s has no more powered members, turning off group player",
+ group_player.display_name,
)
self.mass.create_task(self.cmd_power(player_id, False))
return False
- group_playing = group_player.extra_data["optimistic_state"] == PlayerState.PLAYING
# if a child player turned ON while the group player is already playing
# we need to resync/resume
- if new_power and group_playing:
- if sync_leader := next(
- (x for x in child_player.can_sync_with if x in self.prev_sync_leaders[player_id]),
- None,
- ):
- # prevent resume when player platform supports sync
- # and one of its players is already playing
- self.logger.debug(
- "Groupplayer %s forced resync due to groupmember change", player_id
- )
- self.mass.create_task(
- self.mass.players.cmd_sync(child_player.player_id, sync_leader)
- )
- else:
- # send active source because the group may be within another group
- self.logger.debug(
- "Groupplayer %s forced resume due to groupmember change", player_id
- )
- self.mass.create_task(self.mass.player_queues.resume(group_player.active_source))
- elif (
- not new_power
- and group_playing
- and child_player.player_id in self.prev_sync_leaders[player_id]
- and not child_player.mute_as_power
- ):
- # a sync master player turned OFF while the group player
- # should still be playing - we need to resync/resume
- # send atcive source because the group may be within another group
- self.logger.debug("Groupplayer %s forced resume due to groupmember change", player_id)
- self.mass.create_task(self.mass.player_queues.resume, group_player.active_source)
-
- def _get_active_members(
- self,
- player_id: str,
- only_powered: bool = False,
- skip_sync_childs: bool = True,
- ) -> list[Player]:
- """Get (child) players attached to a grouped player."""
- child_players: list[Player] = []
- conf_members: list[str] = self.config.get_value(player_id)
- ignore_ids = set()
- if group_player := self.mass.players.get(player_id):
- parent_source = group_player.active_source
- else:
- parent_source = player_id
- for child_id in conf_members:
- if child_player := self.mass.players.get(child_id, False):
- if not child_player.available:
- continue
- # work out power state
- if child_player.mute_as_power:
- player_powered = child_player.powered and not child_player.volume_muted
- else:
- player_powered = child_player.powered
- if not (not only_powered or player_powered):
- continue
- if child_player.synced_to and skip_sync_childs:
- continue
- allowed_sources = [child_player.player_id, player_id, parent_source] + conf_members
- if child_player.active_source not in allowed_sources:
- # edge case: the child player has another group already active!
- continue
- if child_player.synced_to and child_player.synced_to not in allowed_sources:
- # edge case: the child player is already synced to another player
- continue
- child_players.append(child_player)
- # handle edge case where a group is in the group and both the group
- # and (one of its) child's are added to this universal group.
- if child_player.type == PlayerType.GROUP:
- ignore_ids.update(
- x for x in child_player.group_childs if x != child_player.player_id
- )
- return [x for x in child_players if x.player_id not in ignore_ids]
-
- async def _sync_players(self, player_id: str) -> None:
- """Sync all (possible) players."""
- sync_leaders = set()
- # TODO: sort members on sync master priority attribute ?
- for member in self._get_active_members(player_id, only_powered=True):
- if member.synced_to is not None:
- continue
- if not member.can_sync_with:
- continue
- # check if we can join this player to an already chosen sync leader
- if existing_leader := next(
- (x for x in member.can_sync_with if x in sync_leaders), None
- ):
- await self.mass.players.cmd_sync(member.player_id, existing_leader)
- # set optimistic state to prevent race condition in play media
- member.synced_to = existing_leader
- continue
- # pick this member as new sync leader
- sync_leaders.add(member.player_id)
- self.prev_sync_leaders[player_id] = tuple(sync_leaders)
+ if new_power and group_player.state == PlayerState.PLAYING:
+ self.logger.warning(
+ "Player %s turned on while syncgroup is playing, "
+ "a forced resume for %s will be performed...",
+ child_player.display_name,
+ group_player.display_name,
+ )
+ self.mass.loop.call_later(
+ 1, self.mass.create_task, self.mass.player_queues.resume, group_player.player_id
+ )