from music_assistant.helpers.util import TaskManager, get_changed_values
from music_assistant.models.core_controller import CoreController
from music_assistant.models.player_provider import PlayerProvider
-from music_assistant.providers.player_group import PlayerGroupProvider
if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Coroutine, Iterator
# ungroup player at power off
player_was_synced = player.synced_to is not None
- if not powered and (player.synced_to):
+ if not powered:
+ # this will handle both synced players and group players
+ # NOTE: ungroup will be ignored if the player is not grouped or synced
await self.cmd_ungroup(player_id)
# always stop player at power off
else:
# allow the stop command to process and prevent race conditions
await asyncio.sleep(0.2)
- await self.mass.cache.set(player_id, powered, base_key="player_power")
+
+ # store last power state in cache
+ await self.mass.cache.set(player_id, powered, base_key="player_power")
# always optimistically set the power state to update the UI
# as fast as possible and prevent race conditions
parent_player: Player = self.get(target_player, True)
prev_group_childs = parent_player.group_childs.copy()
if PlayerFeature.SET_MEMBERS not in parent_player.supported_features:
- msg = f"Player {parent_player.name} does not support sync commands"
+ msg = f"Player {parent_player.name} does not support group commands"
raise UnsupportedFeaturedException(msg)
if parent_player.synced_to:
if not (
child_player_id in parent_player.can_group_with
or child_player.provider in parent_player.can_group_with
- or "*" in parent_player.can_group_with
):
raise UnsupportedFeaturedException(
f"Player {child_player.name} can not be grouped with {parent_player.name}"
if not (player := self.get(player_id)):
self.logger.warning("Player %s is not available", player_id)
return
- if PlayerFeature.SET_MEMBERS not in player.supported_features:
- self.logger.warning("Player %s does not support (un)group commands", player.name)
+
+ if (
+ player.active_group
+ and (group_player := self.get(player.active_group))
+ and PlayerFeature.SET_MEMBERS in group_player.supported_features
+ ):
+ # the player is part of a (permanent) groupplayer and the user tries to ungroup
+ # redirect the command to the group provider
+ group_provider = self.mass.get_provider(group_player.provider)
+ await group_provider.cmd_ungroup_member(player_id, group_player.player_id)
return
+
if not (player.synced_to or player.group_childs):
return # nothing to do
- if player.active_group and (
- (group_provider := self.get_player_provider(player.active_group))
- and group_provider.domain == "player_group"
- ):
- # the player is part of a permanent (sync)group and the user tries to ungroup
- # redirect the command to the group provider
- group_provider = cast(PlayerGroupProvider, group_provider)
- await group_provider.cmd_ungroup_member(player_id, player.active_group)
+ if PlayerFeature.SET_MEMBERS not in player.supported_features:
+ self.logger.warning("Player %s does not support (un)group commands", player.name)
return
# handle (edge)case where un ungroup command is sent to a sync leader;
- player_id: player_id of the player to handle the command.
- target_player: player_id of the sync leader.
"""
- # will only be called for players with SYNC feature set.
+ # will only be called for players with SET_MEMBERS feature set.
raise NotImplementedError
async def cmd_ungroup(self, player_id: str) -> None:
- player_id: player_id of the player to handle the command.
"""
- # will only be called for players with SYNC feature set.
+ # will only be called for players with SET_MEMBERS feature set.
raise NotImplementedError
async def cmd_group_many(self, target_player: str, child_player_ids: list[str]) -> None:
# default implementation, simply call the cmd_group for all child players
await self.cmd_group(child_id, target_player)
+ async def cmd_ungroup_member(self, player_id: str, target_player: str) -> None:
+ """Handle UNGROUP command for given player.
+
+ Remove the given player(id) from the given (master) player/sync group.
+
+ - player_id: player_id of the (child) player to ungroup from the group.
+ - target_player: player_id of the group player.
+ """
+ # can only be called for groupplayers with SET_MEMBERS feature set.
+ raise NotImplementedError
+
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
_players: dict[str, AirPlayPlayer]
_dacp_server: asyncio.Server = None
_dacp_info: AsyncServiceInfo = None
- _play_media_lock: asyncio.Lock = asyncio.Lock()
@property
def supported_features(self) -> set[ProviderFeature]:
media: PlayerMedia,
) -> None:
"""Handle PLAY MEDIA on given player."""
- async with self._play_media_lock:
- player = self.mass.players.get(player_id)
- # set the active source for the player to the media queue
- # this accounts for syncgroups and linked players (e.g. sonos)
- player.active_source = media.queue_id
- if player.synced_to:
- # should not happen, but just in case
- raise RuntimeError("Player is synced")
- # always stop existing stream first
- async with TaskManager(self.mass) as tg:
- for airplay_player in self._get_sync_clients(player_id):
- tg.create_task(airplay_player.cmd_stop(update_state=False))
- # select audio source
- if media.media_type == MediaType.ANNOUNCEMENT:
- # special case: stream announcement
- input_format = AIRPLAY_PCM_FORMAT
- audio_source = self.mass.streams.get_announcement_stream(
- media.custom_data["url"],
- output_format=AIRPLAY_PCM_FORMAT,
- use_pre_announce=media.custom_data["use_pre_announce"],
- )
- elif media.queue_id.startswith("ugp_"):
- # special case: UGP stream
- ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
- ugp_stream = ugp_provider.ugp_streams[media.queue_id]
- input_format = ugp_stream.output_format
- audio_source = ugp_stream.subscribe()
- elif media.queue_id and media.queue_item_id:
- # regular queue (flow) stream request
- input_format = AIRPLAY_FLOW_PCM_FORMAT
- audio_source = self.mass.streams.get_flow_stream(
- queue=self.mass.player_queues.get(media.queue_id),
- start_queue_item=self.mass.player_queues.get_item(
- media.queue_id, media.queue_item_id
- ),
- pcm_format=input_format,
- )
- else:
- # assume url or some other direct path
- # NOTE: this will fail if its an uri not playable by ffmpeg
- input_format = AIRPLAY_PCM_FORMAT
- audio_source = get_ffmpeg_stream(
- audio_input=media.uri,
- input_format=AudioFormat(ContentType.try_parse(media.uri)),
- output_format=AIRPLAY_PCM_FORMAT,
- )
- # setup RaopStreamSession for player (and its sync childs if any)
- sync_clients = self._get_sync_clients(player_id)
- raop_stream_session = RaopStreamSession(self, sync_clients, input_format, audio_source)
- await raop_stream_session.start()
+ player = self.mass.players.get(player_id)
+ # set the active source for the player to the media queue
+ # this accounts for syncgroups and linked players (e.g. sonos)
+ player.active_source = media.queue_id
+ if player.synced_to:
+ # should not happen, but just in case
+ raise RuntimeError("Player is synced")
+ # always stop existing stream first
+ async with TaskManager(self.mass) as tg:
+ for airplay_player in self._get_sync_clients(player_id):
+ tg.create_task(airplay_player.cmd_stop(update_state=False))
+ # select audio source
+ if media.media_type == MediaType.ANNOUNCEMENT:
+ # special case: stream announcement
+ input_format = AIRPLAY_PCM_FORMAT
+ audio_source = self.mass.streams.get_announcement_stream(
+ media.custom_data["url"],
+ output_format=AIRPLAY_PCM_FORMAT,
+ use_pre_announce=media.custom_data["use_pre_announce"],
+ )
+ elif media.queue_id.startswith("ugp_"):
+ # special case: UGP stream
+ ugp_provider: PlayerGroupProvider = self.mass.get_provider("player_group")
+ ugp_stream = ugp_provider.ugp_streams[media.queue_id]
+ input_format = ugp_stream.output_format
+ audio_source = ugp_stream.subscribe()
+ elif media.queue_id and media.queue_item_id:
+ # regular queue (flow) stream request
+ input_format = AIRPLAY_FLOW_PCM_FORMAT
+ audio_source = self.mass.streams.get_flow_stream(
+ queue=self.mass.player_queues.get(media.queue_id),
+ start_queue_item=self.mass.player_queues.get_item(
+ media.queue_id, media.queue_item_id
+ ),
+ pcm_format=input_format,
+ )
+ else:
+ # assume url or some other direct path
+ # NOTE: this will fail if its an uri not playable by ffmpeg
+ input_format = AIRPLAY_PCM_FORMAT
+ audio_source = get_ffmpeg_stream(
+ audio_input=media.uri,
+ input_format=AudioFormat(ContentType.try_parse(media.uri)),
+ output_format=AIRPLAY_PCM_FORMAT,
+ )
+ # setup RaopStreamSession for player (and its sync childs if any)
+ sync_clients = self._get_sync_clients(player_id)
+ raop_stream_session = RaopStreamSession(self, sync_clients, input_format, audio_source)
+ await raop_stream_session.start()
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
"""Send VOLUME_SET command to given player.
else:
manufacturer, model = get_model_info(info)
- default_enabled = not is_broken_raop_model(manufacturer, model)
- if not self.mass.config.get_raw_player_config_value(player_id, "enabled", default_enabled):
+ if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
return
},
volume_level=volume,
can_group_with={self.instance_id},
+ enabled_by_default=not is_broken_raop_model(manufacturer, model),
)
await self.mass.players.register_or_update(mass_player)
# TODO fix sync & ungroup
async def cmd_group(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for BluOS player."""
+ """Handle GROUP command for BluOS player."""
async def cmd_ungroup(self, player_id: str) -> None:
"""Handle UNGROUP command for BluOS player."""
from music_assistant.helpers.util import TaskManager
from music_assistant.models.player_provider import PlayerProvider
-from .ugp_stream import UGP_FORMAT, UGPStream
+from .ugp_stream import UGPStream
if TYPE_CHECKING:
from collections.abc import Iterable
from music_assistant.models import ProviderInstanceType
+UGP_FORMAT = AudioFormat(
+ content_type=ContentType.PCM_F32LE,
+ sample_rate=48000,
+ bit_depth=32,
+)
+
# ruff: noqa: ARG002
UNIVERSAL_PREFIX: Final[str] = "ugp_"
CONF_ENTRY_PLAYER_ICON_GROUP,
CONF_ENTRY_GROUP_TYPE,
CONF_ENTRY_GROUP_MEMBERS,
+ CONFIG_ENTRY_DYNAMIC_MEMBERS,
)
# group type is static and can not be changed. we just grab the existing, stored value
group_type: str = self.mass.config.get_raw_player_config_value(
return (
*base_entries,
group_members,
- CONFIG_ENTRY_DYNAMIC_MEMBERS,
*(entry for entry in child_config_entries if entry.key in allowed_conf_entries),
)
async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
+ members = config.get_value(CONF_GROUP_MEMBERS)
if f"values/{CONF_GROUP_MEMBERS}" in changed_keys:
- members = config.get_value(CONF_GROUP_MEMBERS)
# ensure we filter invalid members
members = self._filter_members(config.get_value(CONF_GROUP_TYPE), members)
if group_player := self.mass.players.get(config.player_id):
if not powered and group_player.state in (PlayerState.PLAYING, PlayerState.PAUSED):
await self.cmd_stop(group_player.player_id)
- # always (re)fetch the configured group members at power on
- if not group_player.powered:
- group_member_ids = self.mass.config.get_raw_player_config_value(
- player_id, CONF_GROUP_MEMBERS, []
- )
- group_player.group_childs.set(
- x
- for x in group_member_ids
- if (child_player := self.mass.players.get(x))
- and child_player.available
- and child_player.enabled
- )
-
if powered:
# handle TURN_ON of the group player by turning on all members
for member in self.mass.players.iter_group_members(
group_player.powered = powered
self.mass.players.update(group_player.player_id)
if not powered:
- # reset the group members when powered off
+ # reset the original group members when powered off
group_player.group_childs.set(
self.mass.config.get_raw_player_config_value(player_id, CONF_GROUP_MEMBERS, [])
)
raise UnsupportedFeaturedException(
f"Adjusting group members is not allowed for group {group_player.display_name}"
)
- new_members = self._filter_members(group_type, [*group_player.group_childs, player_id])
- group_player.group_childs.set(new_members)
- if group_player.powered:
+ group_player.group_childs.append(player_id)
+
+ # handle resync/resume if group player was already playing
+ if group_player.state == PlayerState.PLAYING and group_type == GROUP_TYPE_UNIVERSAL:
+ child_player_provider = self.mass.players.get_player_provider(player_id)
+ base_url = f"{self.mass.streams.base_url}/ugp/{group_player.player_id}.mp3"
+ await child_player_provider.play_media(
+ player_id,
+ media=PlayerMedia(
+ uri=f"{base_url}?player_id={player_id}",
+ media_type=MediaType.FLOW_STREAM,
+ title=group_player.display_name,
+ queue_id=group_player.player_id,
+ ),
+ )
+ elif group_player.powered and group_type != GROUP_TYPE_UNIVERSAL:
# power on group player (which will also resync) if needed
await self.cmd_power(target_player, True)
raise UnsupportedFeaturedException(
f"Adjusting group members is not allowed for group {group_player.display_name}"
)
- is_sync_leader = len(child_player.group_childs) > 0
+ group_type = self.mass.config.get_raw_player_config_value(
+ group_player.player_id, CONF_ENTRY_GROUP_TYPE.key, CONF_ENTRY_GROUP_TYPE.default_value
+ )
was_playing = child_player.state == PlayerState.PLAYING
- # forward command to the player provider
+ is_sync_leader = len(child_player.group_childs) > 0
+ group_player.group_childs.remove(player_id)
+ child_player.active_group = None
+ child_player.active_source = None
+ if group_type == GROUP_TYPE_UNIVERSAL:
+ if was_playing:
+ # stop playing the group player
+ player_provider = self.mass.players.get_player_provider(child_player.player_id)
+ await player_provider.cmd_stop(child_player.player_id)
+ self._update_attributes(group_player)
+ return
+ # handle sync group
if player_provider := self.mass.players.get_player_provider(child_player.player_id):
await player_provider.cmd_ungroup(child_player.player_id)
- child_player.active_group = None
- child_player.active_source = None
- group_player.group_childs.set({x for x in group_player.group_childs if x != player_id})
- if is_sync_leader and was_playing:
+ if is_sync_leader and was_playing and group_player.powered:
# ungrouping the sync leader will stop the group so we need to resume
- self.mass.call_later(2, self.mass.players.cmd_play, group_player.player_id)
- elif group_player.powered:
- # power on group player (which will also resync) if needed
- await self.cmd_power(group_player.player_id, True)
+ task_id = f"resync_group_{group_player.player_id}"
+ self.mass.call_later(
+ 3, self.mass.players.cmd_play, group_player.player_id, task_id=task_id
+ )
async def _register_all_players(self) -> None:
"""Register all (virtual/fake) group players in the Player controller."""
def _update_attributes(self, player: Player) -> None:
"""Update attributes of a player."""
+ group_type = self.mass.config.get_raw_player_config_value(
+ player.player_id, CONF_ENTRY_GROUP_TYPE.key, CONF_ENTRY_GROUP_TYPE.default_value
+ )
for child_player in self.mass.players.iter_group_members(player, active_only=True):
# just grab the first active player
if child_player.synced_to:
else:
player.state = PlayerState.IDLE
player.active_source = player.player_id
+ if group_type == GROUP_TYPE_UNIVERSAL:
+ can_group_with = {
+ # allow grouping with all providers, except the playergroup provider itself
+ x.instance_id
+ for x in self.mass.players.providers
+ if x.instance_id != self.instance_id
+ }
+ elif sync_player_provider := self.mass.get_provider(group_type):
+ can_group_with = {sync_player_provider.instance_id}
+ else:
+ can_group_with = {}
+
+ player.can_group_with = can_group_with
self.mass.players.update(player.player_id)
async def _serve_ugp_stream(self, request: web.Request) -> web.Response:
# ruff: noqa: ARG002
-UGP_FORMAT = AudioFormat(
- content_type=ContentType.PCM_F32LE,
- sample_rate=48000,
- bit_depth=32,
-)
-
class UGPStream:
"""
if self.client.player.is_passive:
self.logger.debug("Ignore STOP command: Player is synced to another player.")
return
- if (airplay := self.get_linked_airplay_player(True)) and airplay.state != PlayerState.IDLE:
+ if airplay := self.get_linked_airplay_player(True):
# linked airplay player is active, redirect the command
self.logger.debug("Redirecting PLAY command to linked airplay player.")
if player_provider := self.mass.get_provider(airplay.provider):
self.mass_player.volume_muted = self.client.player.volume_muted
group_parent = None
+ airplay_player = self.get_linked_airplay_player(False)
if self.client.player.is_coordinator:
# player is group coordinator
active_group = self.client.player.group
self.mass_player.group_childs.set(self.client.player.group_members)
else:
self.mass_player.group_childs.clear()
+ # append airplay child's to group childs
+ if self.airplay_mode_enabled and airplay_player:
+ airplay_childs = [
+ x for x in airplay_player.group_childs if x != airplay_player.player_id
+ ]
+ self.mass_player.group_childs.extend(airplay_childs)
+ airplay_prov = self.mass.get_provider(airplay_player.provider)
+ self.mass_player.can_group_with.update(
+ x.player_id
+ for x in airplay_prov.players
+ if x.player_id != airplay_player.player_id
+ )
+ else:
+ self.mass_player.can_group_with = {self.prov.instance_id}
self.mass_player.synced_to = None
else:
# player is group child (synced to another player)
self.mass_player.active_source = SOURCE_LINE_IN
elif container_type == ContainerType.AIRPLAY:
# check if the MA airplay player is active
- airplay_player = self.get_linked_airplay_player(False)
if airplay_player and airplay_player.state in (
PlayerState.PLAYING,
PlayerState.PAUSED,
async def cmd_group_many(self, target_player: str, child_player_ids: list[str]) -> None:
"""Create temporary sync group by joining given players to target player."""
sonos_player = self.sonos_players[target_player]
- await sonos_player.client.player.group.modify_group_members(
- player_ids_to_add=child_player_ids, player_ids_to_remove=[]
- )
+ if airplay_player := sonos_player.get_linked_airplay_player(False):
+ # if airplay mode is enabled, we could possibly receive child player id's that are
+ # not Sonos players, but Airplay players. We redirect those.
+ airplay_child_ids = [x for x in child_player_ids if x.startswith("ap")]
+ child_player_ids = [x for x in child_player_ids if x not in airplay_child_ids]
+ if airplay_child_ids:
+ await self.mass.players.cmd_group_many(airplay_player.player_id, airplay_child_ids)
+ if child_player_ids:
+ await sonos_player.client.player.group.modify_group_members(
+ player_ids_to_add=child_player_ids, player_ids_to_remove=[]
+ )
async def cmd_ungroup(self, player_id: str) -> None:
"""Handle UNGROUP command for given player.