def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
"""Parse QueueItem ID from Player's current url."""
- if not player.current_media:
+ if not player._current_media:
+ # YES, we use player._current_media on purpose here because we need the raw metadata
return None
# prefer queue_id and queue_item_id within the current media
- if player.current_media.source_id == queue_id and player.current_media.queue_item_id:
- return player.current_media.queue_item_id
+ if player._current_media.source_id == queue_id and player._current_media.queue_item_id:
+ return player._current_media.queue_item_id
# special case for sonos players
- if player.current_media.uri and player.current_media.uri.startswith(f"mass:{queue_id}"):
- if player.current_media.queue_item_id:
- return player.current_media.queue_item_id
- return player.current_media.uri.split(":")[-1]
+ if player._current_media.uri and player._current_media.uri.startswith(f"mass:{queue_id}"):
+ if player._current_media.queue_item_id:
+ return player._current_media.queue_item_id
+ return player._current_media.uri.split(":")[-1]
# try to extract the item id from a mass stream url
if (
- player.current_media.uri
- and queue_id in player.current_media.uri
- and self.mass.streams.base_url in player.current_media.uri
+ player._current_media.uri
+ and queue_id in player._current_media.uri
+ and self.mass.streams.base_url in player._current_media.uri
):
- current_item_id = player.current_media.uri.rsplit("/")[-1].split(".")[0]
+ current_item_id = player._current_media.uri.rsplit("/")[-1].split(".")[0]
if self.get_item(queue_id, current_item_id):
return current_item_id
# try to extract the item id from a queue_id/item_id combi
if (
- player.current_media.uri
- and queue_id in player.current_media.uri
- and "/" in player.current_media.uri
+ player._current_media.uri
+ and queue_id in player._current_media.uri
+ and "/" in player._current_media.uri
):
- current_item_id = player.current_media.uri.split("/")[1]
+ current_item_id = player._current_media.uri.split("/")[1]
if self.get_item(queue_id, current_item_id):
return current_item_id
),
],
)
+ # Start periodic garbage collection task
+ # This ensures memory from audio buffers and streams is cleaned up regularly
+ self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes
async def close(self) -> None:
"""Cleanup on exit."""
filter_params=player_filter_params,
extra_input_args=["-y", "-re"],
):
+ if plugin_source.in_use_by != player_id:
+ self.logger.info(
+ "Aborting streaming PluginSource %s to %s "
+ "- another player took over control",
+ plugin_source_id,
+ player_id,
+ )
+ break
yield chunk
finally:
self.logger.debug(
if TYPE_CHECKING: # avoid circular import
assert isinstance(music_prov, MusicProvider)
self.mass.create_task(music_prov.on_streamed(streamdetails))
- # Run garbage collection in executor to reclaim memory from large buffers
- loop = asyncio.get_running_loop()
- await loop.run_in_executor(None, gc.collect)
+ # Periodic GC task will handle memory cleanup every 15 minutes
@use_buffer(30, 1)
async def get_queue_item_stream_with_smartfade(
if crossfade_data and crossfade_data.session_id != session_id:
# invalidate expired crossfade data
+ self.logger.warning(
+ "Skipping crossfade data for queue %s - session mismatch ", queue.display_name
+ )
crossfade_data = None
self.logger.debug(
next_queue_item = await self.mass.player_queues.load_next_queue_item(
queue.queue_id, queue_item.queue_item_id
)
+ # set index_in_buffer to prevent our next track is overwritten while preloading
+ queue.index_in_buffer = self.mass.player_queues.index_by_id(
+ queue.queue_id, next_queue_item.queue_item_id
+ )
next_queue_item_pcm_format = AudioFormat(
content_type=INTERNAL_PCM_FORMAT.content_type,
bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
):
self.logger.debug("Skipping crossfade: sample rate mismatch")
return False
- # all checks passed, crossfade is enabled/allowed
+
return True
+
+ async def _periodic_garbage_collection(self) -> None:
+ """Periodic garbage collection to free up memory from audio buffers and streams."""
+ self.logger.log(
+ VERBOSE_LOG_LEVEL,
+ "Running periodic garbage collection...",
+ )
+ # Run garbage collection in executor to avoid blocking the event loop
+ # Since this runs periodically (not in response to subprocess cleanup),
+ # it's safe to run in a thread without causing thread-safety issues
+ loop = asyncio.get_running_loop()
+ collected = await loop.run_in_executor(None, gc.collect)
+ self.logger.log(
+ VERBOSE_LOG_LEVEL,
+ "Garbage collection completed, collected %d objects",
+ collected,
+ )
+ # Schedule next run in 15 minutes
+ self.mass.call_later(900, self._periodic_garbage_collection)
from __future__ import annotations
import asyncio
-import gc
import logging
import time
from collections import deque
return True
return self._producer_task is not None and self._producer_task.cancelled()
- @staticmethod
- def _cleanup_chunks(chunks: deque[bytes]) -> None:
- """Clear chunks and run garbage collection (runs in executor)."""
- chunks.clear()
- gc.collect()
-
@property
def chunk_size_bytes(self) -> int:
"""Return the size in bytes of one second of PCM audio."""
chunk_count,
self._producer_task is not None,
)
- # Cancel producer task if present
+ # Cancel producer task if present and wait for it to finish
+ # This ensures any subprocess cleanup happens on the main event loop
if self._producer_task and not self._producer_task.done():
self._producer_task.cancel()
with suppress(asyncio.CancelledError):
await self._producer_task
+
# Cancel inactivity task if present
if cancel_inactivity_task and self._inactivity_task and not self._inactivity_task.done():
self._inactivity_task.cancel()
async with self._lock:
# Replace the deque instead of clearing it to avoid blocking
# Clearing a large deque can take >100ms
- old_chunks = self._chunks
self._chunks = deque()
self._discarded_chunks = 0
self._eof_received = False
self._data_available.notify_all()
self._space_available.notify_all()
- # Clear the old deque and run garbage collection in background
- # to avoid blocking the event loop
- loop = asyncio.get_running_loop()
- loop.run_in_executor(None, self._cleanup_chunks, old_chunks)
-
async def set_eof(self) -> None:
"""Signal that no more data will be added to the buffer."""
async with self._lock:
--- /dev/null
+"""Helper utilities for cached properties with various expiration strategies."""
+
+from __future__ import annotations
+
+import time
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+ from collections.abc import Callable
+
+
+class TimedCachedProperty:
+ """
+ Cached property decorator with time-based expiration.
+
+ Similar to cached_property but the cached value expires after a specified duration.
+ The property value is recalculated when accessed after expiration.
+
+ The cached values are stored in the instance's `_cache` dictionary, which means:
+ - Calling `_cache.clear()` will clear all cached values and timestamps
+ - This integrates seamlessly with the Player class's `update_state()` method
+ - Both automatic (time-based) and manual cache clearing are supported
+
+ :param ttl: Time-to-live in seconds (default: 5 seconds)
+
+ Example:
+ >>> class MyClass:
+ ... def __init__(self):
+ ... self._cache = {}
+ ...
+ ... # Usage with default TTL (5 seconds)
+ ... @timed_cached_property
+ ... def property1(self) -> str:
+ ... return "computed value"
+ ...
+ ... # Usage with custom TTL
+ ... @timed_cached_property(ttl=10.0)
+ ... def property2(self) -> str:
+ ... return "computed value"
+ """
+
+ def __init__(self, ttl: float | Callable[..., Any] = 5.0) -> None:
+ """Initialize the timed cached property decorator."""
+ # Support both @timed_cached_property and @timed_cached_property()
+ if callable(ttl):
+ # Used without parentheses: @timed_cached_property
+ self.func: Callable[..., Any] | None = ttl
+ self.ttl: float = 5.0
+ self.attrname: str | None = None
+ else:
+ # Used with parentheses: @timed_cached_property() or @timed_cached_property(ttl=10)
+ self.func = None
+ self.ttl = ttl
+ self.attrname = None
+
+ def __set_name__(self, owner: type, name: str) -> None:
+ """Store the attribute name when the descriptor is assigned to a class attribute."""
+ self.attrname = name
+
+ def __call__(self, func: Callable[..., Any]) -> TimedCachedProperty:
+ """Allow the decorator to be used with or without arguments."""
+ # If func is already set, this is being used as @timed_cached_property
+ # without parentheses, so just return self
+ if self.func is not None:
+ return self
+
+ # Otherwise, this is being used as @timed_cached_property()
+ # with parentheses, so set the func and return self
+ self.func = func
+ self.attrname = func.__name__
+ return self
+
+ def __get__(self, instance: Any, owner: type | None = None) -> Any:
+ """Get the cached value or compute it if expired or not cached."""
+ if instance is None:
+ return self
+
+ # Use the instance's _cache dict to store values and timestamps
+ cache: dict[str, Any] = instance._cache
+ cache_key = self.attrname or (self.func.__name__ if self.func else "unknown")
+ timestamp_key = f"{cache_key}_timestamp"
+
+ # Check if we have a cached value and if it's still valid
+ current_time = time.time()
+ if cache_key in cache and timestamp_key in cache:
+ if current_time - cache[timestamp_key] < self.ttl:
+ # Cache is still valid
+ return cache[cache_key]
+
+ # Cache miss or expired - compute new value
+ if self.func is None:
+ msg = "Function is not set"
+ raise RuntimeError(msg)
+ value = self.func(instance)
+ cache[cache_key] = value
+ cache[timestamp_key] = current_time
+
+ return value
+
+
+# Convenience alias for backward compatibility with lowercase naming
+timed_cached_property = TimedCachedProperty
+
+
+__all__ = ["TimedCachedProperty", "timed_cached_property"]
CONF_PRE_ANNOUNCE_CHIME_URL,
CONF_VOLUME_CONTROL,
)
+from music_assistant.helpers.cached_property import timed_cached_property
from music_assistant.helpers.util import (
get_changed_dataclass_values,
validate_announcement_chime_url,
if TYPE_CHECKING:
from .player_provider import PlayerProvider
+
CONF_ENTRY_PRE_ANNOUNCE_CUSTOM_CHIME_URL = ConfigEntry(
key=CONF_PRE_ANNOUNCE_CHIME_URL,
type=ConfigEntryType.STRING,
elif self.group_members:
await self.set_members(player_ids_to_remove=self.group_members)
- @cached_property
+ @property
def synced_to(self) -> str | None:
"""
Return the id of the player this player is synced to (sync leader).
return control.power_state
return None
- @cached_property
+ @property
@final
def volume_level(self) -> int | None:
"""
return control.volume_level
return None
- @cached_property
+ @property
@final
def volume_muted(self) -> bool | None:
"""
return control.volume_muted
return None
- @property
+ @timed_cached_property
@final
def active_source(self) -> str | None:
"""
# if the player is grouped/synced, use the active source of the group/parent player
if parent_player_id := (self.active_group or self.synced_to):
if parent_player := self.mass.players.get(parent_player_id):
- return parent_player.active_source
+ return parent_player._active_source
for plugin_source in self.mass.players.get_plugin_sources():
if plugin_source.in_use_by == self.player_id:
return plugin_source.id
active_groups = self.active_groups
return active_groups[0] if active_groups else None
- @cached_property
+ @timed_cached_property
@final
def current_media(self) -> PlayerMedia | None:
"""
# if the player is grouped/synced, use the current_media of the group/parent player
if parent_player_id := (self.active_group or self.synced_to):
if parent_player := self.mass.players.get(parent_player_id):
- return parent_player.current_media
+ return cast("PlayerMedia | None", parent_player.current_media)
# if a pluginsource is currently active, return those details
if (
self.active_source
return str(conf)
return PLAYER_CONTROL_NONE
- @cached_property
+ @property
@final
def group_volume(self) -> int:
"""
# We don't early exit here since playback would otherwise never start
# on iOS devices with Home Assistant OS installations.
- media = player.current_media
+ media = player._current_media
if queue is None or media is None:
raise web.HTTPNotFound(reason="No active queue or media found!")
# UPDATE PLAYBACK INFORMATION
# Note to self:
- # player.current_media tells queue controller what is playing
+ # player._current_media tells queue controller what is playing
# and player.set_current_media is the helper function
# do not access the queue controller to gain playback information here
if (