Fix issues with cached properies of players model
authorMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 27 Oct 2025 00:14:01 +0000 (01:14 +0100)
committerMarcel van der Veldt <m.vanderveldt@outlook.com>
Mon, 27 Oct 2025 00:14:01 +0000 (01:14 +0100)
music_assistant/controllers/player_queues.py
music_assistant/controllers/streams.py
music_assistant/helpers/audio_buffer.py
music_assistant/helpers/cached_property.py [new file with mode: 0644]
music_assistant/models/player.py
music_assistant/providers/builtin_player/player.py
music_assistant/providers/musiccast/player.py

index ffe3a713946f485aca578138a4e425f51e51871f..fc74d02f80518c8c8a5e3d97857af29f412891d8 100644 (file)
@@ -1971,32 +1971,33 @@ class PlayerQueuesController(CoreController):
 
     def _parse_player_current_item_id(self, queue_id: str, player: Player) -> str | None:
         """Parse QueueItem ID from Player's current url."""
-        if not player.current_media:
+        if not player._current_media:
+            # YES, we use player._current_media on purpose here because we need the raw metadata
             return None
         # prefer queue_id and queue_item_id within the current media
-        if player.current_media.source_id == queue_id and player.current_media.queue_item_id:
-            return player.current_media.queue_item_id
+        if player._current_media.source_id == queue_id and player._current_media.queue_item_id:
+            return player._current_media.queue_item_id
         # special case for sonos players
-        if player.current_media.uri and player.current_media.uri.startswith(f"mass:{queue_id}"):
-            if player.current_media.queue_item_id:
-                return player.current_media.queue_item_id
-            return player.current_media.uri.split(":")[-1]
+        if player._current_media.uri and player._current_media.uri.startswith(f"mass:{queue_id}"):
+            if player._current_media.queue_item_id:
+                return player._current_media.queue_item_id
+            return player._current_media.uri.split(":")[-1]
         # try to extract the item id from a mass stream url
         if (
-            player.current_media.uri
-            and queue_id in player.current_media.uri
-            and self.mass.streams.base_url in player.current_media.uri
+            player._current_media.uri
+            and queue_id in player._current_media.uri
+            and self.mass.streams.base_url in player._current_media.uri
         ):
-            current_item_id = player.current_media.uri.rsplit("/")[-1].split(".")[0]
+            current_item_id = player._current_media.uri.rsplit("/")[-1].split(".")[0]
             if self.get_item(queue_id, current_item_id):
                 return current_item_id
         # try to extract the item id from a queue_id/item_id combi
         if (
-            player.current_media.uri
-            and queue_id in player.current_media.uri
-            and "/" in player.current_media.uri
+            player._current_media.uri
+            and queue_id in player._current_media.uri
+            and "/" in player._current_media.uri
         ):
-            current_item_id = player.current_media.uri.split("/")[1]
+            current_item_id = player._current_media.uri.split("/")[1]
             if self.get_item(queue_id, current_item_id):
                 return current_item_id
 
index 89e0f25907a6960797f5e3b0a6b5e16141d590ae..fec7e2e33812b2af2ee9dc0e7243314229adb10b 100644 (file)
@@ -318,6 +318,9 @@ class StreamsController(CoreController):
                 ),
             ],
         )
+        # Start periodic garbage collection task
+        # This ensures memory from audio buffers and streams is cleaned up regularly
+        self.mass.call_later(900, self._periodic_garbage_collection)  # 15 minutes
 
     async def close(self) -> None:
         """Cleanup on exit."""
@@ -1092,6 +1095,14 @@ class StreamsController(CoreController):
                 filter_params=player_filter_params,
                 extra_input_args=["-y", "-re"],
             ):
+                if plugin_source.in_use_by != player_id:
+                    self.logger.info(
+                        "Aborting streaming PluginSource %s to %s "
+                        "- another player took over control",
+                        plugin_source_id,
+                        player_id,
+                    )
+                    break
                 yield chunk
         finally:
             self.logger.debug(
@@ -1232,9 +1243,7 @@ class StreamsController(CoreController):
                 if TYPE_CHECKING:  # avoid circular import
                     assert isinstance(music_prov, MusicProvider)
                 self.mass.create_task(music_prov.on_streamed(streamdetails))
-            # Run garbage collection in executor to reclaim memory from large buffers
-            loop = asyncio.get_running_loop()
-            await loop.run_in_executor(None, gc.collect)
+            # Periodic GC task will handle memory cleanup every 15 minutes
 
     @use_buffer(30, 1)
     async def get_queue_item_stream_with_smartfade(
@@ -1256,6 +1265,9 @@ class StreamsController(CoreController):
 
         if crossfade_data and crossfade_data.session_id != session_id:
             # invalidate expired crossfade data
+            self.logger.warning(
+                "Skipping crossfade data for queue %s - session mismatch ", queue.display_name
+            )
             crossfade_data = None
 
         self.logger.debug(
@@ -1361,6 +1373,10 @@ class StreamsController(CoreController):
                 next_queue_item = await self.mass.player_queues.load_next_queue_item(
                     queue.queue_id, queue_item.queue_item_id
                 )
+                # set index_in_buffer to prevent our next track is overwritten while preloading
+                queue.index_in_buffer = self.mass.player_queues.index_by_id(
+                    queue.queue_id, next_queue_item.queue_item_id
+                )
                 next_queue_item_pcm_format = AudioFormat(
                     content_type=INTERNAL_PCM_FORMAT.content_type,
                     bit_depth=INTERNAL_PCM_FORMAT.bit_depth,
@@ -1577,5 +1593,24 @@ class StreamsController(CoreController):
         ):
             self.logger.debug("Skipping crossfade: sample rate mismatch")
             return False
-        # all checks passed, crossfade is enabled/allowed
+
         return True
+
+    async def _periodic_garbage_collection(self) -> None:
+        """Periodic garbage collection to free up memory from audio buffers and streams."""
+        self.logger.log(
+            VERBOSE_LOG_LEVEL,
+            "Running periodic garbage collection...",
+        )
+        # Run garbage collection in executor to avoid blocking the event loop
+        # Since this runs periodically (not in response to subprocess cleanup),
+        # it's safe to run in a thread without causing thread-safety issues
+        loop = asyncio.get_running_loop()
+        collected = await loop.run_in_executor(None, gc.collect)
+        self.logger.log(
+            VERBOSE_LOG_LEVEL,
+            "Garbage collection completed, collected %d objects",
+            collected,
+        )
+        # Schedule next run in 15 minutes
+        self.mass.call_later(900, self._periodic_garbage_collection)
index 93f58b64c212c54f582c0e2b09d4967af596a76d..f236713172da265be56c9c60759ff4c12d773740 100644 (file)
@@ -3,7 +3,6 @@
 from __future__ import annotations
 
 import asyncio
-import gc
 import logging
 import time
 from collections import deque
@@ -72,12 +71,6 @@ class AudioBuffer:
             return True
         return self._producer_task is not None and self._producer_task.cancelled()
 
-    @staticmethod
-    def _cleanup_chunks(chunks: deque[bytes]) -> None:
-        """Clear chunks and run garbage collection (runs in executor)."""
-        chunks.clear()
-        gc.collect()
-
     @property
     def chunk_size_bytes(self) -> int:
         """Return the size in bytes of one second of PCM audio."""
@@ -254,11 +247,13 @@ class AudioBuffer:
             chunk_count,
             self._producer_task is not None,
         )
-        # Cancel producer task if present
+        # Cancel producer task if present and wait for it to finish
+        # This ensures any subprocess cleanup happens on the main event loop
         if self._producer_task and not self._producer_task.done():
             self._producer_task.cancel()
             with suppress(asyncio.CancelledError):
                 await self._producer_task
+
         # Cancel inactivity task if present
         if cancel_inactivity_task and self._inactivity_task and not self._inactivity_task.done():
             self._inactivity_task.cancel()
@@ -268,7 +263,6 @@ class AudioBuffer:
         async with self._lock:
             # Replace the deque instead of clearing it to avoid blocking
             # Clearing a large deque can take >100ms
-            old_chunks = self._chunks
             self._chunks = deque()
             self._discarded_chunks = 0
             self._eof_received = False
@@ -278,11 +272,6 @@ class AudioBuffer:
             self._data_available.notify_all()
             self._space_available.notify_all()
 
-        # Clear the old deque and run garbage collection in background
-        # to avoid blocking the event loop
-        loop = asyncio.get_running_loop()
-        loop.run_in_executor(None, self._cleanup_chunks, old_chunks)
-
     async def set_eof(self) -> None:
         """Signal that no more data will be added to the buffer."""
         async with self._lock:
diff --git a/music_assistant/helpers/cached_property.py b/music_assistant/helpers/cached_property.py
new file mode 100644 (file)
index 0000000..861d3ba
--- /dev/null
@@ -0,0 +1,105 @@
+"""Helper utilities for cached properties with various expiration strategies."""
+
+from __future__ import annotations
+
+import time
+from typing import TYPE_CHECKING, Any
+
+if TYPE_CHECKING:
+    from collections.abc import Callable
+
+
+class TimedCachedProperty:
+    """
+    Cached property decorator with time-based expiration.
+
+    Similar to cached_property but the cached value expires after a specified duration.
+    The property value is recalculated when accessed after expiration.
+
+    The cached values are stored in the instance's `_cache` dictionary, which means:
+    - Calling `_cache.clear()` will clear all cached values and timestamps
+    - This integrates seamlessly with the Player class's `update_state()` method
+    - Both automatic (time-based) and manual cache clearing are supported
+
+    :param ttl: Time-to-live in seconds (default: 5 seconds)
+
+    Example:
+        >>> class MyClass:
+        ...     def __init__(self):
+        ...         self._cache = {}
+        ...
+        ...     # Usage with default TTL (5 seconds)
+        ...     @timed_cached_property
+        ...     def property1(self) -> str:
+        ...         return "computed value"
+        ...
+        ...     # Usage with custom TTL
+        ...     @timed_cached_property(ttl=10.0)
+        ...     def property2(self) -> str:
+        ...         return "computed value"
+    """
+
+    def __init__(self, ttl: float | Callable[..., Any] = 5.0) -> None:
+        """Initialize the timed cached property decorator."""
+        # Support both @timed_cached_property and @timed_cached_property()
+        if callable(ttl):
+            # Used without parentheses: @timed_cached_property
+            self.func: Callable[..., Any] | None = ttl
+            self.ttl: float = 5.0
+            self.attrname: str | None = None
+        else:
+            # Used with parentheses: @timed_cached_property() or @timed_cached_property(ttl=10)
+            self.func = None
+            self.ttl = ttl
+            self.attrname = None
+
+    def __set_name__(self, owner: type, name: str) -> None:
+        """Store the attribute name when the descriptor is assigned to a class attribute."""
+        self.attrname = name
+
+    def __call__(self, func: Callable[..., Any]) -> TimedCachedProperty:
+        """Allow the decorator to be used with or without arguments."""
+        # If func is already set, this is being used as @timed_cached_property
+        # without parentheses, so just return self
+        if self.func is not None:
+            return self
+
+        # Otherwise, this is being used as @timed_cached_property()
+        # with parentheses, so set the func and return self
+        self.func = func
+        self.attrname = func.__name__
+        return self
+
+    def __get__(self, instance: Any, owner: type | None = None) -> Any:
+        """Get the cached value or compute it if expired or not cached."""
+        if instance is None:
+            return self
+
+        # Use the instance's _cache dict to store values and timestamps
+        cache: dict[str, Any] = instance._cache
+        cache_key = self.attrname or (self.func.__name__ if self.func else "unknown")
+        timestamp_key = f"{cache_key}_timestamp"
+
+        # Check if we have a cached value and if it's still valid
+        current_time = time.time()
+        if cache_key in cache and timestamp_key in cache:
+            if current_time - cache[timestamp_key] < self.ttl:
+                # Cache is still valid
+                return cache[cache_key]
+
+        # Cache miss or expired - compute new value
+        if self.func is None:
+            msg = "Function is not set"
+            raise RuntimeError(msg)
+        value = self.func(instance)
+        cache[cache_key] = value
+        cache[timestamp_key] = current_time
+
+        return value
+
+
+# Convenience alias for backward compatibility with lowercase naming
+timed_cached_property = TimedCachedProperty
+
+
+__all__ = ["TimedCachedProperty", "timed_cached_property"]
index c6aa1135ab0e4d245a1c24d4d14510784f1caf78..a52a8de7aceda0e1d2a03aabd14adce8189f06a2 100644 (file)
@@ -75,6 +75,7 @@ from music_assistant.constants import (
     CONF_PRE_ANNOUNCE_CHIME_URL,
     CONF_VOLUME_CONTROL,
 )
+from music_assistant.helpers.cached_property import timed_cached_property
 from music_assistant.helpers.util import (
     get_changed_dataclass_values,
     validate_announcement_chime_url,
@@ -83,6 +84,7 @@ from music_assistant.helpers.util import (
 if TYPE_CHECKING:
     from .player_provider import PlayerProvider
 
+
 CONF_ENTRY_PRE_ANNOUNCE_CUSTOM_CHIME_URL = ConfigEntry(
     key=CONF_PRE_ANNOUNCE_CHIME_URL,
     type=ConfigEntryType.STRING,
@@ -672,7 +674,7 @@ class Player(ABC):
         elif self.group_members:
             await self.set_members(player_ids_to_remove=self.group_members)
 
-    @cached_property
+    @property
     def synced_to(self) -> str | None:
         """
         Return the id of the player this player is synced to (sync leader).
@@ -773,7 +775,7 @@ class Player(ABC):
             return control.power_state
         return None
 
-    @cached_property
+    @property
     @final
     def volume_level(self) -> int | None:
         """
@@ -793,7 +795,7 @@ class Player(ABC):
             return control.volume_level
         return None
 
-    @cached_property
+    @property
     @final
     def volume_muted(self) -> bool | None:
         """
@@ -813,7 +815,7 @@ class Player(ABC):
             return control.volume_muted
         return None
 
-    @property
+    @timed_cached_property
     @final
     def active_source(self) -> str | None:
         """
@@ -825,7 +827,7 @@ class Player(ABC):
         # if the player is grouped/synced, use the active source of the group/parent player
         if parent_player_id := (self.active_group or self.synced_to):
             if parent_player := self.mass.players.get(parent_player_id):
-                return parent_player.active_source
+                return parent_player._active_source
         for plugin_source in self.mass.players.get_plugin_sources():
             if plugin_source.in_use_by == self.player_id:
                 return plugin_source.id
@@ -910,7 +912,7 @@ class Player(ABC):
         active_groups = self.active_groups
         return active_groups[0] if active_groups else None
 
-    @cached_property
+    @timed_cached_property
     @final
     def current_media(self) -> PlayerMedia | None:
         """
@@ -922,7 +924,7 @@ class Player(ABC):
         # if the player is grouped/synced, use the current_media of the group/parent player
         if parent_player_id := (self.active_group or self.synced_to):
             if parent_player := self.mass.players.get(parent_player_id):
-                return parent_player.current_media
+                return cast("PlayerMedia | None", parent_player.current_media)
         # if a pluginsource is currently active, return those details
         if (
             self.active_source
@@ -1057,7 +1059,7 @@ class Player(ABC):
             return str(conf)
         return PLAYER_CONTROL_NONE
 
-    @cached_property
+    @property
     @final
     def group_volume(self) -> int:
         """
index 25160c1ec9ad64607d6f53a4302631a306098b69..00130ccf6d56d7f09521188e1eec8ba1c1834d54 100644 (file)
@@ -253,7 +253,7 @@ class BuiltinPlayer(Player):
             # We don't early exit here since playback would otherwise never start
             # on iOS devices with Home Assistant OS installations.
 
-        media = player.current_media
+        media = player._current_media
         if queue is None or media is None:
             raise web.HTTPNotFound(reason="No active queue or media found!")
 
index fab8ce5f17d6c9cdfe9d27a4b232babd08a4d964..b88bbdbc07cd7b55e11b7cb329a80692cc1b3817 100644 (file)
@@ -213,7 +213,7 @@ class MusicCastPlayer(Player):
 
         # UPDATE PLAYBACK INFORMATION
         # Note to self:
-        # player.current_media tells queue controller what is playing
+        # player._current_media tells queue controller what is playing
         # and player.set_current_media is the helper function
         # do not access the queue controller to gain playback information here
         if (