"request": "launch",
"module": "music_assistant",
"justMyCode": false,
- "args": ["--log-level", "info"],
+ "args": ["--log-level", "debug"],
"env": { "PYTHONDEVMODE": "1" }
},
{
library_item_id=library_item_id,
)
- async def add_item_to_library(self, item: str | MediaItemType) -> MediaItemType:
+ async def add_item_to_library(
+ self, item: str | MediaItemType, overwrite_existing: bool = False
+ ) -> MediaItemType:
"""Add item (uri or mediaitem) to the library."""
return cast(
- MediaItemType, await self.client.send_command("music/library/add_item", item=item)
+ MediaItemType,
+ await self.client.send_command(
+ "music/library/add_item", item=item, overwrite_existing=overwrite_existing
+ ),
)
async def refresh_item(
from __future__ import annotations
import contextlib
-from enum import EnumType, StrEnum
+from enum import EnumType, IntEnum, StrEnum
class MediaTypeMeta(EnumType):
ICY = "icy" # http stream with icy metadata
LOCAL_FILE = "local_file"
CUSTOM = "custom"
+
+
+class CacheCategory(IntEnum):
+ """Enum with predefined cache categories."""
+
+ DEFAULT = 0
+ MUSIC_SEARCH = 1
+ MUSIC_ALBUM_TRACKS = 2
+ MUSIC_ARTIST_TRACKS = 3
+ MUSIC_ARTIST_ALBUMS = 4
+ MUSIC_PLAYLIST_TRACKS = 5
+ MUSIC_PROVIDER_ITEM = 6
+ PLAYER_QUEUE_STATE = 7
+ MEDIA_INFO = 8
+ LIBRARY_ITEMS = 9
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.cache")
CONF_CLEAR_CACHE = "clear_cache"
-DB_SCHEMA_VERSION = 4
+DB_SCHEMA_VERSION = 5
class CacheController(CoreController):
"""Cleanup on exit."""
await self.database.close()
- async def get(self, cache_key: str, checksum: str | None = None, default=None):
+ async def get(
+ self,
+ key: str,
+ checksum: str | None = None,
+ default=None,
+ category: int = 0,
+ base_key: str = "",
+ ) -> Any:
"""Get object from cache and return the results.
cache_key: the (unique) name of the cache object as reference
checksum: optional argument to check if the checksum in the
cacheobject matches the checksum provided
+ category: optional category to group cache objects
+ base_key: optional base key to group cache objects
"""
- if not cache_key:
+ if not key:
return None
cur_time = int(time.time())
if checksum is not None and not isinstance(checksum, str):
checksum = str(checksum)
# try memory cache first
- cache_data = self._mem_cache.get(cache_key)
+ memory_key = f"{category}/{base_key}/{key}"
+ cache_data = self._mem_cache.get(memory_key)
if cache_data and (not checksum or cache_data[1] == checksum) and cache_data[2] >= cur_time:
return cache_data[0]
# fall back to db cache
- if (db_row := await self.database.get_row(DB_TABLE_CACHE, {"key": cache_key})) and (
- not checksum or db_row["checksum"] == checksum and db_row["expires"] >= cur_time
- ):
+ if (
+ db_row := await self.database.get_row(
+ DB_TABLE_CACHE, {"category": category, "base_key": base_key, "sub_key": key}
+ )
+ ) and (not checksum or db_row["checksum"] == checksum and db_row["expires"] >= cur_time):
try:
data = await asyncio.to_thread(json_loads, db_row["data"])
except Exception as exc: # pylint: disable=broad-except
LOGGER.error(
"Error parsing cache data for %s: %s",
- cache_key,
+ memory_key,
str(exc),
exc_info=exc if self.logger.isEnabledFor(10) else None,
)
else:
# also store in memory cache for faster access
- self._mem_cache[cache_key] = (
+ self._mem_cache[memory_key] = (
data,
db_row["checksum"],
db_row["expires"],
return data
return default
- async def set(self, cache_key, data, checksum="", expiration=(86400 * 30)) -> None:
+ async def set(
+ self, key, data, checksum="", expiration=(86400 * 30), category: int = 0, base_key: str = ""
+ ) -> None:
"""Set data in cache."""
- if not cache_key:
+ if not key:
return
if checksum is not None and not isinstance(checksum, str):
checksum = str(checksum)
expires = int(time.time() + expiration)
- self._mem_cache[cache_key] = (data, checksum, expires)
+ memory_key = f"{category}/{base_key}/{key}"
+ self._mem_cache[memory_key] = (data, checksum, expires)
if (expires - time.time()) < 3600 * 4:
# do not cache items in db with short expiration
return
data = await asyncio.to_thread(json_dumps, data)
await self.database.insert(
DB_TABLE_CACHE,
- {"key": cache_key, "expires": expires, "checksum": checksum, "data": data},
+ {
+ "category": category,
+ "base_key": base_key,
+ "sub_key": key,
+ "expires": expires,
+ "checksum": checksum,
+ "data": data,
+ },
allow_replace=True,
)
- async def delete(self, cache_key) -> None:
+ async def delete(
+ self, key: str | None, category: int | None = None, base_key: str | None = None
+ ) -> None:
"""Delete data from cache."""
- self._mem_cache.pop(cache_key, None)
- await self.database.delete(DB_TABLE_CACHE, {"key": cache_key})
-
- async def clear(self, key_filter: str | None = None) -> None:
+ match: dict[str, str | int] = {}
+ if key is not None:
+ match["sub_key"] = key
+ if category is not None:
+ match["category"] = category
+ if base_key is not None:
+ match["base_key"] = base_key
+ if key is not None and category is not None and base_key is not None:
+ self._mem_cache.pop(f"{category}/{base_key}/{key}", None)
+ else:
+ self._mem_cache.clear()
+ await self.database.delete(DB_TABLE_CACHE, match)
+
+ async def clear(
+ self,
+ key_filter: str | None = None,
+ category: int | None = None,
+ base_key_filter: str | None = None,
+ ) -> None:
"""Clear all/partial items from cache."""
- self._mem_cache = {}
+ self._mem_cache.clear()
self.logger.info("Clearing database...")
- query = f"key LIKE '%{key_filter}%' or data LIKE '%{key_filter}%'" if key_filter else None
+ query_parts: list[str] = []
+ if category is not None:
+ query_parts.append(f"category = {category}")
+ if base_key_filter is not None:
+ query_parts.append(f"base_key LIKE '%{base_key_filter}%'")
+ if key_filter is not None:
+ query_parts.append(f"sub_key LIKE '%{key_filter}%'")
+ query = "WHERE " + " AND ".join(query_parts) if query_parts else None
await self.database.delete(DB_TABLE_CACHE, query=query)
await self.database.vacuum()
self.logger.info("Clearing database DONE")
async def auto_cleanup(self) -> None:
"""Run scheduled auto cleanup task."""
self.logger.debug("Running automatic cleanup...")
- # for now we simply reset the memory cache
- self._mem_cache = {}
+ # simply reset the memory cache
+ self._mem_cache.clear()
cur_timestamp = int(time.time())
cleaned_records = 0
for db_row in await self.database.get_rows(DB_TABLE_CACHE):
DB_TABLE_SETTINGS,
{"key": "version", "value": str(DB_SCHEMA_VERSION), "type": "str"},
)
+ await self.__create_database_indexes()
+ # compact db
+ self.logger.debug("Compacting database...")
+ try:
+ await self.database.vacuum()
+ except Exception as err:
+ self.logger.warning("Database vacuum failed: %s", str(err))
+ else:
+ self.logger.debug("Compacting database done")
async def __create_database_tables(self) -> None:
"""Create database table(s)."""
)
await self.database.execute(
f"""CREATE TABLE IF NOT EXISTS {DB_TABLE_CACHE}(
- key TEXT UNIQUE NOT NULL, expires INTEGER NOT NULL,
- data TEXT, checksum TEXT NULL)"""
+ [id] INTEGER PRIMARY KEY AUTOINCREMENT,
+ [category] INTEGER NOT NULL DEFAULT 0,
+ [base_key] TEXT NOT NULL,
+ [sub_key] TEXT NOT NULL,
+ [expires] INTEGER NOT NULL,
+ [data] TEXT,
+ [checksum] TEXT NULL,
+ UNIQUE(category, base_key, sub_key)
+ )"""
)
- # create indexes
+ await self.database.commit()
+
+ async def __create_database_indexes(self) -> None:
+ """Create database indexes."""
await self.database.execute(
- f"CREATE UNIQUE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_key_idx on {DB_TABLE_CACHE}(key);"
+ f"CREATE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_category_idx "
+ f"ON {DB_TABLE_CACHE}(category);"
+ )
+ await self.database.execute(
+ f"CREATE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_base_key_idx "
+ f"ON {DB_TABLE_CACHE}(base_key);"
+ )
+ await self.database.execute(
+ f"CREATE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_sub_key_idx "
+ f"ON {DB_TABLE_CACHE}(sub_key);"
+ )
+ await self.database.execute(
+ f"CREATE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_category_base_key_idx "
+ f"ON {DB_TABLE_CACHE}(category,base_key);"
+ )
+ await self.database.execute(
+ f"CREATE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_category_base_key_sub_key_idx "
+ f"ON {DB_TABLE_CACHE}(category,base_key,sub_key);"
)
await self.database.commit()
def use_cache(
expiration: int = 86400 * 30,
+ category: int = 0,
) -> Callable[[Callable[Param, RetType]], Callable[Param, RetType]]:
"""Return decorator that can be used to cache a method's result."""
async def wrapped(*args: Param.args, **kwargs: Param.kwargs):
method_class = args[0]
method_class_name = method_class.__class__.__name__
- cache_key_parts = [method_class_name, func.__name__]
+ cache_base_key = f"{method_class_name}.{func.__name__}"
+ cache_sub_key_parts = []
skip_cache = kwargs.pop("skip_cache", False)
cache_checksum = kwargs.pop("cache_checksum", "")
if len(args) > 1:
- cache_key_parts += args[1:]
+ cache_sub_key_parts += args[1:]
for key in sorted(kwargs.keys()):
- cache_key_parts.append(f"{key}{kwargs[key]}")
- cache_key = ".".join(cache_key_parts)
+ cache_sub_key_parts.append(f"{key}{kwargs[key]}")
+ cache_sub_key = ".".join(cache_sub_key_parts)
- cachedata = await method_class.cache.get(cache_key, checksum=cache_checksum)
+ cachedata = await method_class.cache.get(
+ cache_sub_key, checksum=cache_checksum, category=category, base_key=cache_base_key
+ )
if not skip_cache and cachedata is not None:
return cachedata
result = await func(*args, **kwargs)
asyncio.create_task(
method_class.cache.set(
- cache_key, result, expiration=expiration, checksum=cache_checksum
+ cache_sub_key,
+ result,
+ expiration=expiration,
+ checksum=cache_checksum,
+ category=category,
+ base_key=cache_base_key,
)
)
return result
def __len__(self) -> int:
"""Return length."""
return len(self.d)
+
+ def clear(self) -> None:
+ """Clear cache."""
+ self.d.clear()
import contextlib
from collections.abc import Iterable
from random import choice, random
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from music_assistant.common.helpers.json import serialize_to_json
-from music_assistant.common.models.enums import ProviderFeature
+from music_assistant.common.models.enums import CacheCategory, ProviderFeature
from music_assistant.common.models.errors import (
InvalidDataError,
MediaNotFoundError,
Track,
UniqueList,
)
-from music_assistant.constants import (
- DB_TABLE_ALBUM_ARTISTS,
- DB_TABLE_ALBUM_TRACKS,
- DB_TABLE_ALBUMS,
- DB_TABLE_ARTISTS,
- DB_TABLE_PROVIDER_MAPPINGS,
-)
+from music_assistant.constants import DB_TABLE_ALBUM_ARTISTS, DB_TABLE_ALBUM_TRACKS, DB_TABLE_ALBUMS
from music_assistant.server.controllers.media.base import MediaControllerBase
from music_assistant.server.helpers.compare import (
compare_album,
def __init__(self, *args, **kwargs) -> None:
"""Initialize class."""
super().__init__(*args, **kwargs)
- self.base_query = f"""
- SELECT DISTINCT {self.db_table}.* FROM {self.db_table}
- LEFT JOIN {DB_TABLE_ALBUM_ARTISTS} on {DB_TABLE_ALBUM_ARTISTS}.album_id = {self.db_table}.item_id
- LEFT JOIN {DB_TABLE_ARTISTS} on {DB_TABLE_ARTISTS}.item_id = {DB_TABLE_ALBUM_ARTISTS}.artist_id
- LEFT JOIN {DB_TABLE_PROVIDER_MAPPINGS} ON
- {DB_TABLE_PROVIDER_MAPPINGS}.item_id = {self.db_table}.item_id AND media_type = '{self.media_type}'
- """ # noqa: E501
+ self.base_query = """
+ SELECT
+ albums.*,
+ (SELECT JSON_GROUP_ARRAY(
+ json_object(
+ 'item_id', provider_mappings.provider_item_id,
+ 'provider_domain', provider_mappings.provider_domain,
+ 'provider_instance', provider_mappings.provider_instance,
+ 'available', provider_mappings.available,
+ 'audio_format', json(provider_mappings.audio_format),
+ 'url', provider_mappings.url,
+ 'details', provider_mappings.details
+ )) FROM provider_mappings WHERE provider_mappings.item_id = albums.item_id AND media_type = 'album') AS provider_mappings,
+ (SELECT JSON_GROUP_ARRAY(
+ json_object(
+ 'item_id', artists.item_id,
+ 'provider', 'library',
+ 'name', artists.name,
+ 'sort_name', artists.sort_name,
+ 'media_type', 'artist'
+ )) FROM artists JOIN album_artists on album_artists.album_id = albums.item_id WHERE artists.item_id = album_artists.artist_id) AS artists
+ FROM albums""" # noqa: E501
# register (extra) api handlers
api_base = self.api_base
self.mass.register_api_command(f"music/{api_base}/album_tracks", self.tracks)
item_id: str | int,
) -> list[Track]:
"""Return in-database album tracks for the given database album."""
- query = f"WHERE {DB_TABLE_ALBUM_TRACKS}.album_id = {item_id}"
- return await self.mass.music.tracks._get_library_items_by_query(extra_query=query)
+ subquery = f"SELECT track_id FROM {DB_TABLE_ALBUM_TRACKS} WHERE album_id = {item_id}"
+ query = f"WHERE tracks.item_id in ({subquery})"
+ return await self.mass.music.tracks._get_library_items_by_query(extra_query_parts=[query])
async def _add_library_item(self, item: Album) -> int:
"""Add a new record to the database."""
if prov is None:
return []
# prefer cache items (if any) - for streaming providers only
- cache_key = f"{prov.lookup_key}.albumtracks.{item_id}"
+ cache_category = CacheCategory.MUSIC_ALBUM_TRACKS
+ cache_base_key = prov.lookup_key
+ cache_key = item_id
if (
prov.is_streaming_provider
- and (cache := await self.mass.cache.get(cache_key)) is not None
+ and (
+ cache := await self.mass.cache.get(
+ cache_key, category=cache_category, base_key=cache_base_key
+ )
+ )
+ is not None
):
return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
items = await prov.get_album_tracks(item_id)
# store (serializable items) in cache
if prov.is_streaming_provider:
- self.mass.create_task(self.mass.cache.set(cache_key, [x.to_dict() for x in items]))
+ self.mass.create_task(
+ self.mass.cache.set(cache_key, [x.to_dict() for x in items]),
+ category=cache_category,
+ base_key=cache_base_key,
+ )
for item in items:
# if this is a complete track object, pre-cache it as
# that will save us an (expensive) lookup later
if item.image and item.artist_str and item.album and prov.domain != "builtin":
await self.mass.cache.set(
- f"provider_item.track.{prov.lookup_key}.{item_id}", item.to_dict()
+ f"track.{item_id}",
+ item.to_dict(),
+ category=CacheCategory.MUSIC_PROVIDER_ITEM,
+ base_key=prov.lookup_key,
)
return items
"album_id": db_id,
},
)
- artist_mappings: UniqueList[ItemMapping] = UniqueList()
for artist in artists:
- mapping = await self._set_album_artist(db_id, artist=artist, overwrite=overwrite)
- artist_mappings.append(mapping)
- # we (temporary?) duplicate the artist mappings in a separate column of the media
- # item's table, because the json_group_array query is superslow
- await self.mass.music.database.update(
- self.db_table,
- {"item_id": db_id},
- {"artists": serialize_to_json(artist_mappings)},
- )
+ await self._set_album_artist(db_id, artist=artist, overwrite=overwrite)
async def _set_album_artist(
self, db_id: int, artist: Artist | ItemMapping, overwrite: bool = False
db_album.name,
provider.name,
)
+
+ async def _get_library_items_by_query(
+ self,
+ favorite: bool | None = None,
+ search: str | None = None,
+ limit: int = 500,
+ offset: int = 0,
+ order_by: str | None = None,
+ provider: str | None = None,
+ extra_query_parts: list[str] | None = None,
+ extra_query_params: dict[str, Any] | None = None,
+ extra_join_parts: list[str] | None = None,
+ ) -> list[Album]:
+ """Fetch MediaItem records from database by building the query."""
+ extra_query_params = extra_query_params or {}
+ extra_query_parts: list[str] = extra_query_parts or []
+ extra_join_parts: list[str] = extra_join_parts or []
+ artist_table_joined = False
+ if order_by and "artist_name" in order_by:
+ # join artist table to allow sorting on artist name
+ extra_join_parts.append(
+ "JOIN album_artists ON album_artists.album_id = albums.item_id "
+ "JOIN artists ON artists.item_id = album_artists.artist_id "
+ )
+ artist_table_joined = True
+ if search and " - " in search:
+ # handle combined artist + title search
+ artist_str, title_str = search.split(" - ", 1)
+ search = None
+ extra_query_parts.append("albums.name LIKE :search_title")
+ extra_query_params["search_title"] = f"%{title_str}%"
+ # use join with artists table to filter on artist name
+ extra_join_parts.append(
+ "JOIN album_artists ON album_artists.album_id = albums.item_id "
+ "JOIN artists ON artists.item_id = album_artists.artist_id "
+ "AND artists.name LIKE :search_artist"
+ if not artist_table_joined
+ else "AND artists.name LIKE :search_artist"
+ )
+ artist_table_joined = True
+ extra_query_params["search_artist"] = f"%{artist_str}%"
+ result = await super()._get_library_items_by_query(
+ favorite=favorite,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ provider=provider,
+ extra_query_parts=extra_query_parts,
+ extra_query_params=extra_query_params,
+ extra_join_parts=extra_join_parts,
+ )
+ if search and len(result) < 25 and not offset:
+ # append artist items to result
+ extra_join_parts.append(
+ "JOIN album_artists ON album_artists.album_id = albums.item_id "
+ "JOIN artists ON artists.item_id = album_artists.artist_id "
+ "AND artists.name LIKE :search_artist"
+ if not artist_table_joined
+ else "AND artists.name LIKE :search_artist"
+ )
+ extra_query_params["search_artist"] = f"%{search}%"
+ return result + await super()._get_library_items_by_query(
+ favorite=favorite,
+ search=None,
+ limit=limit,
+ order_by=order_by,
+ provider=provider,
+ extra_query_parts=extra_query_parts,
+ extra_query_params=extra_query_params,
+ extra_join_parts=extra_join_parts,
+ )
+ return result
from typing import TYPE_CHECKING, Any
from music_assistant.common.helpers.json import serialize_to_json
-from music_assistant.common.models.enums import ProviderFeature
+from music_assistant.common.models.enums import CacheCategory, ProviderFeature
from music_assistant.common.models.errors import (
MediaNotFoundError,
ProviderUnavailableError,
if prov is None:
return []
# prefer cache items (if any) - for streaming providers
- cache_key = f"{prov.lookup_key}.artist_toptracks.{item_id}"
+ cache_category = CacheCategory.MUSIC_ARTIST_TRACKS
+ cache_base_key = prov.lookup_key
+ cache_key = item_id
if (
prov.is_streaming_provider
- and (cache := await self.mass.cache.get(cache_key)) is not None
+ and (
+ cache := await self.mass.cache.get(
+ cache_key, category=cache_category, base_key=cache_base_key
+ )
+ )
+ is not None
):
return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
if ProviderFeature.ARTIST_TOPTRACKS in prov.supported_features:
items = await prov.get_artist_toptracks(item_id)
+ for item in items:
+ # if this is a complete track object, pre-cache it as
+ # that will save us an (expensive) lookup later
+ if item.image and item.artist_str and item.album and prov.domain != "builtin":
+ await self.mass.cache.set(
+ f"track.{item_id}",
+ item.to_dict(),
+ category=CacheCategory.MUSIC_PROVIDER_ITEM,
+ base_key=prov.lookup_key,
+ )
else:
# fallback implementation using the db
if db_artist := await self.mass.music.artists.get_library_item_by_prov_id(
item_id,
provider_instance_id_or_domain,
):
- query = (
- f"WHERE {DB_TABLE_TRACK_ARTISTS}.artist_id = :artist_id "
- "AND (provider_domain = :prov_id "
- "OR provider_instance = :prov_id)"
+ artist_id = db_artist.item_id
+ subquery = (
+ f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = {artist_id}"
)
- query_params = {
- "artist_id": db_artist.item_id,
- "prov_id": provider_instance_id_or_domain,
- }
+ query = f"tracks.item_id in ({subquery})"
return await self.mass.music.tracks._get_library_items_by_query(
- extra_query=query, extra_query_params=query_params
+ extra_query_parts=[query], provider=provider_instance_id_or_domain
)
# store (serializable items) in cache
if prov.is_streaming_provider:
- self.mass.create_task(self.mass.cache.set(cache_key, [x.to_dict() for x in items]))
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key,
+ [x.to_dict() for x in items],
+ category=cache_category,
+ base_key=cache_base_key,
+ )
+ )
return items
async def get_library_artist_tracks(
item_id: str | int,
) -> list[Track]:
"""Return all tracks for an artist in the library/db."""
- return await self.mass.music.tracks._get_library_items_by_query(
- extra_query=f"WHERE {DB_TABLE_TRACK_ARTISTS}.artist_id = {item_id}",
- )
+ subquery = f"SELECT track_id FROM {DB_TABLE_TRACK_ARTISTS} WHERE artist_id = {item_id}"
+ query = f"tracks.item_id in ({subquery})"
+ return await self.mass.music.tracks._get_library_items_by_query(extra_query_parts=[query])
async def get_provider_artist_albums(
self,
if prov is None:
return []
# prefer cache items (if any)
- cache_key = f"{prov.lookup_key}.artist_albums.{item_id}"
+ cache_category = CacheCategory.MUSIC_ARTIST_ALBUMS
+ cache_base_key = prov.lookup_key
+ cache_key = item_id
if (
prov.is_streaming_provider
- and (cache := await self.mass.cache.get(cache_key)) is not None
+ and (
+ cache := await self.mass.cache.get(
+ cache_key, category=cache_category, base_key=cache_base_key
+ )
+ )
+ is not None
):
return [Album.from_dict(x) for x in cache]
# no items in cache - get listing from provider
item_id,
provider_instance_id_or_domain,
):
- query = (
- f"WHERE {DB_TABLE_ALBUM_ARTISTS}.artist_id = :artist_id "
- "AND (provider_domain = :prov_id "
- "OR provider_instance = :prov_id)"
+ artist_id = db_artist.item_id
+ subquery = (
+ f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = {artist_id}"
)
- query_params = {
- "prov_id": provider_instance_id_or_domain,
- "artist_id": db_artist.item_id,
- }
+ query = f"albums.item_id in ({subquery})"
return await self.mass.music.albums._get_library_items_by_query(
- extra_query=query, extra_query_params=query_params
+ extra_query_parts=[query], provider=provider_instance_id_or_domain
)
# store (serializable items) in cache
if prov.is_streaming_provider:
- self.mass.create_task(self.mass.cache.set(cache_key, [x.to_dict() for x in items]))
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key,
+ [x.to_dict() for x in items],
+ category=cache_category,
+ base_key=cache_base_key,
+ )
+ )
return items
async def get_library_artist_albums(
item_id: str | int,
) -> list[Album]:
"""Return all in-library albums for an artist."""
- query = f"WHERE {DB_TABLE_ALBUM_ARTISTS}.artist_id = {item_id}"
- return await self.mass.music.albums._get_library_items_by_query(extra_query=query)
+ subquery = f"SELECT album_id FROM {DB_TABLE_ALBUM_ARTISTS} WHERE artist_id = {item_id}"
+ query = f"albums.item_id in ({subquery})"
+ return await self.mass.music.albums._get_library_items_by_query(extra_query_parts=[query])
async def _add_library_item(self, item: Artist | ItemMapping) -> int:
"""Add a new item record to the database."""
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from music_assistant.common.helpers.json import json_loads, serialize_to_json
-from music_assistant.common.models.enums import EventType, ExternalID, MediaType, ProviderFeature
+from music_assistant.common.models.enums import (
+ CacheCategory,
+ EventType,
+ ExternalID,
+ MediaType,
+ ProviderFeature,
+)
from music_assistant.common.models.errors import MediaNotFoundError, ProviderUnavailableError
from music_assistant.common.models.media_items import (
Album,
Track,
media_from_dict,
)
-from music_assistant.constants import (
- DB_TABLE_ARTISTS,
- DB_TABLE_PLAYLOG,
- DB_TABLE_PROVIDER_MAPPINGS,
- MASS_LOGGER_NAME,
-)
+from music_assistant.constants import DB_TABLE_PLAYLOG, DB_TABLE_PROVIDER_MAPPINGS, MASS_LOGGER_NAME
from music_assistant.server.helpers.compare import compare_media_item
if TYPE_CHECKING:
ItemCls = TypeVar("ItemCls", bound="MediaItemType")
-JSON_KEYS = ("artists", "album", "metadata", "provider_mappings", "external_ids")
+JSON_KEYS = ("artists", "album", "metadata", "provider_mappings", "external_ids", "albums")
SORT_KEYS = {
"name": "name COLLATE NOCASE ASC",
"year_desc": "year DESC",
"position": "position ASC",
"position_desc": "position DESC",
+ "artist_name": "artists.name COLLATE NOCASE ASC",
+ "artist_name_desc": "artists.name COLLATE NOCASE DESC",
"random": "RANDOM()",
- "random_play_count": "random(), play_count ASC",
- "random_fast": "play_count ASC", # this one is handled with a special query
+ "random_play_count": "RANDOM(), play_count ASC",
}
def __init__(self, mass: MusicAssistant) -> None:
"""Initialize class."""
self.mass = mass
- self.base_query = (
- f"SELECT DISTINCT {self.db_table}.* FROM {self.db_table} "
- f"LEFT JOIN {DB_TABLE_PROVIDER_MAPPINGS} ON "
- f"{DB_TABLE_PROVIDER_MAPPINGS}.item_id = {self.db_table}.item_id "
- f"AND media_type = '{self.media_type}'"
- )
+ self.base_query = f"""
+ SELECT
+ {self.db_table}.*,
+ (SELECT JSON_GROUP_ARRAY(
+ json_object(
+ 'item_id', provider_mappings.provider_item_id,
+ 'provider_domain', provider_mappings.provider_domain,
+ 'provider_instance', provider_mappings.provider_instance,
+ 'available', provider_mappings.available,
+ 'audio_format', json(provider_mappings.audio_format),
+ 'url', provider_mappings.url,
+ 'details', provider_mappings.details
+ )) FROM provider_mappings WHERE provider_mappings.item_id = {self.db_table}.item_id
+ AND provider_mappings.media_type = '{self.media_type.value}') AS provider_mappings
+ FROM {self.db_table} """ # noqa: E501
self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.music.{self.media_type.value}")
# register (base) api handlers
self.api_base = api_base = f"{self.media_type}s"
if compare_media_item(item, cur_item):
return cur_item.item_id
# search by (exact) name match
- query = f"WHERE {self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
+ query = f"{self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
query_params = {"name": item.name, "sort_name": item.sort_name}
async for db_item in self.iter_library_items(
extra_query=query, extra_query_params=query_params
extra_query_params: dict[str, Any] | None = None,
) -> list[ItemCls]:
"""Get in-database items."""
- # create special performant random query
- if order_by == "random_fast" and not extra_query:
- extra_query = (
- f"{self.db_table}.rowid > (ABS(RANDOM()) % "
- f"(SELECT max({self.db_table}.rowid) FROM {self.db_table}))"
- )
return await self._get_library_items_by_query(
favorite=favorite,
search=search,
offset=offset,
order_by=order_by,
provider=provider,
- extra_query=extra_query,
+ extra_query_parts=[extra_query] if extra_query else None,
extra_query_params=extra_query_params,
)
limit=limit,
offset=offset,
order_by=order_by,
- extra_query=extra_query,
+ extra_query_parts=[extra_query] if extra_query else None,
extra_query_params=extra_query_params,
)
for item in next_items:
return []
# prefer cache items (if any)
- cache_key = f"{prov.lookup_key}.search.{self.media_type.value}.{search_query}.{limit}"
- cache_key = cache_key.lower().replace(" ", "").strip()
- if (cache := await self.mass.cache.get(cache_key)) is not None:
+ cache_category = CacheCategory.MUSIC_SEARCH
+ cache_base_key = prov.lookup_key
+ cache_key = f"{search_query}.{limit}.{self.media_type.value}"
+ if (
+ cache := await self.mass.cache.get(
+ cache_key, category=cache_category, base_key=cache_base_key
+ )
+ ) is not None:
return [media_from_dict(x) for x in cache]
# no items in cache - get listing from provider
searchresult = await prov.search(
# store (serializable items) in cache
if prov.is_streaming_provider: # do not cache filesystem results
self.mass.create_task(
- self.mass.cache.set(cache_key, [x.to_dict() for x in items], expiration=86400 * 7)
+ self.mass.cache.set(
+ cache_key,
+ [x.to_dict() for x in items],
+ expiration=86400 * 7,
+ category=cache_category,
+ base_key=cache_base_key,
+ ),
)
return items
self, external_id: str, external_id_type: ExternalID | None = None
) -> ItemCls | None:
"""Get the library item for the given external id."""
- query = f"WHERE {self.db_table}.external_ids LIKE :external_id_str"
+ query = f"{self.db_table}.external_ids LIKE :external_id_str"
if external_id_type:
external_id_str = f'%"{external_id_type}","{external_id}"%'
else:
external_id_str = f'%"{external_id}"%'
for item in await self._get_library_items_by_query(
- extra_query=query, extra_query_params={"external_id_str": external_id_str}
+ extra_query_parts=[query], extra_query_params={"external_id_str": external_id_str}
):
return item
return None
assert provider_instance_id_or_domain != "library"
assert provider_domain != "library"
assert provider_instance != "library"
+ subquery_parts: list[str] = []
+ query_params: dict[str, Any] = {}
if provider_instance:
query_params = {"prov_id": provider_instance}
- query = "provider_mappings.provider_instance = :prov_id"
+ subquery_parts.append("provider_mappings.provider_instance = :prov_id")
elif provider_domain:
query_params = {"prov_id": provider_domain}
- query = "provider_mappings.provider_domain = :prov_id"
+ subquery_parts.append("provider_mappings.provider_domain = :prov_id")
else:
query_params = {"prov_id": provider_instance_id_or_domain}
- query = (
+ subquery_parts.append(
"(provider_mappings.provider_instance = :prov_id "
"OR provider_mappings.provider_domain = :prov_id)"
)
if provider_item_id:
- query += " AND provider_mappings.provider_item_id = :item_id"
+ subquery_parts.append("provider_mappings.provider_item_id = :item_id")
query_params["item_id"] = provider_item_id
+ subquery = f"SELECT item_id FROM provider_mappings WHERE {' AND '.join(subquery_parts)}"
+ query = f"WHERE {self.db_table}.item_id IN ({subquery})"
return await self._get_library_items_by_query(
- limit=limit, offset=offset, extra_query=query, extra_query_params=query_params
+ limit=limit, offset=offset, extra_query_parts=[query], extra_query_params=query_params
)
async def iter_library_items_by_prov_id(
return await self.get_library_item(item_id)
if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
raise ProviderUnavailableError(f"{provider_instance_id_or_domain} is not available")
- cache_key = f"provider_item.{self.media_type.value}.{provider.lookup_key}.{item_id}"
- if not force_refresh and (cache := await self.mass.cache.get(cache_key)):
+
+ cache_category = CacheCategory.MUSIC_PROVIDER_ITEM
+ cache_base_key = provider.lookup_key
+ cache_key = f"{self.media_type.value}.{item_id}"
+ if not force_refresh and (
+ cache := await self.mass.cache.get(
+ cache_key, category=cache_category, base_key=cache_base_key
+ )
+ ):
return self.item_cls.from_dict(cache)
if provider := self.mass.get_provider(provider_instance_id_or_domain):
with suppress(MediaNotFoundError):
if item := await provider.get_item(self.media_type, item_id):
- await self.mass.cache.set(cache_key, item.to_dict())
+ await self.mass.cache.set(
+ cache_key, item.to_dict(), category=cache_category, base_key=cache_base_key
+ )
return item
# if we reach this point all possibilities failed and the item could not be found.
# There is a possibility that the (streaming) provider changed the id of the item
offset: int = 0,
order_by: str | None = None,
provider: str | None = None,
- extra_query: str | None = None,
+ extra_query_parts: list[str] | None = None,
extra_query_params: dict[str, Any] | None = None,
+ extra_join_parts: list[str] | None = None,
) -> list[ItemCls]:
- """Fetch MediaItem records from database given a custom (WHERE) clause."""
+ """Fetch MediaItem records from database by building the query."""
sql_query = self.base_query
query_params = extra_query_params or {}
- query_parts: list[str] = []
- # handle basic search on name
+ query_parts: list[str] = extra_query_parts or []
+ join_parts: list[str] = extra_join_parts or []
+ # create special performant random query
+ if order_by and order_by.startswith("random"):
+ query_parts.append(
+ f"{self.db_table}.item_id in "
+ f"(SELECT item_id FROM {self.db_table} ORDER BY RANDOM() LIMIT {limit})"
+ )
+ # handle search
if search:
- # handle combined artist + title search
- if self.media_type in (MediaType.ALBUM, MediaType.TRACK) and " - " in search:
- artist_str, title_str = search.split(" - ", 1)
- query_parts.append(
- f"({self.db_table}.name LIKE :search_title "
- f"AND {DB_TABLE_ARTISTS}.name LIKE :search_artist)"
- )
- query_params["search_title"] = f"%{title_str}%"
- query_params["search_artist"] = f"%{artist_str}%"
- else:
- query_params["search"] = f"%{search}%"
- query_parts.append(f"{self.db_table}.name LIKE :search")
+ query_params["search"] = f"%{search}%"
+ query_parts.append(f"{self.db_table}.name LIKE :search")
# handle favorite filter
if favorite is not None:
query_parts.append(f"{self.db_table}.favorite = :favorite")
query_params["favorite"] = favorite
# handle provider filter
if provider:
- query_parts.append(f"{DB_TABLE_PROVIDER_MAPPINGS}.provider_instance = :provider")
- query_params["provider"] = provider
- # handle extra/custom query
- if extra_query:
- # prevent duplicate where statement
- if extra_query.lower().startswith("where "):
- extra_query = extra_query[5:]
- query_parts.append(extra_query)
- # concetenate all where queries
+ join_parts.append(
+ f"JOIN provider_mappings ON provider_mappings.item_id = {self.db_table}.item_id "
+ f"AND provider_mappings.media_type = '{self.media_type.value}' "
+ f"AND (provider_mappings.provider_instance = '{provider}' "
+ f"OR provider_mappings.provider_domain = '{provider}')"
+ )
+ # prevent duplicate where statement
+ query_parts = [x[5:] if x.lower().startswith("where ") else x for x in query_parts]
+ # concetenate all join and/or where queries
+ if join_parts:
+ sql_query += f' {" ".join(join_parts)} '
if query_parts:
sql_query += " WHERE " + " AND ".join(query_parts)
# build final query
"details": provider_mapping.details,
},
)
- provider_mappings = {x for x in provider_mappings if x.provider_instance is not None}
- # we (temporary?) duplicate the provider mappings in a separate column of the media
- # item's table, because the json_group_array query is superslow
- await self.mass.music.database.update(
- self.db_table,
- {"item_id": db_id},
- {"provider_mappings": serialize_to_json(provider_mappings)},
- )
@staticmethod
def _parse_db_row(db_row: Mapping) -> dict[str, Any]:
continue
db_row_dict[key] = json_loads(raw_value)
+ # copy albums --> album
+ if albums := db_row_dict.get("albums"):
+ db_row_dict["album"] = albums[0]
+ db_row_dict["disc_number"] = albums[0]["disc_number"]
+ db_row_dict["track_number"] = albums[0]["disc_number"]
+
# copy album image to itemmapping single image
if (album := db_row_dict.get("album")) and (images := album.get("images")):
db_row_dict["album"]["image"] = next((x for x in images if x["type"] == "thumb"), None)
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.helpers.uri import create_uri, parse_uri
-from music_assistant.common.models.enums import MediaType, ProviderFeature, ProviderType
+from music_assistant.common.models.enums import (
+ CacheCategory,
+ MediaType,
+ ProviderFeature,
+ ProviderType,
+)
from music_assistant.common.models.errors import (
InvalidDataError,
MediaNotFoundError,
"favorite": item.favorite,
"metadata": serialize_to_json(item.metadata),
"external_ids": serialize_to_json(item.external_ids),
+ "cache_checksum": item.cache_checksum,
},
)
db_id = new_item["item_id"]
"external_ids": serialize_to_json(
update.external_ids if overwrite else cur_item.external_ids
),
+ "cache_checksum": update.cache_checksum
+ if overwrite
+ else update.cache_checksum or cur_item.cache_checksum,
},
)
# update/set provider_mappings table
if not provider:
return []
# prefer cache items (if any)
- cache_key = f"{provider.lookup_key}.playlist.{item_id}.tracks.{page}"
+ cache_category = CacheCategory.MUSIC_PLAYLIST_TRACKS
+ cache_base_key = provider.lookup_key
+ cache_key = f"{item_id}.{page}"
if (
not force_refresh
- and (cache := await self.mass.cache.get(cache_key, checksum=cache_checksum)) is not None
+ and (
+ cache := await self.mass.cache.get(
+ cache_key,
+ checksum=cache_checksum,
+ category=cache_category,
+ base_key=cache_base_key,
+ )
+ )
+ is not None
):
return [PlaylistTrack.from_dict(x) for x in cache]
# no items in cache (or force_refresh) - get listing from provider
- result: list[Track] = []
- for item in await provider.get_playlist_tracks(item_id, page=page):
- # double check if position set
- assert item.position is not None, "Playlist items require position to be set"
- result.append(item)
+ items = await provider.get_playlist_tracks(item_id, page=page)
+ # store (serializable items) in cache
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key,
+ [x.to_dict() for x in items],
+ checksum=cache_checksum,
+ category=cache_category,
+ base_key=cache_base_key,
+ )
+ )
+ for item in items:
# if this is a complete track object, pre-cache it as
# that will save us an (expensive) lookup later
if item.image and item.artist_str and item.album and provider.domain != "builtin":
await self.mass.cache.set(
- f"provider_item.track.{provider.lookup_key}.{item_id}", item.to_dict()
+ f"track.{item_id}",
+ item.to_dict(),
+ category=CacheCategory.MUSIC_PROVIDER_ITEM,
+ base_key=provider.lookup_key,
)
- # store (serializable items) in cache
- self.mass.create_task(
- self.mass.cache.set(cache_key, [x.to_dict() for x in result], checksum=cache_checksum)
- )
- return result
+ return items
async def _get_provider_dynamic_tracks(
self,
import urllib.parse
from collections.abc import Iterable
from contextlib import suppress
+from typing import Any
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import MediaType, ProviderFeature
from music_assistant.constants import (
DB_TABLE_ALBUM_TRACKS,
DB_TABLE_ALBUMS,
- DB_TABLE_ARTISTS,
- DB_TABLE_PROVIDER_MAPPINGS,
DB_TABLE_TRACK_ARTISTS,
DB_TABLE_TRACKS,
)
def __init__(self, *args, **kwargs) -> None:
"""Initialize class."""
super().__init__(*args, **kwargs)
- self.base_query = f"""
- SELECT DISTINCT
- {self.db_table}.*,
- CASE WHEN albums.item_id IS NULL THEN NULL ELSE
- json_object(
- 'item_id', {DB_TABLE_ALBUMS}.item_id,
+ self.base_query = """
+ SELECT
+ tracks.*,
+ (SELECT JSON_GROUP_ARRAY(
+ json_object(
+ 'item_id', provider_mappings.provider_item_id,
+ 'provider_domain', provider_mappings.provider_domain,
+ 'provider_instance', provider_mappings.provider_instance,
+ 'available', provider_mappings.available,
+ 'audio_format', json(provider_mappings.audio_format),
+ 'url', provider_mappings.url,
+ 'details', provider_mappings.details
+ )) FROM provider_mappings WHERE provider_mappings.item_id = tracks.item_id AND media_type = 'track') AS provider_mappings,
+
+ (SELECT JSON_GROUP_ARRAY(
+ json_object(
+ 'item_id', artists.item_id,
+ 'provider', 'library',
+ 'name', artists.name,
+ 'sort_name', artists.sort_name,
+ 'media_type', 'artist'
+ )) FROM artists JOIN track_artists on track_artists.track_id = tracks.item_id WHERE artists.item_id = track_artists.artist_id) AS artists,
+ (SELECT JSON_GROUP_ARRAY(
+ json_object(
+ 'item_id', albums.item_id,
'provider', 'library',
- 'name', {DB_TABLE_ALBUMS}.name,
- 'sort_name', {DB_TABLE_ALBUMS}.sort_name,
- 'version', {DB_TABLE_ALBUMS}.version,
- 'images', json_extract({DB_TABLE_ALBUMS}.metadata, '$.images'),
- 'media_type', 'album') END as album,
- CASE WHEN {DB_TABLE_ALBUM_TRACKS}.disc_number IS NULL THEN 0 ELSE {DB_TABLE_ALBUM_TRACKS}.disc_number END as disc_number,
- CASE WHEN {DB_TABLE_ALBUM_TRACKS}.track_number IS NULL THEN 0 ELSE {DB_TABLE_ALBUM_TRACKS}.track_number END as track_number
- FROM {self.db_table}
- LEFT JOIN {DB_TABLE_ALBUM_TRACKS} on {DB_TABLE_ALBUM_TRACKS}.track_id = {self.db_table}.item_id
- LEFT JOIN {DB_TABLE_ALBUMS} on {DB_TABLE_ALBUMS}.item_id = {DB_TABLE_ALBUM_TRACKS}.album_id
- LEFT JOIN {DB_TABLE_TRACK_ARTISTS} on {DB_TABLE_TRACK_ARTISTS}.track_id = {self.db_table}.item_id
- LEFT JOIN {DB_TABLE_ARTISTS} on {DB_TABLE_ARTISTS}.item_id = {DB_TABLE_TRACK_ARTISTS}.artist_id
- LEFT JOIN {DB_TABLE_PROVIDER_MAPPINGS} ON
- {DB_TABLE_PROVIDER_MAPPINGS}.item_id = {self.db_table}.item_id AND media_type = '{self.media_type}'
- """ # noqa: E501
+ 'name', albums.name,
+ 'sort_name', albums.sort_name,
+ 'media_type', 'album',
+ 'disc_number', album_tracks.disc_number,
+ 'track_number', album_tracks.track_number
+ )) FROM albums JOIN album_tracks on album_tracks.track_id = tracks.item_id WHERE albums.item_id = album_tracks.album_id) AS albums
+ FROM tracks""" # noqa: E501
# register (extra) api handlers
api_base = self.api_base
self.mass.register_api_command(f"music/{api_base}/track_versions", self.versions)
f"SELECT album_id FROM {DB_TABLE_ALBUM_TRACKS} "
f"WHERE {DB_TABLE_ALBUM_TRACKS}.track_id = {item_id}"
)
- query = f"WHERE {DB_TABLE_ALBUMS}.item_id in ({subquery})"
- return await self.mass.music.albums._get_library_items_by_query(extra_query=query)
+ query = f"{DB_TABLE_ALBUMS}.item_id in ({subquery})"
+ return await self.mass.music.albums._get_library_items_by_query(extra_query_parts=[query])
async def match_providers(self, db_track: Track) -> None:
"""Try to find matching track on all providers for the provided (database) track_id.
for artist in artists:
mapping = await self._set_track_artist(db_id, artist=artist, overwrite=overwrite)
artist_mappings.append(mapping)
- # we (temporary?) duplicate the artist mappings in a separate column of the media
- # item's table, because the json_group_array query is superslow
- await self.mass.music.database.update(
- self.db_table,
- {"item_id": db_id},
- {"artists": serialize_to_json(artist_mappings)},
- )
async def _set_track_artist(
self, db_id: int, artist: Artist | ItemMapping, overwrite: bool = False
},
)
return ItemMapping.from_item(db_artist)
+
+ async def _get_library_items_by_query(
+ self,
+ favorite: bool | None = None,
+ search: str | None = None,
+ limit: int = 500,
+ offset: int = 0,
+ order_by: str | None = None,
+ provider: str | None = None,
+ extra_query_parts: list[str] | None = None,
+ extra_query_params: dict[str, Any] | None = None,
+ extra_join_parts: list[str] | None = None,
+ ) -> list[Track]:
+ """Fetch MediaItem records from database by building the query."""
+ extra_query_params = extra_query_params or {}
+ extra_query_parts: list[str] = extra_query_parts or []
+ extra_join_parts: list[str] = extra_join_parts or []
+ if search and " - " in search:
+ # handle combined artist + title search
+ artist_str, title_str = search.split(" - ", 1)
+ search = None
+ extra_query_parts.append("tracks.name LIKE :search_title")
+ extra_query_params["search_title"] = f"%{title_str}%"
+ # use join with artists table to filter on artist name
+ extra_join_parts.append(
+ "JOIN track_artists ON track_artists.track_id = tracks.item_id "
+ "JOIN artists ON artists.item_id = track_artists.artist_id "
+ "AND artists.name LIKE :search_artist"
+ )
+ extra_query_params["search_artist"] = f"%{artist_str}%"
+ result = await super()._get_library_items_by_query(
+ favorite=favorite,
+ search=search,
+ limit=limit,
+ offset=offset,
+ order_by=order_by,
+ provider=provider,
+ extra_query_parts=extra_query_parts,
+ extra_query_params=extra_query_params,
+ extra_join_parts=extra_join_parts,
+ )
+ if search and len(result) < 25 and not offset:
+ # append artist items to result
+ extra_join_parts.append(
+ "JOIN track_artists ON track_artists.track_id = tracks.item_id "
+ "JOIN artists ON artists.item_id = track_artists.artist_id "
+ "AND artists.name LIKE :search_artist"
+ )
+ extra_query_params["search_artist"] = f"%{search}%"
+ return result + await super()._get_library_items_by_query(
+ favorite=favorite,
+ search=None,
+ limit=limit,
+ order_by=order_by,
+ provider=provider,
+ extra_query_parts=extra_query_parts,
+ extra_query_params=extra_query_params,
+ extra_join_parts=extra_join_parts,
+ )
+ return result
self._online_slots_available = MAX_ONLINE_CALLS_PER_RUN
timestamp = int(time() - 60 * 60 * 24 * 30)
query = (
- f"WHERE json_extract({DB_TABLE_ARTISTS}.metadata,'$.last_refresh') ISNULL "
+ f"json_extract({DB_TABLE_ARTISTS}.metadata,'$.last_refresh') ISNULL "
f"OR json_extract({DB_TABLE_ARTISTS}.metadata,'$.last_refresh') < {timestamp}"
)
for artist in await self.mass.music.artists.library_items(
await self._update_artist_metadata(artist)
query = (
- f"WHERE json_extract({DB_TABLE_ALBUMS}.metadata,'$.last_refresh') ISNULL "
+ f"json_extract({DB_TABLE_ALBUMS}.metadata,'$.last_refresh') ISNULL "
f"OR json_extract({DB_TABLE_ALBUMS}.metadata,'$.last_refresh') < {timestamp}"
)
for album in await self.mass.music.albums.library_items(
await self._update_album_metadata(album)
query = (
- f"WHERE json_extract({DB_TABLE_PLAYLISTS}.metadata,'$.last_refresh') ISNULL "
+ f"json_extract({DB_TABLE_PLAYLISTS}.metadata,'$.last_refresh') ISNULL "
f"OR json_extract({DB_TABLE_PLAYLISTS}.metadata,'$.last_refresh') < {timestamp}"
)
for playlist in await self.mass.music.playlists.library_items(
await self._update_playlist_metadata(playlist)
query = (
- f"WHERE json_extract({DB_TABLE_TRACKS}.metadata,'$.last_refresh') ISNULL "
+ f"json_extract({DB_TABLE_TRACKS}.metadata,'$.last_refresh') ISNULL "
f"OR json_extract({DB_TABLE_TRACKS}.metadata,'$.last_refresh') < {timestamp}"
)
for track in await self.mass.music.tracks.library_items(
from music_assistant.common.helpers.uri import parse_uri
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import (
+ CacheCategory,
ConfigEntryType,
EventType,
MediaType,
MusicAssistantError,
ProviderUnavailableError,
)
-from music_assistant.common.models.media_items import BrowseFolder, MediaItemType, SearchResults
+from music_assistant.common.models.media_items import (
+ BrowseFolder,
+ ItemMapping,
+ MediaItemType,
+ SearchResults,
+)
from music_assistant.common.models.provider import SyncTask
from music_assistant.common.models.streamdetails import LoudnessMeasurement
from music_assistant.constants import (
CONF_SYNC_INTERVAL = "sync_interval"
CONF_DELETED_PROVIDERS = "deleted_providers"
CONF_ADD_LIBRARY_ON_PLAY = "add_library_on_play"
-DB_SCHEMA_VERSION: Final[int] = 6
+DB_SCHEMA_VERSION: Final[int] = 7
class MusicController(CoreController):
# prefer cache items (if any)
media_types_str = ",".join(media_types)
- cache_key = f"{prov.instance_id}.search.{search_query}.{limit}.{media_types_str}"
- cache_key += "".join(x for x in media_types)
+ cache_category = CacheCategory.MUSIC_SEARCH
+ cache_base_key = prov.lookup_key
+ cache_key = f"{search_query}.{limit}.{media_types_str}"
- if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
+ if prov.is_streaming_provider and (
+ cache := await self.mass.cache.get(
+ cache_key, category=cache_category, base_key=cache_base_key
+ )
+ ):
return SearchResults.from_dict(cache)
# no items in cache - get listing from provider
result = await prov.search(
# store (serializable items) in cache
if prov.is_streaming_provider:
self.mass.create_task(
- self.mass.cache.set(cache_key, result.to_dict(), expiration=86400 * 7)
+ self.mass.cache.set(
+ cache_key,
+ result.to_dict(),
+ expiration=86400 * 7,
+ category=cache_category,
+ base_key=cache_base_key,
+ )
)
return result
await ctrl.remove_item_from_library(library_item_id)
@api_command("music/library/add_item")
- async def add_item_to_library(self, item: str | MediaItemType) -> MediaItemType:
+ async def add_item_to_library(
+ self, item: str | MediaItemType, overwrite_existing: bool = False
+ ) -> MediaItemType:
"""Add item (uri or mediaitem) to the library."""
if isinstance(item, str):
item = await self.get_item_by_uri(item)
+ if isinstance(item, ItemMapping):
+ item = await self.get_item(
+ item.media_type,
+ item.item_id,
+ item.provider,
+ )
+ # add to provider(s) library first
+ for prov_mapping in item.provider_mappings:
+ provider = self.mass.get_provider(prov_mapping.provider_instance)
+ if provider.library_edit_supported(item.media_type):
+ prov_item = item
+ prov_item.provider = prov_mapping.provider_instance
+ prov_item.item_id = prov_mapping.item_id
+ await provider.library_add(prov_item)
+ # add (or overwrite) to library
ctrl = self.get_controller(item.media_type)
- # add to provider's library first
- provider = self.mass.get_provider(item.provider)
- if provider.library_edit_supported(item.media_type):
- await provider.library_add(item)
- # ensure a full item
- item = await ctrl.get(item.item_id, item.provider)
- library_item = await ctrl.add_item_to_library(item)
+ library_item = await ctrl.add_item_to_library(item, overwrite_existing)
# perform full metadata scan (and provider match)
- await self.mass.metadata.update_metadata(library_item)
+ await self.mass.metadata.update_metadata(library_item, overwrite_existing)
return library_item
async def refresh_items(self, items: list[MediaItemType]) -> None:
await self.__create_database_tables()
return
+ if prev_version <= 6:
+ # remove redundant artists and provider_mappings columns
+ for table in (DB_TABLE_TRACKS, DB_TABLE_ALBUMS, DB_TABLE_ARTISTS, DB_TABLE_RADIOS):
+ for column in ("artists", "provider_mappings"):
+ try:
+ await self.database.execute(f"ALTER TABLE {table} DROP COLUMN {column}")
+ except Exception as err:
+ if "no such column" in str(err):
+ continue
+ raise
+ # add cache_checksum column to playlists
+ try:
+ await self.database.execute(
+ f"ALTER TABLE {DB_TABLE_PLAYLISTS} ADD COLUMN cache_checksum TEXT DEFAULT ''"
+ )
+ except Exception as err:
+ if "duplicate column" not in str(err):
+ raise
+
# save changes
await self.database.commit()
[play_count] INTEGER DEFAULT 0,
[last_played] INTEGER DEFAULT 0,
[timestamp_added] INTEGER DEFAULT (cast(strftime('%s','now') as int)),
- [timestamp_modified] INTEGER,
-
- [artists] json DEFAULT '[]',
- [provider_mappings] json DEFAULT '[]'
+ [timestamp_modified] INTEGER
);"""
)
await self.database.execute(
[play_count] INTEGER DEFAULT 0,
[last_played] INTEGER DEFAULT 0,
[timestamp_added] INTEGER DEFAULT (cast(strftime('%s','now') as int)),
- [timestamp_modified] INTEGER,
-
- [provider_mappings] json DEFAULT '[]'
+ [timestamp_modified] INTEGER
);"""
)
await self.database.execute(
[play_count] INTEGER DEFAULT 0,
[last_played] INTEGER DEFAULT 0,
[timestamp_added] INTEGER DEFAULT (cast(strftime('%s','now') as int)),
- [timestamp_modified] INTEGER,
-
- [artists] json DEFAULT '[]',
- [provider_mappings] json DEFAULT '[]'
+ [timestamp_modified] INTEGER
);"""
)
await self.database.execute(
[sort_name] TEXT NOT NULL,
[owner] TEXT NOT NULL,
[is_editable] BOOLEAN NOT NULL,
+ [cache_checksum] TEXT DEFAULT '',
[favorite] BOOLEAN DEFAULT 0,
[metadata] json NOT NULL,
[external_ids] json NOT NULL,
[play_count] INTEGER DEFAULT 0,
[last_played] INTEGER DEFAULT 0,
[timestamp_added] INTEGER DEFAULT (cast(strftime('%s','now') as int)),
- [timestamp_modified] INTEGER,
-
- [provider_mappings] json DEFAULT '[]'
+ [timestamp_modified] INTEGER
);"""
)
await self.database.execute(
[play_count] INTEGER DEFAULT 0,
[last_played] INTEGER DEFAULT 0,
[timestamp_added] INTEGER DEFAULT (cast(strftime('%s','now') as int)),
- [timestamp_modified] INTEGER,
-
- [provider_mappings] json DEFAULT '[]'
+ [timestamp_modified] INTEGER
);"""
)
await self.database.execute(
ConfigValueType,
)
from music_assistant.common.models.enums import (
+ CacheCategory,
ConfigEntryType,
EventType,
MediaType,
key=CONF_DEFAULT_ENQUEUE_OPTION_RADIO,
type=ConfigEntryType.STRING,
default_value=QueueOption.REPLACE.value,
- label="Default enqueue option for Track item(s).",
+ label="Default enqueue option for Radio item(s).",
options=enqueue_options,
description="Define the default enqueue action for this mediatype.",
),
# save items in cache
self.mass.create_task(
self.mass.cache.set(
- f"queue.items.{queue_id}",
+ "items",
[x.to_cache() for x in self._queue_items[queue_id]],
+ category=CacheCategory.PLAYER_QUEUE_STATE,
+ base_key=queue_id,
)
)
# save state
self.mass.create_task(
self.mass.cache.set(
- f"queue.state.{queue_id}",
+ "state",
queue.to_cache(),
+ category=CacheCategory.PLAYER_QUEUE_STATE,
+ base_key=queue_id,
)
)
- bool if the URL represents a ICY (radio) stream.
- bool uf the URL represents a HLS stream/playlist.
"""
- cache_key = f"RADIO_RESOLVED_{url}"
- if cache := await mass.cache.get(cache_key):
+ cache_base_key = "resolved_radio"
+ if cache := await mass.cache.get(url, base_key=cache_base_key):
return cache
is_hls = False
is_icy = False
result = (resolved_url, is_icy, is_hls)
cache_expiration = 3600 * 3
- await mass.cache.set(cache_key, result, expiration=cache_expiration)
+ await mass.cache.set(url, result, expiration=cache_expiration, base_key=cache_base_key)
return result
import itertools
import os
import random
+from base64 import b64decode
from collections.abc import Iterable
from io import BytesIO
from typing import TYPE_CHECKING
return await resp.read()
except ClientError as err:
raise FileNotFoundError from err
+ # handle base64 embedded images
+ if path_or_url.startswith("data:image"):
+ return b64decode(path_or_url.split(",")[-1])
# handle FILE location (of type image)
if path_or_url.endswith(("jpg", "JPG", "png", "PNG", "jpeg")):
if await asyncio.to_thread(os.path.isfile, path_or_url):
from collections.abc import Sequence
from typing import TYPE_CHECKING
-from music_assistant.common.models.enums import MediaType, ProviderFeature
+from music_assistant.common.models.enums import CacheCategory, MediaType, ProviderFeature
from music_assistant.common.models.errors import MediaNotFoundError, MusicAssistantError
from music_assistant.common.models.media_items import (
Album,
)
# process deletions (= no longer in library)
- cache_key = f"library_items.{media_type}.{self.instance_id}"
+ cache_category = CacheCategory.LIBRARY_ITEMS
+ cache_base_key = self.instance_id
+
prev_library_items: list[int] | None
- if prev_library_items := await self.mass.cache.get(cache_key):
+ if prev_library_items := await self.mass.cache.get(
+ media_type.value, category=cache_category, base_key=cache_base_key
+ ):
for db_id in prev_library_items:
if db_id not in cur_db_ids:
try:
# otherwise: just unmark favorite
await controller.set_favorite(db_id, False)
await asyncio.sleep(0) # yield to eventloop
- await self.mass.cache.set(cache_key, list(cur_db_ids))
+ await self.mass.cache.set(
+ media_type.value, list(cur_db_ids), category=cache_category, base_key=cache_base_key
+ )
# DO NOT OVERRIDE BELOW
mass_player.volume_level = volume_level
self.mass.players.update(player_id)
# store last state in cache
- await self.mass.cache.set(f"{CACHE_KEY_PREV_VOLUME}.{player_id}", volume_level)
+ await self.mass.cache.set(player_id, volume_level, base_key=CACHE_KEY_PREV_VOLUME)
async def cmd_sync(self, player_id: str, target_player: str) -> None:
"""Handle SYNC command for given player.
if not self.mass.config.get_raw_player_config_value(player_id, "enabled", True):
self.logger.debug("Ignoring %s in discovery as it is disabled.", display_name)
return
- if not (volume := await self.mass.cache.get(f"{CACHE_KEY_PREV_VOLUME}.{player_id}")):
+ if not (volume := await self.mass.cache.get(player_id, base_key=CACHE_KEY_PREV_VOLUME)):
volume = FALLBACK_VOLUME
mass_player = Player(
player_id=player_id,
self, license_url: str, key_id: str, uri: str, item_id: str
) -> str:
"""Get the decryption key for a song."""
- cache_key = f"{self.instance_id}.decryption_key.{key_id}"
- if decryption_key := await self.mass.cache.get(cache_key):
+ cache_key = f"decryption_key.{item_id}"
+ if decryption_key := await self.mass.cache.get(cache_key, base_key=self.instance_id):
self.logger.debug("Decryption key for %s found in cache.", item_id)
return decryption_key
pssh = self._get_pssh(key_id)
raise MediaNotFoundError("Unable to get decryption key for song %s.", item_id)
cdm.close(session_id)
decryption_key = key.key.hex()
- self.mass.create_task(self.mass.cache.set(cache_key, decryption_key, expiration=7200))
+ self.mass.create_task(
+ self.mass.cache.set(
+ cache_key, decryption_key, expiration=7200, base_key=self.instance_id
+ )
+ )
return decryption_key
def _get_pssh(self, key_id: bytes) -> PSSH:
from music_assistant.common.helpers.uri import parse_uri
from music_assistant.common.models.config_entries import ConfigEntry
from music_assistant.common.models.enums import (
+ CacheCategory,
ConfigEntryType,
ContentType,
ImageType,
async def _get_media_info(self, url: str, force_refresh: bool = False) -> AudioTags:
"""Retrieve mediainfo for url."""
+ cache_category = CacheCategory.MEDIA_INFO
+ cache_base_key = self.lookup_key
# do we have some cached info for this url ?
- cache_key = f"{self.instance_id}.media_info.{url}"
- cached_info = await self.mass.cache.get(cache_key)
+ cached_info = await self.mass.cache.get(
+ url, category=cache_category, base_key=cache_base_key
+ )
if cached_info and not force_refresh:
return AudioTags.parse(cached_info)
# parse info with ffprobe (and store in cache)
media_info = await parse_tags(url)
if "authSig" in url:
media_info.has_cover_image = False
- await self.mass.cache.set(cache_key, media_info.raw)
+ await self.mass.cache.set(
+ url, media_info.raw, category=cache_category, base_key=cache_base_key
+ )
return media_info
async def get_stream_details(self, item_id: str) -> StreamDetails:
async def _get_builtin_playlist_random_favorite_tracks(self) -> list[Track]:
result: list[Track] = []
res = await self.mass.music.tracks.library_items(
- favorite=True, limit=250000, order_by="random"
+ favorite=True, limit=250000, order_by="random_play_count"
)
for idx, item in enumerate(res, 1):
item.position = idx
async def _get_builtin_playlist_random_tracks(self) -> list[Track]:
result: list[Track] = []
- res = await self.mass.music.tracks.library_items(limit=500, order_by="random_fast")
+ res = await self.mass.music.tracks.library_items(limit=500, order_by="random_play_count")
for idx, item in enumerate(res, 1):
item.position = idx
result.append(item)
async def _get_builtin_playlist_random_album(self) -> list[Track]:
result: list[Track] = []
- for random_album in await self.mass.music.albums.library_items(
- limit=1, order_by="random_fast"
- ):
+ for random_album in await self.mass.music.albums.library_items(limit=1, order_by="random"):
tracks = await self.mass.music.albums.tracks(
random_album.item_id, random_album.provider
)
async def _get_builtin_playlist_random_artist(self) -> list[Track]:
result: list[Track] = []
for random_artist in await self.mass.music.artists.library_items(
- limit=1, order_by="random_fast"
+ limit=1, order_by="random"
):
tracks = await self.mass.music.artists.tracks(
random_artist.item_id, random_artist.provider
result = SearchResults()
# searching the filesystem is slow and unreliable,
# so instead we just query the db...
- query = "provider_mappings.provider_instance = :provider_instance "
- params = {
- "provider_instance": self.instance_id,
- }
if media_types is None or MediaType.TRACK in media_types:
result.tracks = await self.mass.music.tracks._get_library_items_by_query(
- search=search_query, extra_query=query, extra_query_params=params, limit=limit
+ search=search_query, provider=self.instance_id, limit=limit
)
if media_types is None or MediaType.ALBUM in media_types:
result.albums = await self.mass.music.albums._get_library_items_by_query(
- search=search_query, extra_query=query, extra_query_params=params, limit=limit
+ search=search_query,
+ provider=self.instance_id,
+ limit=limit,
)
if media_types is None or MediaType.ARTIST in media_types:
result.artists = await self.mass.music.artists._get_library_items_by_query(
- search=search_query, extra_query=query, extra_query_params=params, limit=limit
+ search=search_query,
+ provider=self.instance_id,
+ limit=limit,
)
if media_types is None or MediaType.PLAYLIST in media_types:
result.playlists = await self.mass.music.playlists._get_library_items_by_query(
- search=search_query, extra_query=query, extra_query_params=params, limit=limit
+ search=search_query,
+ provider=self.instance_id,
+ limit=limit,
)
return result
)
async def _get_or_create_artist_by_name(self, artist_name: str) -> Artist | ItemMapping:
- subquery = (
- "WHERE provider_mappings.media_type = 'artist' "
- "AND provider_mappings.provider_instance = :provider_instance"
- )
- query = (
- "WHERE artists.name LIKE :name AND artists.item_id in "
- f"(SELECT item_id FROM provider_mappings {subquery})"
- )
- query_params = {"name": artist_name, "provider_instance": self.instance_id}
if library_items := await self.mass.music.artists._get_library_items_by_query(
- extra_query=query, extra_query_params=query_params
+ search=artist_name, provider=self.instance_id
):
return ItemMapping.from_item(library_items[0])
await slimplayer.power(powered)
# store last state in cache
await self.mass.cache.set(
- f"{CACHE_KEY_PREV_STATE}.{player_id}", (powered, slimplayer.volume_level)
+ player_id, (powered, slimplayer.volume_level), base_key=CACHE_KEY_PREV_STATE
)
async def cmd_volume_set(self, player_id: str, volume_level: int) -> None:
await slimplayer.volume_set(volume_level)
# store last state in cache
await self.mass.cache.set(
- f"{CACHE_KEY_PREV_STATE}.{player_id}", (slimplayer.powered, volume_level)
+ player_id, (slimplayer.powered, volume_level), base_key=CACHE_KEY_PREV_STATE
)
async def cmd_volume_mute(self, player_id: str, muted: bool) -> None:
)
from music_assistant.common.models.enums import (
AlbumType,
+ CacheCategory,
ConfigEntryType,
ExternalID,
ImageType,
self, item_id: str, url: str, force_refresh: bool = False
) -> AudioTags:
"""Retrieve (cached) mediainfo for track."""
- cache_key = f"{self.instance_id}.media_info.{item_id}"
+ cache_category = CacheCategory.MEDIA_INFO
+ cache_base_key = self.lookup_key
# do we have some cached info for this url ?
- cached_info = await self.mass.cache.get(cache_key)
+ cached_info = await self.mass.cache.get(
+ item_id, category=cache_category, base_key=cache_base_key
+ )
if cached_info and not force_refresh:
media_info = AudioTags.parse(cached_info)
else:
# parse info with ffprobe (and store in cache)
media_info = await parse_tags(url)
- await self.mass.cache.set(cache_key, media_info.raw)
+ await self.mass.cache.set(
+ item_id, media_info.raw, category=cache_category, base_key=cache_base_key
+ )
return media_info
async def _get_stream_info(self, preset_id: str) -> list[dict]:
"""Get stream info for a radio station."""
- cache_key = f"tunein_stream_{preset_id}"
- if cache := await self.mass.cache.get(cache_key):
+ cache_base_key = "tunein_stream"
+ if cache := await self.mass.cache.get(preset_id, base_key=cache_base_key):
return cache
result = (await self.__get_data("Tune.ashx", id=preset_id))["body"]
- await self.mass.cache.set(cache_key, result)
+ await self.mass.cache.set(preset_id, result, base_key=cache_base_key)
return result
async def get_stream_details(self, item_id: str) -> StreamDetails: