From 713e9505f4cefad86c9a68b329f99294b0819bfa Mon Sep 17 00:00:00 2001 From: Marcel van der Veldt Date: Mon, 27 Oct 2025 01:14:01 +0100 Subject: [PATCH] Fix issues with cached properies of players model --- music_assistant/controllers/player_queues.py | 31 +++--- music_assistant/controllers/streams.py | 43 ++++++- music_assistant/helpers/audio_buffer.py | 17 +-- music_assistant/helpers/cached_property.py | 105 ++++++++++++++++++ music_assistant/models/player.py | 18 +-- .../providers/builtin_player/player.py | 2 +- music_assistant/providers/musiccast/player.py | 2 +- 7 files changed, 175 insertions(+), 43 deletions(-) create mode 100644 music_assistant/helpers/cached_property.py diff --git a/music_assistant/controllers/player_queues.py b/music_assistant/controllers/player_queues.py index ffe3a713..fc74d02f 100644 --- a/music_assistant/controllers/player_queues.py +++ b/music_assistant/controllers/player_queues.py @@ -1971,32 +1971,33 @@ class PlayerQueuesController(CoreController): def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None: """Parse QueueItem ID from Player's current url.""" - if not player.current_media: + if not player._current_media: + # YES, we use player._current_media on purpose here because we need the raw metadata return None # prefer queue_id and queue_item_id within the current media - if player.current_media.source_id == queue_id and player.current_media.queue_item_id: - return player.current_media.queue_item_id + if player._current_media.source_id == queue_id and player._current_media.queue_item_id: + return player._current_media.queue_item_id # special case for sonos players - if player.current_media.uri and player.current_media.uri.startswith(f"mass:{queue_id}"): - if player.current_media.queue_item_id: - return player.current_media.queue_item_id - return player.current_media.uri.split(":")[-1] + if player._current_media.uri and player._current_media.uri.startswith(f"mass:{queue_id}"): + if player._current_media.queue_item_id: + return player._current_media.queue_item_id + return player._current_media.uri.split(":")[-1] # try to extract the item id from a mass stream url if ( - player.current_media.uri - and queue_id in player.current_media.uri - and self.mass.streams.base_url in player.current_media.uri + player._current_media.uri + and queue_id in player._current_media.uri + and self.mass.streams.base_url in player._current_media.uri ): - current_item_id = player.current_media.uri.rsplit("/")[-1].split(".")[0] + current_item_id = player._current_media.uri.rsplit("/")[-1].split(".")[0] if self.get_item(queue_id, current_item_id): return current_item_id # try to extract the item id from a queue_id/item_id combi if ( - player.current_media.uri - and queue_id in player.current_media.uri - and "/" in player.current_media.uri + player._current_media.uri + and queue_id in player._current_media.uri + and "/" in player._current_media.uri ): - current_item_id = player.current_media.uri.split("/")[1] + current_item_id = player._current_media.uri.split("/")[1] if self.get_item(queue_id, current_item_id): return current_item_id diff --git a/music_assistant/controllers/streams.py b/music_assistant/controllers/streams.py index 89e0f259..fec7e2e3 100644 --- a/music_assistant/controllers/streams.py +++ b/music_assistant/controllers/streams.py @@ -318,6 +318,9 @@ class StreamsController(CoreController): ), ], ) + # Start periodic garbage collection task + # This ensures memory from audio buffers and streams is cleaned up regularly + self.mass.call_later(900, self._periodic_garbage_collection) # 15 minutes async def close(self) -> None: """Cleanup on exit.""" @@ -1092,6 +1095,14 @@ class StreamsController(CoreController): filter_params=player_filter_params, extra_input_args=["-y", "-re"], ): + if plugin_source.in_use_by != player_id: + self.logger.info( + "Aborting streaming PluginSource %s to %s " + "- another player took over control", + plugin_source_id, + player_id, + ) + break yield chunk finally: self.logger.debug( @@ -1232,9 +1243,7 @@ class StreamsController(CoreController): if TYPE_CHECKING: # avoid circular import assert isinstance(music_prov, MusicProvider) self.mass.create_task(music_prov.on_streamed(streamdetails)) - # Run garbage collection in executor to reclaim memory from large buffers - loop = asyncio.get_running_loop() - await loop.run_in_executor(None, gc.collect) + # Periodic GC task will handle memory cleanup every 15 minutes @use_buffer(30, 1) async def get_queue_item_stream_with_smartfade( @@ -1256,6 +1265,9 @@ class StreamsController(CoreController): if crossfade_data and crossfade_data.session_id != session_id: # invalidate expired crossfade data + self.logger.warning( + "Skipping crossfade data for queue %s - session mismatch ", queue.display_name + ) crossfade_data = None self.logger.debug( @@ -1361,6 +1373,10 @@ class StreamsController(CoreController): next_queue_item = await self.mass.player_queues.load_next_queue_item( queue.queue_id, queue_item.queue_item_id ) + # set index_in_buffer to prevent our next track is overwritten while preloading + queue.index_in_buffer = self.mass.player_queues.index_by_id( + queue.queue_id, next_queue_item.queue_item_id + ) next_queue_item_pcm_format = AudioFormat( content_type=INTERNAL_PCM_FORMAT.content_type, bit_depth=INTERNAL_PCM_FORMAT.bit_depth, @@ -1577,5 +1593,24 @@ class StreamsController(CoreController): ): self.logger.debug("Skipping crossfade: sample rate mismatch") return False - # all checks passed, crossfade is enabled/allowed + return True + + async def _periodic_garbage_collection(self) -> None: + """Periodic garbage collection to free up memory from audio buffers and streams.""" + self.logger.log( + VERBOSE_LOG_LEVEL, + "Running periodic garbage collection...", + ) + # Run garbage collection in executor to avoid blocking the event loop + # Since this runs periodically (not in response to subprocess cleanup), + # it's safe to run in a thread without causing thread-safety issues + loop = asyncio.get_running_loop() + collected = await loop.run_in_executor(None, gc.collect) + self.logger.log( + VERBOSE_LOG_LEVEL, + "Garbage collection completed, collected %d objects", + collected, + ) + # Schedule next run in 15 minutes + self.mass.call_later(900, self._periodic_garbage_collection) diff --git a/music_assistant/helpers/audio_buffer.py b/music_assistant/helpers/audio_buffer.py index 93f58b64..f2367131 100644 --- a/music_assistant/helpers/audio_buffer.py +++ b/music_assistant/helpers/audio_buffer.py @@ -3,7 +3,6 @@ from __future__ import annotations import asyncio -import gc import logging import time from collections import deque @@ -72,12 +71,6 @@ class AudioBuffer: return True return self._producer_task is not None and self._producer_task.cancelled() - @staticmethod - def _cleanup_chunks(chunks: deque[bytes]) -> None: - """Clear chunks and run garbage collection (runs in executor).""" - chunks.clear() - gc.collect() - @property def chunk_size_bytes(self) -> int: """Return the size in bytes of one second of PCM audio.""" @@ -254,11 +247,13 @@ class AudioBuffer: chunk_count, self._producer_task is not None, ) - # Cancel producer task if present + # Cancel producer task if present and wait for it to finish + # This ensures any subprocess cleanup happens on the main event loop if self._producer_task and not self._producer_task.done(): self._producer_task.cancel() with suppress(asyncio.CancelledError): await self._producer_task + # Cancel inactivity task if present if cancel_inactivity_task and self._inactivity_task and not self._inactivity_task.done(): self._inactivity_task.cancel() @@ -268,7 +263,6 @@ class AudioBuffer: async with self._lock: # Replace the deque instead of clearing it to avoid blocking # Clearing a large deque can take >100ms - old_chunks = self._chunks self._chunks = deque() self._discarded_chunks = 0 self._eof_received = False @@ -278,11 +272,6 @@ class AudioBuffer: self._data_available.notify_all() self._space_available.notify_all() - # Clear the old deque and run garbage collection in background - # to avoid blocking the event loop - loop = asyncio.get_running_loop() - loop.run_in_executor(None, self._cleanup_chunks, old_chunks) - async def set_eof(self) -> None: """Signal that no more data will be added to the buffer.""" async with self._lock: diff --git a/music_assistant/helpers/cached_property.py b/music_assistant/helpers/cached_property.py new file mode 100644 index 00000000..861d3baa --- /dev/null +++ b/music_assistant/helpers/cached_property.py @@ -0,0 +1,105 @@ +"""Helper utilities for cached properties with various expiration strategies.""" + +from __future__ import annotations + +import time +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from collections.abc import Callable + + +class TimedCachedProperty: + """ + Cached property decorator with time-based expiration. + + Similar to cached_property but the cached value expires after a specified duration. + The property value is recalculated when accessed after expiration. + + The cached values are stored in the instance's `_cache` dictionary, which means: + - Calling `_cache.clear()` will clear all cached values and timestamps + - This integrates seamlessly with the Player class's `update_state()` method + - Both automatic (time-based) and manual cache clearing are supported + + :param ttl: Time-to-live in seconds (default: 5 seconds) + + Example: + >>> class MyClass: + ... def __init__(self): + ... self._cache = {} + ... + ... # Usage with default TTL (5 seconds) + ... @timed_cached_property + ... def property1(self) -> str: + ... return "computed value" + ... + ... # Usage with custom TTL + ... @timed_cached_property(ttl=10.0) + ... def property2(self) -> str: + ... return "computed value" + """ + + def __init__(self, ttl: float | Callable[..., Any] = 5.0) -> None: + """Initialize the timed cached property decorator.""" + # Support both @timed_cached_property and @timed_cached_property() + if callable(ttl): + # Used without parentheses: @timed_cached_property + self.func: Callable[..., Any] | None = ttl + self.ttl: float = 5.0 + self.attrname: str | None = None + else: + # Used with parentheses: @timed_cached_property() or @timed_cached_property(ttl=10) + self.func = None + self.ttl = ttl + self.attrname = None + + def __set_name__(self, owner: type, name: str) -> None: + """Store the attribute name when the descriptor is assigned to a class attribute.""" + self.attrname = name + + def __call__(self, func: Callable[..., Any]) -> TimedCachedProperty: + """Allow the decorator to be used with or without arguments.""" + # If func is already set, this is being used as @timed_cached_property + # without parentheses, so just return self + if self.func is not None: + return self + + # Otherwise, this is being used as @timed_cached_property() + # with parentheses, so set the func and return self + self.func = func + self.attrname = func.__name__ + return self + + def __get__(self, instance: Any, owner: type | None = None) -> Any: + """Get the cached value or compute it if expired or not cached.""" + if instance is None: + return self + + # Use the instance's _cache dict to store values and timestamps + cache: dict[str, Any] = instance._cache + cache_key = self.attrname or (self.func.__name__ if self.func else "unknown") + timestamp_key = f"{cache_key}_timestamp" + + # Check if we have a cached value and if it's still valid + current_time = time.time() + if cache_key in cache and timestamp_key in cache: + if current_time - cache[timestamp_key] < self.ttl: + # Cache is still valid + return cache[cache_key] + + # Cache miss or expired - compute new value + if self.func is None: + msg = "Function is not set" + raise RuntimeError(msg) + value = self.func(instance) + cache[cache_key] = value + cache[timestamp_key] = current_time + + return value + + +# Convenience alias for backward compatibility with lowercase naming +timed_cached_property = TimedCachedProperty + + +__all__ = ["TimedCachedProperty", "timed_cached_property"] diff --git a/music_assistant/models/player.py b/music_assistant/models/player.py index c6aa1135..a52a8de7 100644 --- a/music_assistant/models/player.py +++ b/music_assistant/models/player.py @@ -75,6 +75,7 @@ from music_assistant.constants import ( CONF_PRE_ANNOUNCE_CHIME_URL, CONF_VOLUME_CONTROL, ) +from music_assistant.helpers.cached_property import timed_cached_property from music_assistant.helpers.util import ( get_changed_dataclass_values, validate_announcement_chime_url, @@ -83,6 +84,7 @@ from music_assistant.helpers.util import ( if TYPE_CHECKING: from .player_provider import PlayerProvider + CONF_ENTRY_PRE_ANNOUNCE_CUSTOM_CHIME_URL = ConfigEntry( key=CONF_PRE_ANNOUNCE_CHIME_URL, type=ConfigEntryType.STRING, @@ -672,7 +674,7 @@ class Player(ABC): elif self.group_members: await self.set_members(player_ids_to_remove=self.group_members) - @cached_property + @property def synced_to(self) -> str | None: """ Return the id of the player this player is synced to (sync leader). @@ -773,7 +775,7 @@ class Player(ABC): return control.power_state return None - @cached_property + @property @final def volume_level(self) -> int | None: """ @@ -793,7 +795,7 @@ class Player(ABC): return control.volume_level return None - @cached_property + @property @final def volume_muted(self) -> bool | None: """ @@ -813,7 +815,7 @@ class Player(ABC): return control.volume_muted return None - @property + @timed_cached_property @final def active_source(self) -> str | None: """ @@ -825,7 +827,7 @@ class Player(ABC): # if the player is grouped/synced, use the active source of the group/parent player if parent_player_id := (self.active_group or self.synced_to): if parent_player := self.mass.players.get(parent_player_id): - return parent_player.active_source + return parent_player._active_source for plugin_source in self.mass.players.get_plugin_sources(): if plugin_source.in_use_by == self.player_id: return plugin_source.id @@ -910,7 +912,7 @@ class Player(ABC): active_groups = self.active_groups return active_groups[0] if active_groups else None - @cached_property + @timed_cached_property @final def current_media(self) -> PlayerMedia | None: """ @@ -922,7 +924,7 @@ class Player(ABC): # if the player is grouped/synced, use the current_media of the group/parent player if parent_player_id := (self.active_group or self.synced_to): if parent_player := self.mass.players.get(parent_player_id): - return parent_player.current_media + return cast("PlayerMedia | None", parent_player.current_media) # if a pluginsource is currently active, return those details if ( self.active_source @@ -1057,7 +1059,7 @@ class Player(ABC): return str(conf) return PLAYER_CONTROL_NONE - @cached_property + @property @final def group_volume(self) -> int: """ diff --git a/music_assistant/providers/builtin_player/player.py b/music_assistant/providers/builtin_player/player.py index 25160c1e..00130ccf 100644 --- a/music_assistant/providers/builtin_player/player.py +++ b/music_assistant/providers/builtin_player/player.py @@ -253,7 +253,7 @@ class BuiltinPlayer(Player): # We don't early exit here since playback would otherwise never start # on iOS devices with Home Assistant OS installations. - media = player.current_media + media = player._current_media if queue is None or media is None: raise web.HTTPNotFound(reason="No active queue or media found!") diff --git a/music_assistant/providers/musiccast/player.py b/music_assistant/providers/musiccast/player.py index fab8ce5f..b88bbdbc 100644 --- a/music_assistant/providers/musiccast/player.py +++ b/music_assistant/providers/musiccast/player.py @@ -213,7 +213,7 @@ class MusicCastPlayer(Player): # UPDATE PLAYBACK INFORMATION # Note to self: - # player.current_media tells queue controller what is playing + # player._current_media tells queue controller what is playing # and player.set_current_media is the helper function # do not access the queue controller to gain playback information here if ( -- 2.34.1