VARIOUS_ARTISTS_NAME,
VERBOSE_LOG_LEVEL,
)
-from music_assistant.controllers.cache import use_cache
from music_assistant.helpers.compare import compare_strings, create_safe_string
from music_assistant.helpers.json import json_loads
from music_assistant.helpers.playlists import parse_m3u, parse_pls
if any(x.provider_instance == self.instance_id for x in track.provider_mappings)
]
- @use_cache(3600) # Cache for 1 hour
async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
msg = f"Playlist path does not exist: {prov_playlist_id}"
raise MediaNotFoundError(msg)
+ file_item = await self.resolve(prov_playlist_id)
+ # We are using the checksum of the playlist file here to invalidate the cache
+ # when a change has been made to the playlist file (ie track addition/deletion)
+ cache_checksum = file_item.checksum
+
+ cache_key = f"get_playlist_tracks.{prov_playlist_id}"
+ cached_data = await self.mass.cache.get(
+ cache_key,
+ provider=self.instance_id,
+ checksum=cache_checksum,
+ category=0,
+ )
+ if cached_data is not None:
+ if cached_data and isinstance(cached_data[0], dict):
+ return [Track.from_dict(track_dict) for track_dict in cached_data]
+ return cast("list[Track]", cached_data)
+
_, ext = prov_playlist_id.rsplit(".", 1)
try:
# get playlist file contents
str(err),
exc_info=err if self.logger.isEnabledFor(10) else None,
)
+
+ await self.mass.cache.set(
+ key=cache_key,
+ data=result,
+ expiration=3600 * 24, # Cache for 24 hours
+ provider=self.instance_id,
+ checksum=cache_checksum,
+ category=0,
+ )
+
return result
async def get_podcast_episodes(