self.manifest.icon = "speaker-multiple"
self._poll_task: asyncio.Task | None = None
self._player_throttlers: dict[str, Throttler] = {}
+ # TEMP 2024-11-20: register some aliases for renamed commands
+ # remove after a few releases
+ self.mass.register_api_command("players/cmd/sync", self.cmd_group)
+ self.mass.register_api_command("players/cmd/unsync", self.cmd_ungroup)
+ self.mass.register_api_command("players/cmd/sync_many", self.cmd_group_many)
+ self.mass.register_api_command("players/cmd/unsync_many", self.cmd_ungroup_many)
async def setup(self, config: CoreConfig) -> None:
"""Async initialize of module."""
@property
def providers(self) -> list[PlayerProvider]:
"""Return all loaded/running MusicProviders."""
- return self.mass.get_providers(ProviderType.MUSIC) # type: ignore=return-value
+ return self.mass.get_providers(ProviderType.PLAYER) # type: ignore=return-value
def __iter__(self) -> Iterator[Player]:
"""Iterate over (available) players."""
if player.powered == powered:
return # nothing to do
- # unsync player at power off
+ # ungroup player at power off
player_was_synced = player.synced_to is not None
if not powered and (player.synced_to):
- await self.cmd_unsync(player_id)
+ await self.cmd_ungroup(player_id)
# always stop player at power off
if (
async with self._player_throttlers[player_id]:
await player_prov.enqueue_next_media(player_id=player_id, media=media)
- @api_command("players/cmd/sync")
+ @api_command("players/cmd/group")
@handle_player_command
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player.
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player.
Join/add the given player(id) to the given (leader) player/sync group.
- If the player is already synced to another player, it will be unsynced there first.
If the target player itself is already synced to another player, this may fail.
If the player can not be synced with the given target player, this may fail.
- player_id: player_id of the player to handle the command.
- target_player: player_id of the syncgroup leader or group player.
"""
- await self.cmd_sync_many(target_player, [player_id])
+ await self.cmd_group_many(target_player, [player_id])
- @api_command("players/cmd/unsync")
- @handle_player_command
- async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player.
-
- Remove the given player from any syncgroups it currently is synced to.
- If the player is not currently synced to any other player,
- this will silently be ignored.
-
- - player_id: player_id of the player to handle the command.
- """
- if not (player := self.get(player_id)):
- self.logger.warning("Player %s is not available", player_id)
- return
- if PlayerFeature.SYNC not in player.supported_features:
- self.logger.warning("Player %s does not support (un)sync commands", player.name)
- return
- if not (player.synced_to or player.group_childs):
- return # nothing to do
-
- if player.active_group and (
- (group_provider := self.get_player_provider(player.active_group))
- and group_provider.domain == "player_group"
- ):
- # the player is part of a permanent (sync)group and the user tries to unsync
- # redirect the command to the group provider
- group_provider = cast(PlayerGroupProvider, group_provider)
- await group_provider.cmd_unsync_member(player_id, player.active_group)
- return
-
- # handle (edge)case where un unsync command is sent to a sync leader;
- # we dissolve the entire syncgroup in this case.
- # while maybe not strictly needed to do this for all player providers,
- # we do this to keep the functionality consistent across all providers
- if player.group_childs:
- self.logger.warning(
- "Detected unsync command to player %s which is a sync(group) leader, "
- "all sync members will be unsynced!",
- player.name,
- )
- async with TaskManager(self.mass) as tg:
- for group_child_id in player.group_childs:
- if group_child_id == player_id:
- continue
- tg.create_task(self.cmd_unsync(group_child_id))
- return
-
- # (optimistically) reset active source player if it is unsynced
- player.active_source = None
-
- # forward command to the player provider
- if player_provider := self.get_player_provider(player_id):
- await player_provider.cmd_unsync(player_id)
- # if the command succeeded we optimistically reset the sync state
- # this is to prevent race conditions and to update the UI as fast as possible
- player.synced_to = None
-
- @api_command("players/cmd/sync_many")
- async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
- """Create temporary sync group by joining given players to target player."""
+ @api_command("players/cmd/group_many")
+ async def cmd_group_many(self, target_player: str, child_player_ids: list[str]) -> None:
+ """Join given player(s) to target player."""
parent_player: Player = self.get(target_player, True)
prev_group_childs = parent_player.group_childs.copy()
- if PlayerFeature.SYNC not in parent_player.supported_features:
+ if PlayerFeature.SET_MEMBERS not in parent_player.supported_features:
msg = f"Player {parent_player.name} does not support sync commands"
raise UnsupportedFeaturedException(msg)
# guard edge case: player already synced to another player
raise PlayerCommandFailed(
f"Player {parent_player.name} is already synced to another player on its own, "
- "you need to unsync it first before you can join other players to it.",
+ "you need to ungroup it first before you can join other players to it.",
)
# filter all player ids on compatibility and availability
if not (child_player := self.get(child_player_id)) or not child_player.available:
self.logger.warning("Player %s is not available", child_player_id)
continue
- if PlayerFeature.SYNC not in child_player.supported_features:
- # this should not happen, but just in case bad things happen, guard it
- self.logger.warning("Player %s does not support sync commands", child_player.name)
- continue
+ # check if player can be synced/grouped with the target player
+ if not (
+ child_player_id in parent_player.can_group_with
+ or child_player.provider in parent_player.can_group_with
+ or "*" in parent_player.can_group_with
+ ):
+ raise UnsupportedFeaturedException(
+ f"Player {child_player.name} can not be grouped with {parent_player.name}"
+ )
+
if child_player.synced_to and child_player.synced_to == target_player:
continue # already synced to this target
# guard edge case: childplayer is already a sync leader on its own
raise PlayerCommandFailed(
f"Player {child_player.name} is already synced with other players, "
- "you need to unsync it first before you can join it to another player.",
+ "you need to ungroup it first before you can join it to another player.",
)
if child_player.synced_to:
- # player already synced to another player, unsync first
+ # player already synced to another player, ungroup first
self.logger.warning(
- "Player %s is already synced to another player, unsyncing first",
+ "Player %s is already synced to another player, ungrouping first",
child_player.name,
)
- await self.cmd_unsync(child_player.player_id)
+ await self.cmd_ungroup(child_player.player_id)
# power on the player if needed
if not child_player.powered:
await self.cmd_power(child_player.player_id, True, skip_update=True)
player_provider = self.get_player_provider(target_player)
async with self._player_throttlers[target_player]:
try:
- await player_provider.cmd_sync_many(target_player, final_player_ids)
+ await player_provider.cmd_group_many(target_player, final_player_ids)
except Exception:
# restore sync state if the command failed
- parent_player.group_childs = prev_group_childs
+ parent_player.group_childs.set(prev_group_childs)
raise
- @api_command("players/cmd/unsync_many")
- async def cmd_unsync_many(self, player_ids: list[str]) -> None:
- """Handle UNSYNC command for all the given players."""
+ @api_command("players/cmd/ungroup")
+ @handle_player_command
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for given player.
+
+ Remove the given player from any (sync)groups it currently is synced to.
+ If the player is not currently grouped to any other player,
+ this will silently be ignored.
+
+ - player_id: player_id of the player to handle the command.
+ """
+ if not (player := self.get(player_id)):
+ self.logger.warning("Player %s is not available", player_id)
+ return
+ if PlayerFeature.SET_MEMBERS not in player.supported_features:
+ self.logger.warning("Player %s does not support (un)group commands", player.name)
+ return
+ if not (player.synced_to or player.group_childs):
+ return # nothing to do
+
+ if player.active_group and (
+ (group_provider := self.get_player_provider(player.active_group))
+ and group_provider.domain == "player_group"
+ ):
+ # the player is part of a permanent (sync)group and the user tries to ungroup
+ # redirect the command to the group provider
+ group_provider = cast(PlayerGroupProvider, group_provider)
+ await group_provider.cmd_ungroup_member(player_id, player.active_group)
+ return
+
+ # handle (edge)case where un ungroup command is sent to a sync leader;
+ # we dissolve the entire syncgroup in this case.
+ # while maybe not strictly needed to do this for all player providers,
+ # we do this to keep the functionality consistent across all providers
+ if player.group_childs:
+ self.logger.warning(
+ "Detected ungroup command to player %s which is a sync(group) leader, "
+ "all sync members will be ungrouped!",
+ player.name,
+ )
+ async with TaskManager(self.mass) as tg:
+ for group_child_id in player.group_childs:
+ if group_child_id == player_id:
+ continue
+ tg.create_task(self.cmd_ungroup(group_child_id))
+ return
+
+ # (optimistically) reset active source player if it is ungrouped
+ player.active_source = None
+
+ # forward command to the player provider
+ if player_provider := self.get_player_provider(player_id):
+ await player_provider.cmd_ungroup(player_id)
+ # if the command succeeded we optimistically reset the sync state
+ # this is to prevent race conditions and to update the UI as fast as possible
+ player.synced_to = None
+
+ @api_command("players/cmd/ungroup_many")
+ async def cmd_ungroup_many(self, player_ids: list[str]) -> None:
+ """Handle UNGROUP command for all the given players."""
for player_id in list(player_ids):
- await self.cmd_unsync(player_id)
+ await self.cmd_ungroup(player_id)
def set(self, player: Player) -> None:
"""Set/Update player details on the controller."""
player.active_source = self._get_active_source(player)
player.volume_level = player.volume_level or 0 # guard for None volume
# correct group_members if needed
- if player.group_childs == {player.player_id}:
- player.group_childs = set()
+ if player.group_childs == [player.player_id]:
+ player.group_childs.clear()
+ elif (
+ player.group_childs
+ and player.player_id not in player.group_childs
+ and player.type == PlayerType.PLAYER
+ ):
+ player.group_childs.set([player.player_id, *player.group_childs])
+ if player.active_group and player.active_group == player.player_id:
+ player.active_group = None
# Auto correct player state if player is synced (or group child)
# This is because some players/providers do not accurately update this info
# for the sync child's.
def _handle_player_unavailable(self, player: Player) -> None:
"""Handle a player becoming unavailable."""
if player.synced_to:
- self.mass.create_task(self.cmd_unsync(player.player_id))
+ self.mass.create_task(self.cmd_ungroup(player.player_id))
# also set this optimistically because the above command will most likely fail
player.synced_to = None
return
self.mass.create_task(self.cmd_power(group_child_id, False, True))
# also set this optimistically because the above command will most likely fail
child_player.synced_to = None
- player.group_childs = set()
+ player.group_childs.clear()
if player.active_group and (group_player := self.get(player.active_group)):
# remove player from group if its part of a group
group_player = self.get(player.active_group)
queue = self.mass.player_queues.get(player.active_source)
prev_queue_active = queue and queue.active
prev_item_id = player.current_item_id
- # unsync player if its currently synced
+ # ungroup player if its currently synced
if prev_synced_to:
self.logger.debug(
- "Announcement to player %s - unsyncing player...",
+ "Announcement to player %s - ungrouping player...",
player.display_name,
)
- await self.cmd_unsync(player.player_id)
+ await self.cmd_ungroup(player.player_id)
# stop player if its currently playing
elif prev_state in (PlayerState.PLAYING, PlayerState.PAUSED):
self.logger.debug(
await self.cmd_power(player.player_id, False)
return
elif prev_synced_to:
- await self.cmd_sync(player.player_id, prev_synced_to)
+ await self.cmd_group(player.player_id, prev_synced_to)
elif prev_queue_active and prev_state == PlayerState.PLAYING:
await self.mass.player_queues.resume(queue.queue_id, True)
elif prev_state == PlayerState.PLAYING:
import eyed3
from music_assistant_models.enums import AlbumType
from music_assistant_models.errors import InvalidDataError
-from music_assistant_models.media_items import MediaItemChapter
from music_assistant.constants import MASS_LOGGER_NAME, UNKNOWN_ARTIST
from music_assistant.helpers.process import AsyncProcess
"""Return artist sort name tag(s) if present."""
return split_items(self.tags.get("albumartistsort"), False)
+ @property
+ def is_audiobook(self) -> bool:
+ """Return True if this is an audiobook."""
+ return self.filename.endswith("m4b") and len(self.chapters) > 1
+
@property
def album_type(self) -> AlbumType:
"""Return albumtype tag if present."""
- # handle audiobook/podcast
- if self.filename.endswith("m4b") and len(self.chapters) > 1:
- return AlbumType.AUDIOBOOK
- if "podcast" in self.tags.get("genre", "").lower() and len(self.chapters) > 1:
- return AlbumType.PODCAST
if self.tags.get("compilation", "") == "1":
return AlbumType.COMPILATION
tag = (
# the album type tag is messy within id3 and may even contain multiple types
# try to parse one in order of preference
for album_type in (
- AlbumType.PODCAST,
- AlbumType.AUDIOBOOK,
AlbumType.COMPILATION,
AlbumType.EP,
AlbumType.SINGLE,
return None
@property
- def chapters(self) -> list[MediaItemChapter]:
+ def chapters(self) -> list[dict[str, Any]]:
"""Return chapters in MediaItem (if any)."""
- chapters: list[MediaItemChapter] = []
- if raw_chapters := self.raw.get("chapters"):
- for chapter_data in raw_chapters:
- chapters.append(
- MediaItemChapter(
- chapter_id=chapter_data["id"],
- position_start=chapter_data["start"],
- position_end=chapter_data["end"],
- title=chapter_data.get("tags", {}).get("title"),
- )
- )
- return chapters
+ return self.raw.get("chapters") or []
@property
def lyrics(self) -> str | None:
# ruff: noqa: ARG001, ARG002
-DEFAULT_SUPPORTED_FEATURES = (
+DEFAULT_SUPPORTED_FEATURES = {
ProviderFeature.ARTIST_METADATA,
ProviderFeature.ALBUM_METADATA,
ProviderFeature.TRACK_METADATA,
-)
+}
class MetadataProvider(Provider):
"""
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return DEFAULT_SUPPORTED_FEATURES
# will only be called for players with 'next_previous' feature set.
raise NotImplementedError
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player.
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player.
Join/add the given player(id) to the given (master) player/sync group.
# will only be called for players with SYNC feature set.
raise NotImplementedError
- async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player.
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for given player.
- Remove the given player from any syncgroups it currently is synced to.
+ Remove the given player from any (sync)groups it currently is grouped to.
- player_id: player_id of the player to handle the command.
"""
# will only be called for players with SYNC feature set.
raise NotImplementedError
- async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
+ async def cmd_group_many(self, target_player: str, child_player_ids: list[str]) -> None:
"""Create temporary sync group by joining given players to target player."""
for child_id in child_player_ids:
- # default implementation, simply call the cmd_sync for all child players
- await self.cmd_sync(child_id, target_player)
+ # default implementation, simply call the cmd_group for all child players
+ await self.cmd_group(child_id, target_player)
async def poll_player(self, player_id: str) -> None:
"""Poll player for state updates.
self.available = False
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return ()
"""
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
# MANDATORY
# you should return a tuple of provider-level features
"""
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
# MANDATORY
# you should return a tuple of provider-level features
device_info=DeviceInfo(
model="Model XYX",
manufacturer="Super Brand",
- address=cur_address,
+ ip_address=cur_address,
),
# set the supported features for this player only with
# the ones the player actually supports
- supported_features=(
+ supported_features={
PlayerFeature.POWER, # if the player can be turned on/off
PlayerFeature.VOLUME_SET,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.PLAY_ANNOUNCEMENT, # see play_announcement method
- ),
+ },
)
# register the player with the player manager
await self.mass.players.register(mass_player)
"""
# this method should handle the enqueuing of the next queue item on the player.
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player.
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player.
Join/add the given player(id) to the given (master) player/sync group.
# this method should handle the sync command for the given player.
# you should join the given player to the target_player/syncgroup.
- async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player.
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for given player.
- Remove the given player from any syncgroups it currently is synced to.
+ Remove the given player from any (sync)groups it currently is grouped to.
- player_id: player_id of the player to handle the command.
"""
_play_media_lock: asyncio.Lock = asyncio.Lock()
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return (ProviderFeature.SYNC_PLAYERS,)
mass_player.device_info = DeviceInfo(
model=mass_player.device_info.model,
manufacturer=mass_player.device_info.manufacturer,
- address=str(cur_address),
+ ip_address=str(cur_address),
)
if not mass_player.available:
self.logger.debug("Player back online: %s", display_name)
await self.mass.cache.set(player_id, volume_level, base_key=CACHE_KEY_PREV_VOLUME)
@lock
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player.
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player.
Join/add the given player(id) to the given (master) player/sync group.
if airplay_player.raop_stream and airplay_player.raop_stream.running:
await airplay_player.raop_stream.session.remove_client(airplay_player)
# always make sure that the parent player is part of the sync group
- parent_player.group_childs.add(parent_player.player_id)
- parent_player.group_childs.add(child_player.player_id)
+ parent_player.group_childs.append(parent_player.player_id)
+ parent_player.group_childs.append(child_player.player_id)
child_player.synced_to = parent_player.player_id
# mark players as powered
parent_player.powered = True
self.mass.players.update(parent_player.player_id, skip_forward=True)
@lock
- async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player.
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for given player.
- Remove the given player from any syncgroups it currently is synced to.
+ Remove the given player from any (sync)groups it currently is grouped to.
- player_id: player_id of the player to handle the command.
"""
device_info=DeviceInfo(
model=model,
manufacturer=manufacturer,
- address=address,
+ ip_address=address,
),
- supported_features=(
+ supported_features={
PlayerFeature.PAUSE,
- PlayerFeature.SYNC,
+ PlayerFeature.SET_MEMBERS,
PlayerFeature.VOLUME_SET,
- ),
+ },
volume_level=volume,
+ can_group_with={self.instance_id},
)
await self.mass.players.register_or_update(mass_player)
from music_assistant.models import ProviderInstanceType
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.ARTIST_ALBUMS,
ProviderFeature.ARTIST_TOPTRACKS,
ProviderFeature.SIMILAR_TRACKS,
-)
+}
DEVELOPER_TOKEN = app_var(8)
WIDEVINE_BASE_PATH = "/usr/local/bin/widevine_cdm"
self._decrypt_private_key = await _file.read()
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
PLAYER_FEATURES_BASE = {
- PlayerFeature.SYNC,
+ PlayerFeature.SET_MEMBERS,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.PAUSE,
}
if self.sync_status.leader is None:
if self.sync_status.followers:
- self.mass_player.group_childs = (
- self.sync_status.followers if len(self.sync_status.followers) > 1 else set()
- )
+ if len(self.sync_status.followers) > 1:
+ self.mass_player.group_childs.set(self.sync_status.followers)
+ else:
+ self.mass_player.group_childs.clear()
self.mass_player.synced_to = None
if self.status.state == "stream":
self.mass_player.current_media = None
else:
- self.mass_player.group_childs = set()
+ self.mass_player.group_childs.clear()
self.mass_player.synced_to = self.sync_status.leader
self.mass_player.active_source = self.sync_status.leader
bluos_players: dict[str, BluesoundPlayer]
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return {ProviderFeature.SYNC_PLAYERS}
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
mass_player.device_info = DeviceInfo(
model=mass_player.device_info.model,
manufacturer=mass_player.device_info.manufacturer,
- address=str(cur_address),
+ ip_address=str(cur_address),
)
if not mass_player.available:
self.logger.debug("Player back online: %s", mass_player.display_name)
device_info=DeviceInfo(
model="BluOS speaker",
manufacturer="Bluesound",
- address=cur_address,
+ ip_address=cur_address,
),
# Set the supported features for this player
- supported_features=(
+ supported_features={
PlayerFeature.VOLUME_SET,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.PAUSE,
- ),
+ },
needs_poll=True,
poll_interval=30,
+ can_group_with={self.instance_id},
)
await self.mass.players.register(mass_player)
if bluos_player := self.bluos_players[player_id]:
await bluos_player.update_attributes()
- # TODO fix sync & unsync
+ # TODO fix sync & ungroup
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for BluOS player."""
- async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for BluOS player."""
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for BluOS player."""
if bluos_player := self.bluos_players[player_id]:
await bluos_player.client.player.leave_group()
return False
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (
+ return {
ProviderFeature.BROWSE,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.LIBRARY_RADIOS,
ProviderFeature.LIBRARY_RADIOS_EDIT,
ProviderFeature.PLAYLIST_CREATE,
ProviderFeature.PLAYLIST_TRACKS_EDIT,
- )
+ }
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
powered=False,
device_info=DeviceInfo(
model=cast_info.model_name,
- address=f"{cast_info.host}:{cast_info.port}",
+ ip_address=f"{cast_info.host}:{cast_info.port}",
manufacturer=cast_info.manufacturer,
),
- supported_features=(
+ supported_features={
PlayerFeature.POWER,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.VOLUME_SET,
PlayerFeature.PAUSE,
PlayerFeature.NEXT_PREVIOUS,
PlayerFeature.ENQUEUE,
- ),
+ },
enabled_by_default=enabled_by_default,
needs_poll=True,
),
# handle stereo pairs
if castplayer.cast_info.is_multichannel_group:
castplayer.player.type = PlayerType.STEREO_PAIR
- castplayer.player.group_childs = set()
+ castplayer.player.group_childs.clear()
# handle cast groups
if castplayer.cast_info.is_audio_group and not castplayer.cast_info.is_multichannel_group:
castplayer.player.type = PlayerType.GROUP
- castplayer.player.group_childs = {
+ castplayer.player.group_childs.set(
str(UUID(x)) for x in castplayer.mz_controller.members
- }
- castplayer.player.supported_features = (
+ )
+ castplayer.player.supported_features = {
PlayerFeature.POWER,
PlayerFeature.VOLUME_SET,
PlayerFeature.PAUSE,
PlayerFeature.ENQUEUE,
- )
+ }
# update player status
castplayer.player.name = castplayer.cast_info.friendly_name
castplayer.player.available = new_available
castplayer.player.device_info = DeviceInfo(
model=castplayer.cast_info.model_name,
- address=f"{castplayer.cast_info.host}:{castplayer.cast_info.port}",
+ ip_address=f"{castplayer.cast_info.host}:{castplayer.cast_info.port}",
manufacturer=castplayer.cast_info.manufacturer,
)
self.mass.loop.call_soon_threadsafe(self.mass.players.update, castplayer.player_id)
from .gw_client import GWClient
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.PLAYLIST_CREATE,
ProviderFeature.RECOMMENDATIONS,
ProviderFeature.SIMILAR_TRACKS,
-)
+}
@dataclass
await self.gw_client.setup()
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
# device info will be discovered later after connect
device_info=DeviceInfo(
model="unknown",
- address=description_url,
+ ip_address=description_url,
manufacturer="unknown",
),
needs_poll=True,
# connect was successful, update device info
dlna_player.player.device_info = DeviceInfo(
model=dlna_player.device.model_name,
- address=dlna_player.device.device.presentation_url
+ ip_address=dlna_player.device.device.presentation_url
or dlna_player.description_url,
manufacturer=dlna_player.device.manufacturer,
)
supported_features.add(PlayerFeature.VOLUME_MUTE)
if dlna_player.device.has_pause:
supported_features.add(PlayerFeature.PAUSE)
- dlna_player.player.supported_features = tuple(supported_features)
+ dlna_player.player.supported_features = supported_features
from music_assistant import MusicAssistant
from music_assistant.models import ProviderInstanceType
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.ARTIST_METADATA,
ProviderFeature.ALBUM_METADATA,
-)
+}
CONF_ENABLE_ARTIST_IMAGES = "enable_artist_images"
CONF_ENABLE_ALBUM_IMAGES = "enable_album_images"
self.throttler = Throttler(rate_limit=1, period=30)
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
SEEKABLE_FILES = (ContentType.MP3, ContentType.WAV, ContentType.FLAC)
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.LIBRARY_PLAYLISTS,
ProviderFeature.BROWSE,
ProviderFeature.SEARCH,
-)
+}
listdir = wrap(os.listdir)
isdir = wrap(os.path.isdir)
scan_limiter = asyncio.Semaphore(25)
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
if self.write_access:
- return (
+ return {
*SUPPORTED_FEATURES,
ProviderFeature.PLAYLIST_CREATE,
ProviderFeature.PLAYLIST_TRACKS_EDIT,
- )
+ }
return SUPPORTED_FEATURES
@property
device_info=DeviceInfo(
model=self._fully.deviceInfo["deviceModel"],
manufacturer=self._fully.deviceInfo["deviceManufacturer"],
- address=address,
+ ip_address=address,
),
- supported_features=(PlayerFeature.VOLUME_SET,),
+ supported_features={PlayerFeature.VOLUME_SET},
needs_poll=True,
poll_interval=10,
)
target={"entity_id": player_id},
)
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player.
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player.
Join/add the given player(id) to the given (master) player/sync group.
target={"entity_id": target_player},
)
- async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player.
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for given player.
- Remove the given player from any syncgroups it currently is synced to.
+ Remove the given player from any (sync)groups it currently is grouped to.
- player_id: player_id of the player to handle the command.
"""
hass_supported_features = MediaPlayerEntityFeature(
state["attributes"]["supported_features"]
)
- supported_features: list[PlayerFeature] = []
+ supported_features: set[PlayerFeature] = set()
if MediaPlayerEntityFeature.PAUSE in hass_supported_features:
- supported_features.append(PlayerFeature.PAUSE)
+ supported_features.add(PlayerFeature.PAUSE)
if MediaPlayerEntityFeature.VOLUME_SET in hass_supported_features:
- supported_features.append(PlayerFeature.VOLUME_SET)
+ supported_features.add(PlayerFeature.VOLUME_SET)
if MediaPlayerEntityFeature.VOLUME_MUTE in hass_supported_features:
- supported_features.append(PlayerFeature.VOLUME_MUTE)
+ supported_features.add(PlayerFeature.VOLUME_MUTE)
if MediaPlayerEntityFeature.MEDIA_ENQUEUE in hass_supported_features:
- supported_features.append(PlayerFeature.ENQUEUE)
+ supported_features.add(PlayerFeature.ENQUEUE)
if (
MediaPlayerEntityFeature.TURN_ON in hass_supported_features
and MediaPlayerEntityFeature.TURN_OFF in hass_supported_features
):
- supported_features.append(PlayerFeature.POWER)
+ supported_features.add(PlayerFeature.POWER)
player = Player(
player_id=state["entity_id"],
provider=self.instance_id,
hass_device["manufacturer"] if hass_device else "Unknown Manufacturer"
),
),
- supported_features=tuple(supported_features),
+ supported_features=supported_features,
state=StateMap.get(state["state"], PlayerState.IDLE),
)
self._update_player_attributes(player, state["attributes"])
player.current_item_id = value
if key == "group_members":
if value and value[0] == player.player_id:
- player.group_childs = value
+ player.group_childs.set(value)
player.synced_to = None
elif value and value[0] != player.player_id:
- player.group_childs = set()
+ player.group_childs.clear()
player.synced_to = value[0]
else:
- player.group_childs = set()
+ player.group_childs.clear()
player.synced_to = None
async def _late_add_player(self, entity_id: str) -> None:
raise LoginFailed(f"Authentication failed: {err}") from err
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return a list of supported features."""
- return (
+ return {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.SEARCH,
ProviderFeature.ARTIST_ALBUMS,
ProviderFeature.SIMILAR_TRACKS,
- )
+ }
@property
def is_streaming_provider(self) -> bool:
LUCENE_SPECIAL = r'([+\-&|!(){}\[\]\^"~*?:\\\/])'
-SUPPORTED_FEATURES = ()
+SUPPORTED_FEATURES = set()
async def setup(
self.cache = self.mass.cache
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
self.logger.info("Server does not support transcodeOffset, seeking in player provider")
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return a list of supported features."""
- return (
+ return {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.SIMILAR_TRACKS,
ProviderFeature.PLAYLIST_TRACKS_EDIT,
ProviderFeature.PLAYLIST_CREATE,
- )
+ }
@property
def is_streaming_provider(self) -> bool:
ProviderFeature,
)
from music_assistant_models.errors import (
+ InvalidDataError,
PlayerUnavailableError,
ProviderUnavailableError,
UnsupportedFeaturedException,
default_value=[],
description="Select all players you want to be part of this group",
multi_value=True,
- required=True,
+ required=False, # otherwise dynamic members won't work (which allows empty members list)
)
CONF_ENTRY_SAMPLE_RATES_UGP = create_sample_rates_config_entry(44100, 16, 44100, 16, True)
CONFIG_ENTRY_UGP_NOTE = ConfigEntry(
CONFIG_ENTRY_DYNAMIC_MEMBERS = ConfigEntry(
key="dynamic_members",
type=ConfigEntryType.BOOLEAN,
- label="Enable dynamic members (experimental)",
+ label="Enable dynamic members",
description="Allow members to (temporary) join/leave the group dynamically, "
"so the group more or less behaves the same like manually syncing players together, "
- "with the main difference being that the groupplayer will hold the queue. \n\n"
- "NOTE: This is an experimental feature which we are testing out. "
- "You may run into some unexpected behavior!",
+ "with the main difference being that the groupplayer will hold the queue.",
default_value=False,
required=False,
)
]
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.REMOVE_PLAYER,)
+ return {ProviderFeature.REMOVE_PLAYER}
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
# ensure we filter invalid members
members = self._filter_members(config.get_value(CONF_GROUP_TYPE), members)
if group_player := self.mass.players.get(config.player_id):
- group_player.group_childs = members
+ group_player.group_childs.set(members)
if group_player.powered:
# power on group player (which will also resync) if needed
await self.cmd_power(group_player.player_id, True)
if f"values/{CONFIG_ENTRY_DYNAMIC_MEMBERS.key}" in changed_keys:
# dynamic members feature changed
if group_player := self.mass.players.get(config.player_id):
- if PlayerFeature.SYNC in group_player.supported_features:
- group_player.supported_features = tuple(
- x for x in group_player.supported_features if x != PlayerFeature.SYNC
- )
+ if PlayerFeature.SET_MEMBERS in group_player.supported_features:
+ group_player.supported_features.remove(PlayerFeature.SET_MEMBERS)
else:
- group_player.supported_features = (
- *group_player.supported_features,
- PlayerFeature.SYNC,
- )
+ group_player.supported_features.add(PlayerFeature.SET_MEMBERS)
+ if not members and not config.get_value(CONFIG_ENTRY_DYNAMIC_MEMBERS.key):
+ raise InvalidDataError("Group player must have at least one member")
await super().on_player_config_change(config, changed_keys)
async def cmd_stop(self, player_id: str) -> None:
group_member_ids = self.mass.config.get_raw_player_config_value(
player_id, CONF_GROUP_MEMBERS, []
)
- group_player.group_childs = {
+ group_player.group_childs.set(
x
for x in group_member_ids
if (child_player := self.mass.players.get(x))
and child_player.available
and child_player.enabled
- }
+ )
if powered:
# handle TURN_ON of the group player by turning on all members
else:
# handle TURN_OFF of the group player by turning off all members
# optimistically set the group state to prevent race conditions
- # with the unsync command
+ # with the ungroup command
group_player.powered = False
for member in self.mass.players.iter_group_members(
group_player, only_powered=True, active_only=True
self.mass.players.update(group_player.player_id)
if not powered:
# reset the group members when powered off
- group_player.group_childs = set(
+ group_player.group_childs.set(
self.mass.config.get_raw_player_config_value(player_id, CONF_GROUP_MEMBERS, [])
)
if group_player := self.mass.players.get(player_id):
self._update_attributes(group_player)
- async def create_group(self, group_type: str, name: str, members: list[str]) -> Player:
+ async def create_group(
+ self, group_type: str, name: str, members: list[str], dynamic: bool = False
+ ) -> Player:
"""Create new Group Player."""
# perform basic checks
if group_type == GROUP_TYPE_UNIVERSAL:
if ProviderFeature.SYNC_PLAYERS not in player_prov.supported_features:
msg = f"Provider {player_prov.name} does not support creating groups"
raise UnsupportedFeaturedException(msg)
+ group_type = player_prov.instance_id # just in case only domain was sent
new_group_id = f"{prefix}{shortuuid.random(8).lower()}"
# cleanup list, just in case the frontend sends some garbage
self.instance_id,
name=name,
enabled=True,
- values={CONF_GROUP_MEMBERS: members, CONF_GROUP_TYPE: group_type},
+ values={
+ CONF_GROUP_MEMBERS: members,
+ CONF_GROUP_TYPE: group_type,
+ CONFIG_ENTRY_DYNAMIC_MEMBERS.key: dynamic,
+ },
)
return await self._register_group_player(
group_player_id=new_group_id, group_type=group_type, name=name, members=members
return
if group_player.powered:
# edge case: the group player is powered and being removed
- # make sure to turn it off first (which will also unsync a syncgroup)
+ # make sure to turn it off first (which will also ungroup a syncgroup)
await self.cmd_power(player_id, False)
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player.
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player.
Join/add the given player(id) to the given (master) player/sync group.
f"Adjusting group members is not allowed for group {group_player.display_name}"
)
new_members = self._filter_members(group_type, [*group_player.group_childs, player_id])
- group_player.group_childs = new_members
+ group_player.group_childs.set(new_members)
if group_player.powered:
# power on group player (which will also resync) if needed
await self.cmd_power(target_player, True)
- async def cmd_unsync_member(self, player_id: str, target_player: str) -> None:
- """Handle UNSYNC command for given player.
+ async def cmd_ungroup_member(self, player_id: str, target_player: str) -> None:
+ """Handle UNGROUP command for given player.
Remove the given player(id) from the given (master) player/sync group.
- - player_id: player_id of the (child) player to unsync from the group.
+ - player_id: player_id of the (child) player to ungroup from the group.
- target_player: player_id of the group player.
"""
group_player = self.mass.players.get(target_player, raise_unavailable=True)
CONFIG_ENTRY_DYNAMIC_MEMBERS.key,
CONFIG_ENTRY_DYNAMIC_MEMBERS.default_value,
)
- if group_player.powered and not dynamic_members_enabled:
+ if not dynamic_members_enabled:
raise UnsupportedFeaturedException(
f"Adjusting group members is not allowed for group {group_player.display_name}"
)
was_playing = child_player.state == PlayerState.PLAYING
# forward command to the player provider
if player_provider := self.mass.players.get_player_provider(child_player.player_id):
- await player_provider.cmd_unsync(child_player.player_id)
+ await player_provider.cmd_ungroup(child_player.player_id)
child_player.active_group = None
child_player.active_source = None
- group_player.group_childs = {x for x in group_player.group_childs if x != player_id}
+ group_player.group_childs.set({x for x in group_player.group_childs if x != player_id})
if is_sync_leader and was_playing:
- # unsyncing the sync leader will stop the group so we need to resume
+ # ungrouping the sync leader will stop the group so we need to resume
self.mass.call_later(2, self.mass.players.cmd_play, group_player.player_id)
elif group_player.powered:
# power on group player (which will also resync) if needed
self._on_unload.append(
self.mass.streams.register_dynamic_route(route_path, self._serve_ugp_stream)
)
+ can_group_with = {
+ # allow grouping with all providers, except the playergroup provider itself
+ x.instance_id
+ for x in self.mass.players.providers
+ if x.instance_id != self.instance_id
+ }
elif player_provider := self.mass.get_provider(group_type):
# grab additional details from one of the provider's players
if TYPE_CHECKING:
player_provider = cast(PlayerProvider, player_provider)
model_name = "Sync Group"
manufacturer = self.mass.get_provider(group_type).name
+ can_group_with = {player_provider.instance_id}
for feature in (PlayerFeature.PAUSE, PlayerFeature.VOLUME_MUTE, PlayerFeature.ENQUEUE):
if all(feature in x.supported_features for x in player_provider.players):
player_features.add(feature)
CONFIG_ENTRY_DYNAMIC_MEMBERS.key,
CONFIG_ENTRY_DYNAMIC_MEMBERS.default_value,
):
- player_features.add(PlayerFeature.SYNC)
+ player_features.add(PlayerFeature.SET_MEMBERS)
player = Player(
player_id=group_player_id,
available=True,
powered=False,
device_info=DeviceInfo(model=model_name, manufacturer=manufacturer),
- supported_features=tuple(player_features),
- group_childs=set(members),
+ supported_features=player_features,
active_source=group_player_id,
needs_poll=True,
poll_interval=30,
+ can_group_with=can_group_with,
)
await self.mass.players.register_or_update(player)
members_to_sync: list[str] = []
for member in self.mass.players.iter_group_members(group_player, active_only=False):
if member.synced_to and member.synced_to != sync_leader.player_id:
- # unsync first
- await self.mass.players.cmd_unsync(member.player_id)
+ # ungroup first
+ await self.mass.players.cmd_ungroup(member.player_id)
if sync_leader.player_id == member.player_id:
# skip sync leader
continue
continue
members_to_sync.append(member.player_id)
if members_to_sync:
- await self.mass.players.cmd_sync_many(sync_leader.player_id, members_to_sync)
+ await self.mass.players.cmd_group_many(sync_leader.player_id, members_to_sync)
async def _on_mass_player_added_event(self, event: MassEvent) -> None:
"""Handle player added event from player controller."""
AudioFormat,
ItemMapping,
MediaItem,
- MediaItemChapter,
MediaItemImage,
Playlist,
ProviderMapping,
raise SetupFailedError from err
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return a list of supported features."""
- return (
+ return {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.BROWSE,
ProviderFeature.SEARCH,
ProviderFeature.ARTIST_ALBUMS,
- )
+ }
@property
def is_streaming_provider(self) -> bool:
if plex_track.duration:
track.duration = int(plex_track.duration / 1000)
if plex_track.chapters:
- track.metadata.chapters = UniqueList(
- [
- MediaItemChapter(
- chapter_id=plex_chapter.id,
- position_start=plex_chapter.start,
- position_end=plex_chapter.end,
- title=plex_chapter.title,
- )
- for plex_chapter in plex_track.chapters
- ]
- )
+ pass # TODO!
return track
from music_assistant.models import ProviderInstanceType
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.SEARCH,
ProviderFeature.ARTIST_ALBUMS,
ProviderFeature.ARTIST_TOPTRACKS,
-)
+}
VARIOUS_ARTISTS_ID = "145383"
raise LoginFailed(msg)
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
from music_assistant.controllers.cache import use_cache
from music_assistant.models.music_provider import MusicProvider
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.SEARCH,
ProviderFeature.BROWSE,
# RadioBrowser doesn't support a library feature at all
# have that included in backups so we store it in the config.
ProviderFeature.LIBRARY_RADIOS,
ProviderFeature.LIBRARY_RADIOS_EDIT,
-)
+}
if TYPE_CHECKING:
from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
"""Provider implementation for RadioBrowser."""
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
_current_stream_details: StreamDetails | None = None
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (
+ return {
ProviderFeature.BROWSE,
ProviderFeature.LIBRARY_RADIOS,
- )
+ }
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
_multi_streams: dict[str, MultiClientStream]
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return {ProviderFeature.SYNC_PLAYERS}
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
if slimplayer := self.slimproto.get_player(player_id):
await slimplayer.mute(muted)
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player."""
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player."""
child_player = self.mass.players.get(player_id)
assert child_player # guard
parent_player = self.mass.players.get(target_player)
if child_player.synced_to and child_player.synced_to != target_player:
raise RuntimeError("Player is already synced to another player")
# always make sure that the parent player is part of the sync group
- parent_player.group_childs.add(parent_player.player_id)
- parent_player.group_childs.add(child_player.player_id)
+ parent_player.group_childs.append(parent_player.player_id)
+ parent_player.group_childs.append(child_player.player_id)
child_player.synced_to = parent_player.player_id
# check if we should (re)start or join a stream session
# TODO: support late joining of a client into an existing stream session
self.mass.players.update(child_player.player_id, skip_forward=True)
self.mass.players.update(parent_player.player_id, skip_forward=True)
- async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player.
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for given player.
- Remove the given player from any syncgroups it currently is synced to.
+ Remove the given player from any (sync)groups it currently is grouped to.
- player_id: player_id of the player to handle the command.
"""
powered=slimplayer.powered,
device_info=DeviceInfo(
model=slimplayer.device_model,
- address=slimplayer.device_address,
+ ip_address=slimplayer.device_address,
manufacturer=slimplayer.device_type,
),
- supported_features=(
+ supported_features={
PlayerFeature.POWER,
- PlayerFeature.SYNC,
+ PlayerFeature.SET_MEMBERS,
PlayerFeature.VOLUME_SET,
PlayerFeature.PAUSE,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.ENQUEUE,
- ),
+ },
+ can_group_with={self.instance_id},
)
await self.mass.players.register_or_update(player)
return self._get_ma_id(snap_client_id)
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return {ProviderFeature.SYNC_PLAYERS}
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
powered=snap_client.connected,
device_info=DeviceInfo(
model=snap_client._client.get("host").get("os"),
- address=snap_client._client.get("host").get("ip"),
+ ip_address=snap_client._client.get("host").get("ip"),
manufacturer=snap_client._client.get("host").get("arch"),
),
- supported_features=(
- PlayerFeature.SYNC,
+ supported_features={
+ PlayerFeature.SET_MEMBERS,
PlayerFeature.VOLUME_SET,
PlayerFeature.VOLUME_MUTE,
- ),
- group_childs=set(),
+ },
synced_to=self._synced_to(player_id),
+ can_group_with={self.instance_id},
)
asyncio.run_coroutine_threadsafe(
self.mass.players.register_or_update(player), loop=self.mass.loop
ma_player.volume_muted = snapclient.muted
self.mass.players.update(player_id)
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
"""Sync Snapcast player."""
group = self._get_snapgroup(target_player)
mass_target_player = self.mass.players.get(target_player)
await group.add_client(self._get_snapclient_id(player_id))
mass_player = self.mass.players.get(player_id)
mass_player.synced_to = target_player
- mass_target_player.group_childs.add(player_id)
+ mass_target_player.group_childs.append(player_id)
self.mass.players.update(player_id)
self.mass.players.update(target_player)
- async def cmd_unsync(self, player_id: str) -> None:
- """Unsync Snapcast player."""
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Ungroup Snapcast player."""
mass_player = self.mass.players.get(player_id)
if mass_player.synced_to is None:
for mass_child_id in list(mass_player.group_childs):
if mass_child_id != player_id:
- await self.cmd_unsync(mass_child_id)
+ await self.cmd_ungroup(mass_child_id)
return
mass_sync_master_player = self.mass.players.get(mass_player.synced_to)
mass_sync_master_player.group_childs.remove(player_id)
mass_player.group_childs.clear()
if mass_player.synced_to is not None:
return
- mass_player.group_childs.add(player_id)
+ mass_player.group_childs.append(player_id)
{
- mass_player.group_childs.add(self._get_ma_id(snap_client_id))
+ mass_player.group_childs.append(self._get_ma_id(snap_client_id))
for snap_client_id in snap_group.clients
if self._get_ma_id(snap_client_id) != player_id
and self._snapserver.client(snap_client_id).connected
}
PLAYER_FEATURES_BASE = {
- PlayerFeature.SYNC,
+ PlayerFeature.SET_MEMBERS,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.PAUSE,
PlayerFeature.ENQUEUE,
device_info=DeviceInfo(
model=self.discovery_info["device"]["modelDisplayName"],
manufacturer=self.prov.manifest.name,
- address=self.ip_address,
+ ip_address=self.ip_address,
),
- supported_features=tuple(supported_features),
+ supported_features=supported_features,
+ # NOTE: strictly taken we can have multiple sonos households
+ # but for now we assume we only have one
+ can_group_with={self.prov.instance_id},
)
self.update_attributes()
await self.mass.players.register_or_update(mass_player)
if self.client.player.is_coordinator:
# player is group coordinator
active_group = self.client.player.group
- self.mass_player.group_childs = (
- set(self.client.player.group_members)
- if len(self.client.player.group_members) > 1
- else set()
- )
+ if len(self.client.player.group_members) > 1:
+ self.mass_player.group_childs.set(self.client.player.group_members)
+ else:
+ self.mass_player.group_childs.clear()
self.mass_player.synced_to = None
else:
# player is group child (synced to another player)
# handle race condition where the group parent is not yet discovered
return
active_group = group_parent.client.player.group
- self.mass_player.group_childs = set()
+ self.mass_player.group_childs.clear()
self.mass_player.synced_to = active_group.coordinator_id
self.mass_player.active_source = active_group.coordinator_id
sonos_players: dict[str, SonosPlayer]
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return {ProviderFeature.SYNC_PLAYERS}
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
mass_player.device_info = DeviceInfo(
model=mass_player.device_info.model,
manufacturer=mass_player.device_info.manufacturer,
- address=str(cur_address),
+ ip_address=str(cur_address),
)
if not sonos_player.connected:
self.logger.debug("Player back online: %s", mass_player.display_name)
if sonos_player := self.sonos_players[player_id]:
await sonos_player.cmd_volume_mute(muted)
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player.
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player.
Join/add the given player(id) to the given (master) player/sync group.
- player_id: player_id of the player to handle the command.
- target_player: player_id of the syncgroup master or group player.
"""
- await self.cmd_sync_many(target_player, [player_id])
+ await self.cmd_group_many(target_player, [player_id])
- async def cmd_sync_many(self, target_player: str, child_player_ids: list[str]) -> None:
+ async def cmd_group_many(self, target_player: str, child_player_ids: list[str]) -> None:
"""Create temporary sync group by joining given players to target player."""
sonos_player = self.sonos_players[target_player]
await sonos_player.client.player.group.modify_group_members(
player_ids_to_add=child_player_ids, player_ids_to_remove=[]
)
- async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player.
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for given player.
- Remove the given player from any syncgroups it currently is synced to.
+ Remove the given player from any (sync)groups it currently is grouped to.
- player_id: player_id of the player to handle the command.
"""
x for x in sonos_player.client.player.group.player_ids if x != player_id
]
if group_childs:
- await self.mass.players.cmd_unsync_many(group_childs)
+ await self.mass.players.cmd_ungroup_many(group_childs)
await self.mass.players.play_media(airplay.player_id, media)
if group_childs:
# ensure master player is first in the list
from music_assistant.models import ProviderInstanceType
-PLAYER_FEATURES = (
- PlayerFeature.SYNC,
+PLAYER_FEATURES = {
+ PlayerFeature.SET_MEMBERS,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.PAUSE,
PlayerFeature.ENQUEUE,
-)
+}
CONF_NETWORK_SCAN = "network_scan"
CONF_HOUSEHOLD_ID = "household_id"
_discovery_reschedule_timer: asyncio.TimerHandle | None = None
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.SYNC_PLAYERS,)
+ return {ProviderFeature.SYNC_PLAYERS}
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
await asyncio.to_thread(set_volume_mute, player_id, muted)
- async def cmd_sync(self, player_id: str, target_player: str) -> None:
- """Handle SYNC command for given player.
+ async def cmd_group(self, player_id: str, target_player: str) -> None:
+ """Handle GROUP command for given player.
Join/add the given player(id) to the given (master) player/sync group.
await sonos_master_player.join([sonos_player])
self.mass.call_later(2, sonos_player.poll_speaker)
- async def cmd_unsync(self, player_id: str) -> None:
- """Handle UNSYNC command for given player.
+ async def cmd_ungroup(self, player_id: str) -> None:
+ """Handle UNGROUP command for given player.
- Remove the given player from any syncgroups it currently is synced to.
+ Remove the given player from any (sync)groups it currently is grouped to.
- player_id: player_id of the player to handle the command.
"""
supported_features=PLAYER_FEATURES,
device_info=DeviceInfo(
model=speaker_info["model_name"],
- address=soco.ip_address,
+ ip_address=soco.ip_address,
manufacturer="SONOS",
),
needs_poll=True,
poll_interval=30,
+ can_group_with={self.instance_id},
)
self.sonosplayers[player_id] = sonos_player = SonosPlayer(
self,
mass_player=mass_player,
)
if not soco.fixed_volume:
- mass_player.supported_features = (
+ mass_player.supported_features = {
*mass_player.supported_features,
PlayerFeature.VOLUME_SET,
- )
+ }
asyncio.run_coroutine_threadsafe(
self.mass.players.register_or_update(sonos_player.mass_player), loop=self.mass.loop
)
LOGGER = logging.getLogger(__name__)
PLAYER_FEATURES = (
- PlayerFeature.SYNC,
+ PlayerFeature.SET_MEMBERS,
PlayerFeature.VOLUME_MUTE,
PlayerFeature.VOLUME_SET,
)
self.setup()
self.mass_player.device_info = DeviceInfo(
model=self.mass_player.device_info.model,
- address=ip_address,
+ ip_address=ip_address,
manufacturer=self.mass_player.device_info.manufacturer,
)
self.update_player()
self.mass_player.powered = False
self.mass_player.state = PlayerState.IDLE
self.mass_player.synced_to = None
- self.mass_player.group_childs = set()
+ self.mass_player.group_childs.clear()
return
# transport info (playback state)
if self.sync_coordinator:
# player is synced to another player
self.mass_player.synced_to = self.sync_coordinator.player_id
- self.mass_player.group_childs = set()
+ self.mass_player.group_childs.clear()
self.mass_player.active_source = self.sync_coordinator.mass_player.active_source
elif len(self.group_members_ids) > 1:
# this player is the sync leader in a group
self.mass_player.synced_to = None
- self.mass_player.group_childs = set(self.group_members_ids)
+ self.mass_player.group_childs.extend(self.group_members_ids)
else:
# standalone player, not synced
self.mass_player.synced_to = None
- self.mass_player.group_childs = set()
+ self.mass_player.group_childs.clear()
def _set_basic_track_info(self, update_position: bool = False) -> None:
"""Query the speaker to update media metadata and position info."""
CONF_CLIENT_ID = "client_id"
CONF_AUTHORIZATION = "authorization"
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.LIBRARY_PLAYLISTS,
ProviderFeature.SEARCH,
ProviderFeature.ARTIST_TOPTRACKS,
ProviderFeature.SIMILAR_TRACKS,
-)
+}
if TYPE_CHECKING:
self._user_id = self._me["id"]
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
CALLBACK_REDIRECT_URL = "https://music-assistant.io/callback"
LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX = "liked_songs"
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.ARTIST_ALBUMS,
ProviderFeature.ARTIST_TOPTRACKS,
ProviderFeature.SIMILAR_TRACKS,
-)
+}
async def setup(
await self.login()
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (
+ return {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.ARTIST_ALBUMS,
ProviderFeature.ARTIST_TOPTRACKS,
ProviderFeature.SIMILAR_TRACKS,
- )
+ }
@property
def name(self) -> str:
return False
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (ProviderFeature.LIBRARY_TRACKS,)
+ return {ProviderFeature.LIBRARY_TRACKS}
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
from music_assistant import MusicAssistant
from music_assistant.models import ProviderInstanceType
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.ARTIST_METADATA,
ProviderFeature.ALBUM_METADATA,
ProviderFeature.TRACK_METADATA,
-)
+}
IMG_MAPPING = {
"strArtistThumb": ImageType.THUMB,
self.throttler = Throttler(rate_limit=1, period=1)
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
raise
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- return (
+ return {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.SIMILAR_TRACKS,
ProviderFeature.BROWSE,
ProviderFeature.PLAYLIST_TRACKS_EDIT,
- )
+ }
async def search(
self,
from music_assistant.helpers.throttle_retry import Throttler
from music_assistant.models.music_provider import MusicProvider
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_RADIOS,
ProviderFeature.BROWSE,
-)
+}
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
_throttler: Throttler
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
)
YTM_PREMIUM_CHECK_TRACK_ID = "dQw4w9WgXcQ"
-SUPPORTED_FEATURES = (
+SUPPORTED_FEATURES = {
ProviderFeature.LIBRARY_ARTISTS,
ProviderFeature.LIBRARY_ALBUMS,
ProviderFeature.LIBRARY_TRACKS,
ProviderFeature.ARTIST_ALBUMS,
ProviderFeature.ARTIST_TOPTRACKS,
ProviderFeature.SIMILAR_TRACKS,
-)
+}
# TODO: fix disabled tests
raise LoginFailed("User does not have Youtube Music Premium")
@property
- def supported_features(self) -> tuple[ProviderFeature, ...]:
+ def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
"mashumaro==3.14",
"memory-tempfile==2.2.3",
"music-assistant-frontend==v2.9.15",
- "music-assistant-models==1.1.0",
+ "music-assistant-models==1.1.2",
"orjson==3.10.7",
"pillow==11.0.0",
"python-slugify==8.0.4",
mashumaro==3.14
memory-tempfile==2.2.3
music-assistant-frontend==v2.9.15
-music-assistant-models==1.1.0
+music-assistant-models==1.1.2
orjson==3.10.7
pillow==11.0.0
pkce==1.0.3