# PLAYERPROVIDER FEATURES
#
SYNC_PLAYERS = "sync_players"
+ REMOVE_PLAYER = "remove_player"
#
# METADATAPROVIDER FEATURES
"""Error thrown when a provider action is denied because of permissions."""
error_code = 18
+
+
+class ActionUnavailable(MusicAssistantError):
+ """Error thrown when a action is denied because is is (temporary) unavailable/not possible."""
+
+ error_code = 19
import base64
import logging
import os
+from contextlib import suppress
from typing import TYPE_CHECKING, Any
from uuid import uuid4
PlayerConfig,
ProviderConfig,
)
-from music_assistant.common.models.enums import EventType, ProviderType
-from music_assistant.common.models.errors import InvalidDataError
+from music_assistant.common.models.enums import EventType, ProviderFeature, ProviderType
+from music_assistant.common.models.errors import (
+ ActionUnavailable,
+ InvalidDataError,
+ PlayerCommandFailed,
+ UnsupportedFeaturedException,
+)
from music_assistant.constants import (
CONF_CORE,
CONF_PLAYERS,
"""Save/update PlayerConfig."""
config = await self.get_player_config(player_id)
changed_keys = config.update(values)
-
if not changed_keys:
# no changes
return None
-
+ # validate/handle the update in the player manager
+ await self.mass.players.on_player_config_change(config, changed_keys)
+ # actually store changes (if the above did not raise)
conf_key = f"{CONF_PLAYERS}/{player_id}"
self.set(conf_key, config.to_raw())
# send config updated event
object_id=config.player_id,
data=config,
)
- # signal update to the player manager
- self.mass.players.on_player_config_changed(config, changed_keys)
# return full player config (just in case)
return await self.get_player_config(player_id)
conf_key = f"{CONF_PLAYERS}/{player_id}"
existing = self.get(conf_key)
if not existing:
- msg = f"Player {player_id} does not exist"
+ msg = f"Player configuration for {player_id} does not exist"
raise KeyError(msg)
+ player = self.mass.players.get(player_id)
+ player_prov = player.provider if player else existing["provider"]
+ player_provider = self.mass.get_provider(player_prov)
+ if player_provider and ProviderFeature.REMOVE_PLAYER in player_provider.supported_features:
+ # provider supports removal of player (e.g. group player)
+ await player_provider.remove_player(player_id)
+ elif player and player_provider and player.available:
+ # removing a player config while it is active is not allowed
+ # unless the provider repoirts it has the remove_player feature (e.g. group player)
+ raise ActionUnavailable("Can not remove config for an active player!")
+ # check for group memberships that need to be updated
+ if player and player.active_group and player_provider:
+ # try to remove from the group
+ group_player = self.mass.players.get(player.active_group)
+ with suppress(UnsupportedFeaturedException, PlayerCommandFailed):
+ await player_provider.set_members(
+ player.active_group,
+ [x for x in group_player.group_childs if x != player.player_id],
+ )
+ # tell the player manager to remove the player if its lingering around
+ # set cleanup_flag to false otherwise we end up in an infinite loop
+ self.mass.players.remove(player_id, cleanup_config=False)
+ # remove the actual config if all of the above passed
self.remove(conf_key)
- # signal update to the player manager
- self.mass.players.on_player_config_removed(player_id)
def create_default_player_config(
self,
return
if (
(queue := self._queues.get(queue_id))
+ and queue.active
and queue_player.powered
and queue.state == PlayerState.PAUSED
):
for child_player_id in child_player_ids:
if child_player_id == target_player:
continue
- if not (child_player := self.get(child_player_id)):
+ if not (child_player := self.get(child_player_id)) or not child_player.available:
self.logger.warning("Player %s is not available", child_player_id)
continue
if PlayerFeature.SYNC not in child_player.supported_features:
self.register(player)
def remove(self, player_id: str, cleanup_config: bool = True) -> None:
- """Remove a player from the registry."""
+ """Remove a player from the player manager."""
player = self._players.pop(player_id, None)
if player is None:
return
# ignore updates for disabled players
return
+ # correct available state if needed
+ if not player.enabled:
+ player.available = False
+
# always signal update to the playerqueue
self.mass.player_queues.on_player_update(player, changed_values)
"""Get (child) players attached to a group player or syncgroup."""
for child_id in list(group_player.group_childs):
if child_player := self.get(child_id, False):
- if not child_player.available:
+ if not child_player.available or not child_player.enabled:
continue
if not (not only_powered or child_player.powered):
continue
self.mass.loop.call_soon(self.update, player_id)
await asyncio.sleep(1)
- def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
+ player_disabled = "enabled" in changed_keys and not config.enabled
+ # signal player provider that the config changed
+ if player_provider := self.mass.get_provider(config.provider):
+ with suppress(PlayerUnavailableError):
+ await player_provider.on_player_config_change(config, changed_keys)
if not (player := self.mass.players.get(config.player_id)):
return
- if config.enabled:
- player_prov = self.mass.players.get_player_provider(config.player_id)
- self.mass.create_task(player_prov.poll_player(config.player_id))
+ if player_disabled:
+ # edge case: ensure that the player is powered off if the player gets disabled
+ await self.cmd_power(config.player_id, False)
+ player.available = False
+ # if the player was playing, restart playback
+ elif not player_disabled and player.state == PlayerState.PLAYING:
+ self.mass.call_later(1, self.mass.player_queues.resume(player.active_source))
+ # check for group memberships that need to be updated
+ if player_disabled and player.active_group and player_provider:
+ # try to remove from the group
+ group_player = self.mass.players.get(player.active_group)
+ with suppress(UnsupportedFeaturedException, PlayerCommandFailed):
+ await player_provider.set_members(
+ player.active_group,
+ [x for x in group_player.group_childs if x != player.player_id],
+ )
player.enabled = config.enabled
- # signal player provider that the config changed
- with suppress(PlayerUnavailableError):
- if provider := self.mass.get_provider(config.provider):
- provider.on_player_config_changed(config, changed_keys)
self.mass.players.update(config.player_id, force_update=True)
- # if the player was playing, restart playback
- if player and player.state == PlayerState.PLAYING:
- self.mass.create_task(self.mass.player_queues.resume(player.active_source))
-
- def on_player_config_removed(self, player_id: str) -> None:
- """Call (by config manager) when the configuration of a player is removed."""
- if (player := self.mass.players.get(player_id)) and player.available:
- self.mass.players.update(player_id, force_update=True)
- if player and (provider := self.mass.get_provider(player.provider)):
- provider = cast(PlayerProvider, provider)
- provider.on_player_config_removed(player_id)
- if not self.mass.players.get(player_id):
- self.mass.signal_event(EventType.PLAYER_REMOVED, player_id)
async def _play_announcement(
self,
return self.domain
return self.instance_id
+ async def loaded_in_mass(self) -> None:
+ """Call after the provider has been loaded."""
+ self.mass.music.start_sync(providers=[self.instance_id])
+
async def search(
self,
search_query: str,
from abc import abstractmethod
+from zeroconf import ServiceStateChange
+from zeroconf.asyncio import AsyncServiceInfo
+
from music_assistant.common.models.config_entries import (
BASE_PLAYER_CONFIG_ENTRIES,
CONF_ENTRY_ANNOUNCE_VOLUME,
ConfigEntry,
PlayerConfig,
)
+from music_assistant.common.models.errors import UnsupportedFeaturedException
from music_assistant.common.models.player import Player, PlayerMedia
from .provider import Provider
Player Provider implementations should inherit from this base model.
"""
+ async def loaded_in_mass(self) -> None:
+ """Call after the provider has been loaded."""
+ await self.discover_players()
+
async def get_player_config_entries(self, player_id: str) -> tuple[ConfigEntry, ...]:
"""Return all (provider/player specific) Config Entries for the given player (if any)."""
return (
CONF_ENTRY_ANNOUNCE_VOLUME_MAX,
)
- def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
-
- def on_player_config_removed(self, player_id: str) -> None:
- """Call (by config manager) when the configuration of a player is removed."""
+ # default implementation: feel free to override
+ if (
+ "enabled" in changed_keys
+ and config.enabled
+ and not self.mass.players.get(config.player_id)
+ ):
+ # if a player gets enabled, trigger discovery
+ task_id = f"discover_players_{self.instance_id}"
+ self.mass.call_later(5, self.discover_players, task_id=task_id)
+ else:
+ await self.poll_player(config.player_id)
@abstractmethod
async def cmd_stop(self, player_id: str) -> None:
if 'needs_poll' is set to True in the player object.
"""
+ async def remove_player(self, player_id: str) -> None:
+ """Remove a player."""
+ # will only be called for players with REMOVE_PLAYER feature set.
+ raise NotImplementedError
+
+ async def discover_players(self) -> None:
+ """Discover players for this provider."""
+ # This will be called (once) when the player provider is loaded into MA.
+ # Default implementation is mdns discovery, which will also automatically
+ # discovery players during runtime. If a provider overrides this method and
+ # doesn't use mdns, it is responsible for periodically searching for new players.
+ if not self.available:
+ return
+ for mdns_type in self.manifest.mdns_discovery or []:
+ for mdns_name in set(self.mass.aiozc.zeroconf.cache.cache):
+ if mdns_type not in mdns_name or mdns_type == mdns_name:
+ continue
+ info = AsyncServiceInfo(mdns_type, mdns_name)
+ if await info.async_request(self.mass.aiozc.zeroconf, 3000):
+ await self.on_mdns_service_state_change(
+ mdns_name, ServiceStateChange.Added, info
+ )
+
+ async def set_members(self, player_id: str, members: list[str]) -> None:
+ """Set members for a groupplayer."""
+ # will only be called for (group)players with SET_MEMBERS feature set.
+ raise UnsupportedFeaturedException
+
# DO NOT OVERRIDE BELOW
@property
# Please note that you need to call the super() method to get the default entries.
return ()
- def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
# OPTIONAL
- # this callback will be called whenever a player config changes
- # you can use this to react to changes in player configuration
- # but this is completely optional and you can leave it out if not needed.
-
- def on_player_config_removed(self, player_id: str) -> None:
- """Call (by config manager) when the configuration of a player is removed."""
- # OPTIONAL
- # ensure that any group players get removed
- # this callback will be called whenever a player config is removed
+ # this will be called whenever a player config changes
# you can use this to react to changes in player configuration
# but this is completely optional and you can leave it out if not needed.
if address is None:
return
self.logger.debug("Discovered Airplay device %s on %s", display_name, address)
- self._players[player_id] = AirPlayPlayer(self, player_id, info, address)
manufacturer, model = get_model_from_am(info.decoded_properties.get("am"))
if "apple tv" in model.lower():
# For now, we ignore the Apple TV until we implement the authentication.
if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
return
+ self._players[player_id] = AirPlayPlayer(self, player_id, info, address)
if not (volume := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_VOLUME)):
volume = FALLBACK_VOLUME
mass_player = Player(
if not mass_player.available:
self.logger.debug("Player back online: %s", mass_player.display_name)
bluos_player.client.sync()
- mass_player.available = True
bluos_player.discovery_info = info
self.mass.players.update(self.player_id)
return
self._playlists_dir = os.path.join(self.mass.storage_path, "playlists")
if not await asyncio.to_thread(os.path.exists, self._playlists_dir):
await asyncio.to_thread(os.mkdir, self._playlists_dir)
+ await super().loaded_in_mass()
@property
def is_streaming_provider(self) -> bool:
from pychromecast.models import CastInfo
from pychromecast.socket_client import ConnectionStatus
- from music_assistant.common.models.config_entries import PlayerConfig, ProviderConfig
+ from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
else:
logging.getLogger("pychromecast").setLevel(self.logger.level + 10)
- async def loaded_in_mass(self) -> None:
- """Call after the provider has been loaded."""
+ async def discover_players(self) -> None:
+ """Discover Cast players on the network."""
# start discovery in executor
await self.mass.loop.run_in_executor(None, self.browser.start_discovery)
base_entries = await super().get_player_config_entries(player_id)
return (*base_entries, *PLAYER_CONFIG_ENTRIES, CONF_ENTRY_SAMPLE_RATES_CAST)
- def on_player_config_changed(
- self,
- config: PlayerConfig,
- changed_keys: set[str],
- ) -> None:
- """Call (by config manager) when the configuration of a player changes."""
- super().on_player_config_changed(config, changed_keys)
- if "enabled" in changed_keys and config.player_id not in self.castplayers:
- self.mass.create_task(self.mass.load_provider, self.instance_id)
-
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
castplayer = self.castplayers[player_id]
self.upnp_factory = UpnpFactory(self.requester, non_strict=True)
self.notify_server = DLNANotifyServer(self.requester, self.mass)
- async def loaded_in_mass(self) -> None:
- """Call after the provider has been loaded."""
- await self._run_discovery()
-
async def unload(self) -> None:
"""
Handle unload/close of the provider.
base_entries = await super().get_player_config_entries(player_id)
return base_entries + PLAYER_CONFIG_ENTRIES
- def on_player_config_changed(
+ async def on_player_config_change(
self,
config: PlayerConfig,
changed_keys: set[str],
) -> None:
"""Call (by config manager) when the configuration of a player changes."""
- super().on_player_config_changed(config, changed_keys)
- # run discovery to catch any re-enabled players
- self.mass.create_task(self._run_discovery())
- # reset player features based on config values
- if not (dlna_player := self.dlnaplayers.get(config.player_id)):
- return
- self._set_player_features(dlna_player)
+ if dlna_player := self.dlnaplayers.get(config.player_id):
+ # reset player features based on config values
+ self._set_player_features(dlna_player)
+ else:
+ # run discovery to catch any re-enabled players
+ self.mass.create_task(self.discover_players())
@catch_request_errors
async def cmd_stop(self, player_id: str) -> None:
finally:
dlna_player.force_poll = False
- async def _run_discovery(self, use_multicast: bool = False) -> None:
+ async def discover_players(self, use_multicast: bool = False) -> None:
"""Discover DLNA players on the network."""
if self._discovery_running:
return
self._discovery_running = False
def reschedule() -> None:
- self.mass.create_task(self._run_discovery(use_multicast=not use_multicast))
+ self.mass.create_task(self.discover_players(use_multicast=not use_multicast))
# reschedule self once finished
self.mass.loop.call_later(300, reschedule)
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
+ await super().loaded_in_mass()
player_ids: list[str] = self.config.get_value(CONF_PLAYERS)
# prefetch the device- and entity registry
device_registry = {x["id"]: x for x in await self.hass_prov.hass.get_device_registry()}
self.mass.register_api_command("player_group/create", self.create_group),
]
+ @property
+ def supported_features(self) -> tuple[ProviderFeature, ...]:
+ """Return the features supported by this Provider."""
+ return (ProviderFeature.REMOVE_PLAYER,)
+
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
+ await super().loaded_in_mass()
# temp: migrate old config entries
# remove this after MA 2.4 release
for player_config in await self.mass.config.get_player_configs():
*(entry for entry in child_config_entries if entry.key in allowed_conf_entries),
)
- def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
- if "enabled" in changed_keys and not config.enabled:
- # edge case: ensure that the player is powered off if the player gets disabled
- self.mass.create_task(self.cmd_power(config.player_id, False))
if f"values/{CONF_GROUP_MEMBERS}" in changed_keys:
members = config.get_value(CONF_GROUP_MEMBERS)
# ensure we filter invalid members
members = self._filter_members(config.get_value(CONF_GROUP_TYPE), members)
- self.mass.config.set_raw_player_config_value(
- config.player_id, CONF_GROUP_MEMBERS, members
- )
- if player := self.mass.players.get(config.player_id):
- player.group_childs = members
- self.mass.players.update(config.player_id)
-
- def on_player_config_removed(self, player_id: str) -> None:
- """Call (by config manager) when the configuration of a player is removed."""
- if not (group_player := self.mass.players.get(player_id)):
- return
- if group_player.powered:
- # edge case: the group player is powered and being removed
- for member in self.mass.players.iter_group_members(group_player, only_powered=True):
- member.active_group = None
- if member.state == PlayerState.IDLE:
- continue
- if member.synced_to:
- continue
- self.mass.create_task(
- self.mass.players.cmd_stop(member.player_id, skip_redirect=True)
- )
- self.mass.players.remove(group_player.player_id, False)
+ if group_player := self.mass.players.get(config.player_id):
+ group_player.group_childs = members
+ if group_player.powered:
+ # power on group player (which will also resync) if needed
+ await self.cmd_power(group_player.player_id, True)
+ await super().on_player_config_change(config, changed_keys)
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
) -> None:
"""Handle PLAY MEDIA on given player."""
group_player = self.mass.players.get(player_id)
- # power on (or resync) if needed
- if group_player.powered and player_id.startswith(SYNCGROUP_PREFIX):
- await self._sync_syncgroup(group_player)
- else:
- await self.cmd_power(player_id, True)
+ # power on (which will also resync) if needed
+ await self.cmd_power(player_id, True)
# handle play_media for sync group
if player_id.startswith(SYNCGROUP_PREFIX):
group_player_id=new_group_id, group_type=group_type, name=name, members=members
)
+ async def remove_player(self, player_id: str) -> None:
+ """Remove a group player."""
+ if not (group_player := self.mass.players.get(player_id)):
+ return
+ if group_player.powered:
+ # edge case: the group player is powered and being removed
+ for member in self.mass.players.iter_group_members(group_player, only_powered=True):
+ member.active_group = None
+ if member.state == PlayerState.IDLE:
+ continue
+ if member.synced_to:
+ continue
+ await self.mass.players.cmd_stop(member.player_id, skip_redirect=True)
+
async def _register_all_players(self) -> None:
"""Register all (virtual/fake) group players in the Player controller."""
player_configs = await self.mass.config.get_player_configs(
"""Sync all (possible) players of a syncgroup."""
sync_leader = self._select_sync_leader(group_player)
members_to_sync: list[str] = []
- for member in self.mass.players.iter_group_members(group_player, active_only=True):
+ for member in self.mass.players.iter_group_members(group_player, active_only=False):
if sync_leader.player_id == member.player_id:
# skip sync leader
continue
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
+ await super().loaded_in_mass()
self.slimproto.subscribe(self._client_callback)
self.mass.streams.register_dynamic_route(
"/slimproto/multi", self._serve_multi_client_stream
)
)
- def on_player_config_changed(self, config: PlayerConfig, changed_keys: set[str]) -> None:
+ async def on_player_config_change(self, config: PlayerConfig, changed_keys: set[str]) -> None:
"""Call (by config manager) when the configuration of a player changes."""
- super().on_player_config_changed(config, changed_keys)
-
if slimplayer := self.slimproto.get_player(config.player_id):
- self.mass.create_task(self._set_preset_items(slimplayer))
- self.mass.create_task(self._set_display(slimplayer))
+ await self._set_preset_items(slimplayer)
+ await self._set_display(slimplayer)
+ await super().on_player_config_change(config, changed_keys)
async def cmd_stop(self, player_id: str) -> None:
"""Send STOP command to given player."""
from music_assistant.common.models.errors import SetupFailedError
from music_assistant.common.models.media_items import AudioFormat
from music_assistant.common.models.player import DeviceInfo, Player, PlayerMedia
-from music_assistant.server.helpers.audio import (
- FFMpeg,
- get_ffmpeg_stream,
- get_player_filter_params,
-)
+from music_assistant.server.helpers.audio import FFMpeg, get_ffmpeg_stream, get_player_filter_params
from music_assistant.server.helpers.process import AsyncProcess, check_output
from music_assistant.server.models.player_provider import PlayerProvider
async def loaded_in_mass(self) -> None:
"""Call after the provider has been loaded."""
+ await super().loaded_in_mass()
# initial load of players
self._handle_update()
self._snapserver.stop()
await self._stop_builtin_server()
- def on_player_config_removed(self, player_id: str) -> None:
- """Call (by config manager) when the configuration of a player is removed."""
- super().on_player_config_removed(player_id)
- if self._use_builtin_server:
- self.mass.create_task(
- self._snapserver.delete_client(self._get_snapclient_id(player_id))
- )
-
def _handle_update(self) -> None:
"""Process Snapcast init Player/Group and set callback ."""
for snap_client in self._snapserver.clients:
import shortuuid
from aiohttp import web
+from aiohttp.client_exceptions import ClientError
from aiosonos.api.models import ContainerType, MusicService, SonosCapability
from aiosonos.api.models import DiscoveryInfo as SonosDiscoveryInfo
from aiosonos.api.models import PlayBackState as SonosPlayBackState
if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
self.logger.debug("Ignoring %s in discovery as it is disabled.", name)
return
- if not (discovery_info := await get_discovery_info(self.mass.http_session, address)):
- self.logger.debug("Ignoring %s in discovery as it is not reachable.", name)
+ try:
+ discovery_info = await get_discovery_info(self.mass.http_session, address)
+ except ClientError as err:
+ self.logger.debug("Ignoring %s in discovery as it is not reachable: %s", name, str(err))
return
display_name = discovery_info["device"].get("name") or name
if SonosCapability.PLAYBACK not in discovery_info["device"]["capabilities"]:
self.creation_lock = asyncio.Lock()
self._known_invisible: set[SoCo] = set()
- async def loaded_in_mass(self) -> None:
- """Call after the provider has been loaded."""
- await self._run_discovery()
-
async def unload(self) -> None:
"""Handle close/cleanup of the provider."""
if self._discovery_reschedule_timer:
except ConnectionResetError as err:
raise PlayerUnavailableError from err
- async def _run_discovery(self) -> None:
+ async def discover_players(self) -> None:
"""Discover Sonos players on the network."""
if self._discovery_running:
return
def reschedule() -> None:
self._discovery_reschedule_timer = None
- self.mass.create_task(self._run_discovery())
+ self.mass.create_task(self.discover_players())
# reschedule self once finished
self._discovery_reschedule_timer = self.mass.loop.call_later(1800, reschedule)
# try login which will raise if it fails
await self.login()
- async def loaded_in_mass(self) -> None:
- """Call after the provider has been loaded."""
-
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Return the features supported by this Provider."""
self.config.set(f"{CONF_PROVIDERS}/{conf.instance_id}/last_error", None)
self.signal_event(EventType.PROVIDERS_UPDATED, data=self.get_providers())
await self._update_available_providers_cache()
- # run initial discovery after load
- for mdns_type in provider.manifest.mdns_discovery or []:
- for mdns_name in set(self.aiozc.zeroconf.cache.cache):
- if mdns_type not in mdns_name or mdns_type == mdns_name:
- continue
- info = AsyncServiceInfo(mdns_type, mdns_name)
- if await info.async_request(self.aiozc.zeroconf, 3000):
- await provider.on_mdns_service_state_change(
- mdns_name, ServiceStateChange.Added, info
- )
- # if this is a music provider, start sync
- if provider.type == ProviderType.MUSIC:
- self.music.start_sync(providers=[provider.instance_id])
async def __load_provider_manifests(self) -> None:
"""Preload all available provider manifest files."""