+++ /dev/null
-"""Helper utilities for cached properties with various expiration strategies."""
-
-from __future__ import annotations
-
-import time
-from typing import TYPE_CHECKING, Any
-
-if TYPE_CHECKING:
- from collections.abc import Callable
-
-
-class TimedCachedProperty:
- """
- Cached property decorator with time-based expiration.
-
- Similar to cached_property but the cached value expires after a specified duration.
- The property value is recalculated when accessed after expiration.
-
- The cached values are stored in the instance's `_cache` dictionary, which means:
- - Calling `_cache.clear()` will clear all cached values and timestamps
- - This integrates seamlessly with the Player class's `update_state()` method
- - Both automatic (time-based) and manual cache clearing are supported
-
- :param ttl: Time-to-live in seconds (default: 5 seconds)
-
- Example:
- >>> class MyClass:
- ... def __init__(self):
- ... self._cache = {}
- ...
- ... # Usage with default TTL (5 seconds)
- ... @timed_cached_property
- ... def property1(self) -> str:
- ... return "computed value"
- ...
- ... # Usage with custom TTL
- ... @timed_cached_property(ttl=10.0)
- ... def property2(self) -> str:
- ... return "computed value"
- """
-
- def __init__(self, ttl: float | Callable[..., Any] = 5.0) -> None:
- """Initialize the timed cached property decorator."""
- # Support both @timed_cached_property and @timed_cached_property()
- if callable(ttl):
- # Used without parentheses: @timed_cached_property
- self.func: Callable[..., Any] | None = ttl
- self.ttl: float = 5.0
- self.attrname: str | None = None
- else:
- # Used with parentheses: @timed_cached_property() or @timed_cached_property(ttl=10)
- self.func = None
- self.ttl = ttl
- self.attrname = None
-
- def __set_name__(self, owner: type, name: str) -> None:
- """Store the attribute name when the descriptor is assigned to a class attribute."""
- self.attrname = name
-
- def __call__(self, func: Callable[..., Any]) -> TimedCachedProperty:
- """Allow the decorator to be used with or without arguments."""
- # If func is already set, this is being used as @timed_cached_property
- # without parentheses, so just return self
- if self.func is not None:
- return self
-
- # Otherwise, this is being used as @timed_cached_property()
- # with parentheses, so set the func and return self
- self.func = func
- self.attrname = func.__name__
- return self
-
- def __get__(self, instance: Any, owner: type | None = None) -> Any:
- """Get the cached value or compute it if expired or not cached."""
- if instance is None:
- return self
-
- # Use the instance's _cache dict to store values and timestamps
- cache: dict[str, Any] = instance._cache
- cache_key = self.attrname or (self.func.__name__ if self.func else "unknown")
- timestamp_key = f"{cache_key}_timestamp"
-
- # Check if we have a cached value and if it's still valid
- current_time = time.time()
- if cache_key in cache and timestamp_key in cache:
- if current_time - cache[timestamp_key] < self.ttl:
- # Cache is still valid
- return cache[cache_key]
-
- # Cache miss or expired - compute new value
- if self.func is None:
- msg = "Function is not set"
- raise RuntimeError(msg)
- value = self.func(instance)
- cache[cache_key] = value
- cache[timestamp_key] = current_time
-
- return value
-
-
-# Convenience alias for backward compatibility with lowercase naming
-timed_cached_property = TimedCachedProperty
-
-
-__all__ = ["TimedCachedProperty", "timed_cached_property"]
CONF_PRE_ANNOUNCE_CHIME_URL,
CONF_VOLUME_CONTROL,
)
-from music_assistant.helpers.cached_property import timed_cached_property
from music_assistant.helpers.util import (
get_changed_dataclass_values,
validate_announcement_chime_url,
return control.volume_muted
return None
- @timed_cached_property
+ @property
@final
def active_source(self) -> str | None:
"""
"""
Return the FINAL source list of the player.
- This is a convenience property which calculates the final source list
+ This is a convenience property with the calculated final source list
based on any group memberships or source plugins that can be active.
"""
- sources = UniqueList(self._source_list)
- # always ensure the Music Assistant Queue is in the source list
- mass_source = next((x for x in sources if x.id == self.player_id), None)
- if mass_source is None:
- # if the MA queue is not in the source list, add it
- mass_source = PlayerSource(
- id=self.player_id,
- name="Music Assistant Queue",
- passive=False,
- # TODO: Do we want to dynamically set these based on the queue state ?
- can_play_pause=True,
- can_seek=True,
- can_next_previous=True,
- )
- sources.append(mass_source)
- # append all/any plugin sources (convert to PlayerSource to avoid deepcopy issues)
- for plugin_source in self.mass.players.get_plugin_sources():
- if hasattr(plugin_source, "as_player_source"):
- sources.append(plugin_source.as_player_source())
- else:
- sources.append(plugin_source)
- return sources
+ return self.__attr_source_list or UniqueList()
@cached_property
@final
This will return the ids of the groupplayers if any groups are active.
If no groups are currently active, this will return an empty list.
"""
- active_groups = []
- for player in self.mass.players.all(return_unavailable=False, return_disabled=False):
- if player.type != PlayerType.GROUP:
- continue
- if not (player.powered or player.playback_state == PlaybackState.PLAYING):
- continue
- if self.player_id in player.group_members:
- active_groups.append(player.player_id)
- return active_groups
+ return self.__attr_active_groups or []
@property
@final
active_groups = self.active_groups
return active_groups[0] if active_groups else None
- @timed_cached_property
+ @property
@final
def current_media(self) -> PlayerMedia | None:
"""
Return the current media being played by the player.
- This is a convenience property which calculates the current media
+ This is a convenience property with the calculates current media
based on any group memberships or source plugins that can be active.
"""
# if the player is grouped/synced, use the current_media of the group/parent player
if parent_player_id := (self.active_group or self.synced_to):
- if parent_player_id != self.player_id and (
- parent_player := self.mass.players.get(parent_player_id)
- ):
- return cast("PlayerMedia | None", parent_player.current_media)
- # if a pluginsource is currently active, return those details
- if (
- self.active_source
- and (source := self.mass.players.get_plugin_source(self.active_source))
- and source.metadata
- ):
- return PlayerMedia(
- uri=source.metadata.uri or source.id,
- media_type=MediaType.PLUGIN_SOURCE,
- title=source.metadata.title,
- artist=source.metadata.artist,
- album=source.metadata.album,
- image_url=source.metadata.image_url,
- duration=source.metadata.duration,
- source_id=source.id,
- elapsed_time=source.metadata.elapsed_time,
- elapsed_time_last_updated=source.metadata.elapsed_time_last_updated,
- )
- # if MA queue is active, return those details
- active_queue = None
- if self._current_media and self._current_media.source_id:
- active_queue = self.mass.player_queues.get(self._current_media.source_id)
- if not active_queue and self.active_source:
- active_queue = self.mass.player_queues.get(self.active_source)
-
- if active_queue and (current_item := active_queue.current_item):
- item_image_url = (
- # the image format needs to be 500x500 jpeg for maximum compatibility with players
- self.mass.metadata.get_image_url(current_item.image, size=500, image_format="png")
- if current_item.image
- else None
- )
- if current_item.streamdetails and (
- stream_metadata := current_item.streamdetails.stream_metadata
- ):
- # handle stream metadata in streamdetails (e.g. for radio stream)
- return PlayerMedia(
- uri=current_item.uri,
- media_type=current_item.media_type,
- title=stream_metadata.title or current_item.name,
- artist=stream_metadata.artist,
- album=stream_metadata.album or current_item.name,
- image_url=(stream_metadata.image_url or item_image_url),
- duration=stream_metadata.duration or current_item.duration,
- source_id=active_queue.queue_id,
- queue_item_id=current_item.queue_item_id,
- elapsed_time=stream_metadata.elapsed_time or int(active_queue.elapsed_time),
- elapsed_time_last_updated=stream_metadata.elapsed_time_last_updated
- or active_queue.elapsed_time_last_updated,
- )
- if media_item := current_item.media_item:
- # normal media item
- return PlayerMedia(
- uri=str(media_item.uri),
- media_type=media_item.media_type,
- title=media_item.name,
- artist=getattr(media_item, "artist_str", None),
- album=album.name if (album := getattr(media_item, "album", None)) else None,
- # the image format needs to be 500x500 jpeg for maximum player compatibility
- image_url=self.mass.metadata.get_image_url(
- current_item.media_item.image, size=500, image_format="jpeg"
- )
- or item_image_url
- if current_item.media_item.image
- else item_image_url,
- duration=media_item.duration,
- source_id=active_queue.queue_id,
- queue_item_id=current_item.queue_item_id,
- elapsed_time=int(active_queue.elapsed_time),
- elapsed_time_last_updated=active_queue.elapsed_time_last_updated,
- )
-
- # fallback to basic current item details
- return PlayerMedia(
- uri=current_item.uri,
- media_type=current_item.media_type,
- title=current_item.name,
- image_url=item_image_url,
- duration=current_item.duration,
- source_id=active_queue.queue_id,
- queue_item_id=current_item.queue_item_id,
- elapsed_time=int(active_queue.elapsed_time),
- elapsed_time_last_updated=active_queue.elapsed_time_last_updated,
- )
- # return native current media if no group/queue is active
- if self._current_media:
- return PlayerMedia(
- uri=self._current_media.uri,
- media_type=self._current_media.media_type,
- title=self._current_media.title,
- artist=self._current_media.artist,
- album=self._current_media.album,
- image_url=self._current_media.image_url,
- duration=self._current_media.duration,
- source_id=self._current_media.source_id or self._active_source,
- queue_item_id=self._current_media.queue_item_id,
- elapsed_time=self._current_media.elapsed_time or int(self.elapsed_time)
- if self.elapsed_time
- else None,
- elapsed_time_last_updated=self._current_media.elapsed_time_last_updated
- or self.elapsed_time_last_updated,
- )
- return None
+ if parent_player := self.mass.players.get(parent_player_id):
+ return parent_player.current_media
+ return self.__attr_current_media
@cached_property
@final
Returns a dict with the state attributes that have changed.
"""
+ self.__attr_active_groups = self.__calculate_active_groups()
+ self.__attr_current_media = self.__calculate_current_media()
+ self.__attr_source_list = self.__calculate_source_list()
prev_state = deepcopy(self._state)
self._state = PlayerState(
player_id=self.player_id,
recursive=True,
)
+ __attr_active_groups: list[str] | None = None
+
+ def __calculate_active_groups(self) -> list[str]:
+ """Calculate the active groups for the player."""
+ active_groups = []
+ for player in self.mass.players.all(return_unavailable=False, return_disabled=False):
+ if player.type != PlayerType.GROUP:
+ continue
+ if player.player_id == self.player_id:
+ continue
+ if not (player.powered or player.playback_state == PlaybackState.PLAYING):
+ continue
+ if self.player_id in player.group_members:
+ active_groups.append(player.player_id)
+ return active_groups
+
+ __attr_current_media: PlayerMedia | None = None
+
+ def __calculate_current_media(self) -> PlayerMedia | None:
+ """Calculate the current media for the player."""
+ # if a pluginsource is currently active, return those details
+ if (
+ self.active_source
+ and (source := self.mass.players.get_plugin_source(self.active_source))
+ and source.metadata
+ ):
+ return PlayerMedia(
+ uri=source.metadata.uri or source.id,
+ media_type=MediaType.PLUGIN_SOURCE,
+ title=source.metadata.title,
+ artist=source.metadata.artist,
+ album=source.metadata.album,
+ image_url=source.metadata.image_url,
+ duration=source.metadata.duration,
+ source_id=source.id,
+ elapsed_time=source.metadata.elapsed_time,
+ elapsed_time_last_updated=source.metadata.elapsed_time_last_updated,
+ )
+ # if MA queue is active, return those details
+ active_queue = None
+ if self._current_media and self._current_media.source_id:
+ active_queue = self.mass.player_queues.get(self._current_media.source_id)
+ if not active_queue and self.active_source:
+ active_queue = self.mass.player_queues.get(self.active_source)
+
+ if active_queue and (current_item := active_queue.current_item):
+ item_image_url = (
+ # the image format needs to be 500x500 jpeg for maximum compatibility with players
+ self.mass.metadata.get_image_url(current_item.image, size=500, image_format="png")
+ if current_item.image
+ else None
+ )
+ if current_item.streamdetails and (
+ stream_metadata := current_item.streamdetails.stream_metadata
+ ):
+ # handle stream metadata in streamdetails (e.g. for radio stream)
+ return PlayerMedia(
+ uri=current_item.uri,
+ media_type=current_item.media_type,
+ title=stream_metadata.title or current_item.name,
+ artist=stream_metadata.artist,
+ album=stream_metadata.album or current_item.name,
+ image_url=(stream_metadata.image_url or item_image_url),
+ duration=stream_metadata.duration or current_item.duration,
+ source_id=active_queue.queue_id,
+ queue_item_id=current_item.queue_item_id,
+ elapsed_time=stream_metadata.elapsed_time or int(active_queue.elapsed_time),
+ elapsed_time_last_updated=stream_metadata.elapsed_time_last_updated
+ or active_queue.elapsed_time_last_updated,
+ )
+ if media_item := current_item.media_item:
+ # normal media item
+ return PlayerMedia(
+ uri=str(media_item.uri),
+ media_type=media_item.media_type,
+ title=media_item.name,
+ artist=getattr(media_item, "artist_str", None),
+ album=album.name if (album := getattr(media_item, "album", None)) else None,
+ # the image format needs to be 500x500 jpeg for maximum player compatibility
+ image_url=self.mass.metadata.get_image_url(
+ current_item.media_item.image, size=500, image_format="jpeg"
+ )
+ or item_image_url
+ if current_item.media_item.image
+ else item_image_url,
+ duration=media_item.duration,
+ source_id=active_queue.queue_id,
+ queue_item_id=current_item.queue_item_id,
+ elapsed_time=int(active_queue.elapsed_time),
+ elapsed_time_last_updated=active_queue.elapsed_time_last_updated,
+ )
+
+ # fallback to basic current item details
+ return PlayerMedia(
+ uri=current_item.uri,
+ media_type=current_item.media_type,
+ title=current_item.name,
+ image_url=item_image_url,
+ duration=current_item.duration,
+ source_id=active_queue.queue_id,
+ queue_item_id=current_item.queue_item_id,
+ elapsed_time=int(active_queue.elapsed_time),
+ elapsed_time_last_updated=active_queue.elapsed_time_last_updated,
+ )
+ # return native current media if no group/queue is active
+ if self._current_media:
+ return PlayerMedia(
+ uri=self._current_media.uri,
+ media_type=self._current_media.media_type,
+ title=self._current_media.title,
+ artist=self._current_media.artist,
+ album=self._current_media.album,
+ image_url=self._current_media.image_url,
+ duration=self._current_media.duration,
+ source_id=self._current_media.source_id or self._active_source,
+ queue_item_id=self._current_media.queue_item_id,
+ elapsed_time=self._current_media.elapsed_time or int(self.elapsed_time)
+ if self.elapsed_time
+ else None,
+ elapsed_time_last_updated=self._current_media.elapsed_time_last_updated
+ or self.elapsed_time_last_updated,
+ )
+ return None
+
+ __attr_source_list: UniqueList[PlayerSource] | None = None
+
+ def __calculate_source_list(self) -> UniqueList[PlayerSource]:
+ """Calculate the source list for the player."""
+ sources = UniqueList(self._source_list)
+ # always ensure the Music Assistant Queue is in the source list
+ mass_source = next((x for x in sources if x.id == self.player_id), None)
+ if mass_source is None:
+ # if the MA queue is not in the source list, add it
+ mass_source = PlayerSource(
+ id=self.player_id,
+ name="Music Assistant Queue",
+ passive=False,
+ # TODO: Do we want to dynamically set these based on the queue state ?
+ can_play_pause=True,
+ can_seek=True,
+ can_next_previous=True,
+ )
+ sources.append(mass_source)
+ # append all/any plugin sources (convert to PlayerSource to avoid deepcopy issues)
+ for plugin_source in self.mass.players.get_plugin_sources():
+ if hasattr(plugin_source, "as_player_source"):
+ sources.append(plugin_source.as_player_source())
+ else:
+ sources.append(plugin_source)
+ return sources
+
def __hash__(self) -> int:
"""Return a hash of the Player."""
return hash(self.player_id)