remove cache from provider implementations and have the logic at global/abstract level
prov = self.mass.music.get_provider(provider_id or provider)
if not prov:
return []
- return await prov.get_album_tracks(item_id)
+ # prefer cache items (if any)
+ cache_key = f"{prov.type.value}.album_tracks.{item_id}"
+ if cache := await self.mass.cache.get(cache_key):
+ return [Track.from_dict(x) for x in cache]
+ # no items in cache - get listing from provider
+ items = await prov.get_album_tracks(item_id)
+ # store (serializable items) in cache
+ self.mass.create_task(
+ self.mass.cache.set(cache_key, [x.to_dict() for x in items])
+ )
+ return items
async def add_db_item(
self, item: Album, overwrite_existing: bool = False, db: Optional[Db] = None
self, item_id: str, provider_id: str
) -> List[Track]:
"""Return top tracks for an artist on given provider."""
- provider = self.mass.music.get_provider(provider_id)
- if not provider:
+ prov = self.mass.music.get_provider(provider_id)
+ if not prov:
return []
- return await provider.get_artist_toptracks(item_id)
+ # prefer cache items (if any)
+ cache_key = f"{prov.type.value}.artist_albums.{item_id}"
+ if cache := await self.mass.cache.get(cache_key):
+ return [Track.from_dict(x) for x in cache]
+ # no items in cache - get listing from provider
+ items = await prov.get_artist_toptracks(item_id)
+ # store (serializable items) in cache
+ self.mass.create_task(
+ self.mass.cache.set(cache_key, [x.to_dict() for x in items])
+ )
+ return items
async def get_provider_artist_albums(
self, item_id: str, provider_id: str
) -> List[Album]:
"""Return albums for an artist on given provider."""
- provider = self.mass.music.get_provider(provider_id)
- if not provider:
+ prov = self.mass.music.get_provider(provider_id)
+ if not prov:
return []
- return await provider.get_artist_albums(item_id)
+ # prefer cache items (if any)
+ cache_key = f"{prov.type.value}.artist_albums.{item_id}"
+ if cache := await self.mass.cache.get(cache_key):
+ return [Album.from_dict(x) for x in cache]
+ # no items in cache - get listing from provider
+ items = await prov.get_artist_albums(item_id)
+ # store (serializable items) in cache
+ self.mass.create_task(
+ self.mass.cache.set(cache_key, [x.to_dict() for x in items])
+ )
+ return items
async def add_db_item(
self, item: Artist, overwrite_existing: bool = False, db: Optional[Db] = None
provider_id: Optional[str] = None,
) -> List[Track]:
"""Return playlist tracks for the given provider playlist id."""
- if provider == ProviderType.DATABASE or provider_id == "database":
- playlist = await self.get_db_item(item_id)
- prov = next(x for x in playlist.provider_ids)
- item_id = prov.item_id
- provider_id = prov.prov_id
+ playlist = await self.get(item_id, provider, provider_id)
+ prov = next(x for x in playlist.provider_ids)
+ prov_playlist_id = prov.item_id
+ provider_id = prov.prov_id
provider = self.mass.music.get_provider(provider_id or provider)
if not provider:
return []
-
- return await provider.get_playlist_tracks(item_id)
+ # prefer cache for playlist tracks - use checksum from playlist
+ cache_key = f"{provider.value}.playlist_tracks.{prov_playlist_id}"
+ cache_checksum = playlist.metadata.checksum
+ if cache := await self.mass.cache.get(cache_key, cache_checksum):
+ return [Track.from_dict(x) for x in cache]
+ # no items in cache - get listing from provider
+ items = await provider.get_playlist_tracks(prov_playlist_id)
+ # store (serializable items) in cache
+ self.mass.create_task(
+ self.mass.cache.set(cache_key, [x.to_dict() for x in items], cache_checksum)
+ )
+ return items
async def add(self, item: Playlist) -> Playlist:
"""Add playlist to local db and return the new database item."""
playlist_path = await self.get_filepath(MediaType.PLAYLIST, prov_playlist_id)
if not await self.exists(playlist_path):
raise MediaNotFoundError(f"Playlist path does not exist: {playlist_path}")
- getmtime = wrap(os.path.getmtime)
- mtime = await getmtime(playlist_path)
- checksum = f"{SCHEMA_VERSION}.{int(mtime)}"
- cache_key = f"playlist_{self.id}_tracks_{prov_playlist_id}"
- if cache := await self.mass.cache.get(cache_key, checksum):
- return [Track.from_dict(x) for x in cache]
playlist_base_path = Path(playlist_path).parent
index = 0
try:
self.logger.warning(
"Error while parsing playlist %s", playlist_path, exc_info=err
)
- await self.mass.cache.set(cache_key, [x.to_dict() for x in result], checksum)
return result
async def _parse_playlist_line(self, line: str, playlist_path: str) -> Track | None:
)
)
playlist.owner = self._attr_name
+ getmtime = wrap(os.path.getmtime)
+ mtime = await getmtime(playlist_path)
+ checksum = f"{SCHEMA_VERSION}.{int(mtime)}"
+ playlist.metadata.checksum = checksum
return playlist
async def exists(self, file_path: str) -> bool:
app_var,
)
from music_assistant.helpers.audio import get_http_stream
-from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.util import parse_title_and_version, try_parse_int
from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import LoginFailed, MediaNotFoundError
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve all library artists from Qobuz."""
endpoint = "favorite/getUserFavorites"
- for item in await self._get_all_items(
- endpoint, key="artists", type="artists", skip_cache=True
- ):
+ for item in await self._get_all_items(endpoint, key="artists", type="artists"):
if item and item["id"]:
yield await self._parse_artist(item)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Qobuz."""
endpoint = "favorite/getUserFavorites"
- for item in await self._get_all_items(
- endpoint, key="albums", type="albums", skip_cache=True
- ):
+ for item in await self._get_all_items(endpoint, key="albums", type="albums"):
if item and item["id"]:
yield await self._parse_album(item)
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from Qobuz."""
endpoint = "favorite/getUserFavorites"
- for item in await self._get_all_items(
- endpoint, key="tracks", type="tracks", skip_cache=True
- ):
+ for item in await self._get_all_items(endpoint, key="tracks", type="tracks"):
if item and item["id"]:
yield await self._parse_track(item)
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from the provider."""
endpoint = "playlist/getUserPlaylists"
- for item in await self._get_all_items(
- endpoint, key="playlists", skip_cache=True
- ):
+ for item in await self._get_all_items(endpoint, key="playlists"):
if item and item["id"]:
yield await self._parse_playlist(item)
async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
- playlist = await self.get_playlist(prov_playlist_id)
endpoint = "playlist/get"
return [
await self._parse_track(item)
key="tracks",
playlist_id=prov_playlist_id,
extra="tracks",
- cache_checksum=playlist.metadata.checksum,
)
if (item and item["id"])
]
format_id=format_id,
track_id=item_id,
intent="stream",
- skip_cache=True,
)
if result and result.get("url"):
streamdata = result
self.mass.metadata.preferred_language = details["user"]["country_code"]
return details["user_auth_token"]
- @use_cache(3600 * 24)
async def _get_all_items(self, endpoint, key="tracks", **kwargs):
"""Get all items from a paged list."""
limit = 50
while True:
kwargs["limit"] = limit
kwargs["offset"] = offset
- result = await self._get_data(endpoint, skip_cache=True, **kwargs)
+ result = await self._get_data(endpoint, **kwargs)
offset += limit
if not result:
break
break
return all_items
- @use_cache(3600 * 2)
async def _get_data(self, endpoint, sign_request=False, **kwargs):
"""Get data from api."""
url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
from music_assistant.helpers.app_vars import ( # noqa # pylint: disable=no-name-in-module
app_var,
)
-from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.process import AsyncProcess
from music_assistant.helpers.util import parse_title_and_version
from music_assistant.models.enums import ProviderType
endpoint = "me/following"
while True:
spotify_artists = await self._get_data(
- endpoint, type="artist", limit=50, skip_cache=True
+ endpoint,
+ type="artist",
+ limit=50,
)
for item in spotify_artists["artists"]["items"]:
if item and item["id"]:
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve library albums from the provider."""
- for item in await self._get_all_items("me/albums", skip_cache=True):
+ for item in await self._get_all_items("me/albums"):
if item["album"] and item["album"]["id"]:
yield await self._parse_album(item["album"])
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from the provider."""
- for item in await self._get_all_items("me/tracks", skip_cache=True):
+ for item in await self._get_all_items("me/tracks"):
if item and item["track"]["id"]:
yield await self._parse_track(item["track"])
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve playlists from the provider."""
- for item in await self._get_all_items("me/playlists", skip_cache=True):
+ for item in await self._get_all_items("me/playlists"):
if item and item["id"]:
yield await self._parse_playlist(item)
async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
- playlist = await self.get_playlist(prov_playlist_id)
return [
await self._parse_track(item["track"])
for item in await self._get_all_items(
f"playlists/{prov_playlist_id}/tracks",
- cache_checksum=playlist.metadata.checksum,
)
if (item and item["track"] and item["track"]["id"])
]
return tokeninfo
return None
- @use_cache(3600 * 24)
async def _get_all_items(self, endpoint, key="items", **kwargs) -> List[dict]:
"""Get all items from a paged list."""
limit = 50
while True:
kwargs["limit"] = limit
kwargs["offset"] = offset
- result = await self._get_data(endpoint, skip_cache=True, **kwargs)
+ result = await self._get_data(endpoint, **kwargs)
offset += limit
if not result or key not in result or not result[key]:
break
break
return all_items
- @use_cache(3600 * 2)
async def _get_data(self, endpoint, **kwargs):
"""Get data from api."""
url = f"https://api.spotify.com/v1/{endpoint}"
from asyncio_throttle import Throttler
from music_assistant.helpers.audio import get_radio_stream
-from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.util import create_clean_string
from music_assistant.models.enums import ProviderType
from music_assistant.models.errors import LoginFailed, MediaNotFoundError
):
yield chunk
- @use_cache(3600 * 2)
async def __get_data(self, endpoint: str, **kwargs):
"""Get data from api."""
if endpoint.startswith("http"):