async def get_artist_metadata(self, mb_artist_id: str, cur_metadata: dict) -> dict:
"""Get/update rich metadata for an artist by providing the musicbrainz artist id."""
metadata = cur_metadata
- if "fanart" in metadata:
+ if "fanart" not in metadata:
# no need to query (other) metadata providers if we already have a result
- return metadata
- self.logger.info(
- "Fetching metadata for MusicBrainz Artist %s on Fanrt.tv", mb_artist_id
- )
- cache_key = f"fanarttv.artist_metadata.{mb_artist_id}"
- res = await cached(
- self.cache, cache_key, self.fanarttv.get_artist_images, mb_artist_id
- )
- if res:
- metadata = merge_dict(metadata, res)
- self.logger.debug(
- "Found metadata for MusicBrainz Artist %s on Fanart.tv: %s",
- mb_artist_id,
- ", ".join(res.keys()),
+ metadata = merge_dict(
+ metadata, await self._get_fanarttv_metadata(mb_artist_id)
)
+
return metadata
async def get_thumbnail(self, url, size) -> bytes:
TABLE_THUMBS, {**match, "img": thumbnail}
)
return thumbnail
+
+ async def _get_fanarttv_metadata(self, mb_artist_id: str) -> dict:
+ """Get metadata from fanarttv for artist."""
+ metadata = {}
+ self.logger.info(
+ "Fetching metadata for MusicBrainz Artist %s on Fanrt.tv", mb_artist_id
+ )
+ cache_key = f"fanarttv.artist_metadata.{mb_artist_id}"
+ res = await cached(
+ self.cache, cache_key, self.fanarttv.get_artist_images, mb_artist_id
+ )
+ if res:
+ metadata = res
+ self.logger.debug(
+ "Found metadata for MusicBrainz Artist %s on Fanart.tv: %s",
+ mb_artist_id,
+ ", ".join(res.keys()),
+ )
+ return metadata
"media_type": media_type.value,
"prov_item_id": prov_id.item_id,
"provider": prov_id.provider,
- "quality": prov_id.quality.value,
+ "quality": prov_id.quality.value if prov_id.quality else None,
"details": prov_id.details,
+ "url": prov_id.url,
},
)
media_type TEXT NOT NULL,
prov_item_id TEXT NOT NULL,
provider TEXT NOT NULL,
- quality INTEGER NOT NULL,
+ quality INTEGER NULL,
details TEXT NULL,
+ url TEXT NULL,
UNIQUE(item_id, media_type, prov_item_id, provider)
);"""
)
# get streamdetails
try:
streamdetails = await get_stream_details(
- self.mass, queue_track, queue.queue_id, lazy=track_count == 1
+ self.mass, queue_track, queue.queue_id
)
except MediaNotFoundError as err:
self.logger.warning(
async def get_stream_details(
- mass: MusicAssistant, queue_item: QueueItem, queue_id: str = "", lazy: bool = True
+ mass: MusicAssistant, queue_item: QueueItem, queue_id: str = ""
) -> StreamDetails:
"""
Get streamdetails for the given QueueItem.
)
else:
# always request the full db track as there might be other qualities available
- full_item = await mass.music.get_item_by_uri(
- queue_item.uri, force_refresh=not lazy, lazy=lazy
- )
+ full_item = await mass.music.get_item_by_uri(queue_item.uri)
# sort by quality and check track availability
for prov_media in sorted(
full_item.provider_ids, key=lambda x: x.quality, reverse=True
data = await asyncio.get_running_loop().run_in_executor(
None, pickle.loads, db_row["data"]
)
- except Exception: # pylint: disable=broad-except
- self.logger.warning("Error parsing cache data for %s", cache_key)
+ except Exception as exc: # pylint: disable=broad-except
+ self.logger.exception(
+ "Error parsing cache data for %s", cache_key, exc_info=exc
+ )
else:
# also store in memory cache for faster access
if cache_key not in self._mem_cache:
db_row["expires"],
)
return data
- self.logger.debug("no cache data for %s", cache_key)
return default
async def set(self, cache_key, data, checksum="", expiration=(86400 * 30)):
# album match OR near exact duration match
if (
compare_album(left_track.album, right_track.album)
- and abs(left_track.duration - right_track.duration) < 5
- ) or abs(left_track.duration - right_track.duration) < 1:
+ and left_track.duration == right_track.duration
+ ) or abs(left_track.duration - right_track.duration) <= 2:
# 100% match, all criteria passed
return True
return False
# pylint: disable=invalid-name
+SCHEMA_VERSION = 1
+
class Database:
"""Class that holds the (logic to the) database."""
self.mass = mass
self.logger = mass.logger.getChild("db")
+ async def setup(self) -> None:
+ """Perform async initialization."""
+ async with self.get_db() as _db:
+ await _db.execute(
+ """CREATE TABLE IF NOT EXISTS settings(
+ key TEXT PRIMARY KEY,
+ value TEXT
+ );"""
+ )
+ await self._migrate()
+
@asynccontextmanager
async def get_db(self, db: Optional[Db] = None) -> Db:
"""Context manager helper to get the active db connection."""
sql_query = f"DELETE FROM {table}"
sql_query += " WHERE " + " AND ".join((f"{x} = :{x}" for x in match))
await _db.execute(sql_query)
+
+ async def _migrate(self):
+ """Perform database migration actions if needed."""
+ prev_version = await self.get_row("settings", {"key": "version"})
+ if prev_version:
+ prev_version = int(prev_version["value"])
+ else:
+ prev_version = 0
+ if SCHEMA_VERSION != prev_version:
+ self.logger.info(
+ "Performing database migration from %s to %s",
+ prev_version,
+ SCHEMA_VERSION,
+ )
+
+ # schema version 1: too many breaking changes, simply drop the media tables for now
+ async with self.get_db() as _db:
+ await _db.execute("DROP TABLE IF EXISTS artists")
+ await _db.execute("DROP TABLE IF EXISTS albums")
+ await _db.execute("DROP TABLE IF EXISTS tracks")
+ await _db.execute("DROP TABLE IF EXISTS playlists")
+ await _db.execute("DROP TABLE IF EXISTS radios")
+ await _db.execute("DROP TABLE IF EXISTS playlist_tracks")
+ await _db.execute("DROP TABLE IF EXISTS album_tracks")
+ await _db.execute("DROP TABLE IF EXISTS provider_mappings")
+ await _db.execute("DROP TABLE IF EXISTS cache")
+
+ # store current schema version
+ await self.insert_or_replace(
+ "settings", {"key": "version", "value": str(SCHEMA_VERSION)}
+ )
connector=aiohttp.TCPConnector(ssl=False),
)
# setup core controllers
+ await self.database.setup()
await self.cache.setup()
await self.music.setup()
await self.metadata.setup()
from __future__ import annotations
from abc import ABCMeta, abstractmethod
+from time import time
from typing import Generic, List, Optional, Tuple, TypeVar
from music_assistant.helpers.cache import cached
ItemCls = TypeVar("ItemCls", bound="MediaControllerBase")
+REFRESH_INTERVAL = 60 * 60 * 24 * 30
+REFRESH_KEY = "last_refresh"
+
class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta):
"""Base model for controller managing a MediaType."""
) -> ItemCls:
"""Return (full) details for a single media item."""
db_item = await self.get_db_item_by_prov_id(provider_id, provider_item_id)
+ if (
+ db_item
+ and (time() - db_item.metadata.get(REFRESH_KEY, 0)) > REFRESH_INTERVAL
+ ):
+ force_refresh = True
if db_item and force_refresh:
provider_id, provider_item_id = await self.get_provider_id(db_item)
elif db_item:
return db_item
if not details:
details = await self.get_provider_item(provider_item_id, provider_id)
+ details.metadata[REFRESH_KEY] = int(time())
if not lazy:
return await self.add(details)
self.mass.add_job(self.add(details), f"Add {details.uri} to database")
from music_assistant.helpers.json import json
from music_assistant.helpers.util import create_sort_name
+MetadataTypes = Union[int, bool, str, List[str]]
+
class MediaType(Enum):
"""Enum for MediaType."""
RADIO = "radio"
UNKNOWN = "unknown"
- @classmethod
- def _missing_(cls: "MediaType", value: str):
- """Set default enum member if an unknown value is provided."""
- return cls.UNKNOWN
-
class MediaQuality(IntEnum):
"""Enum for Media Quality."""
- LOSSY_MP3 = 0
- LOSSY_OGG = 1
- LOSSY_AAC = 2
- FLAC_LOSSLESS = 6 # 44.1/48khz 16 bits
- FLAC_LOSSLESS_HI_RES_1 = 7 # 44.1/48khz 24 bits HI-RES
- FLAC_LOSSLESS_HI_RES_2 = 8 # 88.2/96khz 24 bits HI-RES
- FLAC_LOSSLESS_HI_RES_3 = 9 # 176/192khz 24 bits HI-RES
- FLAC_LOSSLESS_HI_RES_4 = 10 # above 192khz 24 bits HI-RES
- UNKNOWN = 99
-
- @classmethod
- def _missing_(cls: "MediaQuality", value: str):
- """Set default enum member if an unknown value is provided."""
- return cls.UNKNOWN
+ UNKNOWN = 0
+ LOSSY_MP3 = 1
+ LOSSY_OGG = 2
+ LOSSY_AAC = 3
+ FLAC_LOSSLESS = 10 # 44.1/48khz 16 bits
+ FLAC_LOSSLESS_HI_RES_1 = 20 # 44.1/48khz 24 bits HI-RES
+ FLAC_LOSSLESS_HI_RES_2 = 21 # 88.2/96khz 24 bits HI-RES
+ FLAC_LOSSLESS_HI_RES_3 = 22 # 176/192khz 24 bits HI-RES
+ FLAC_LOSSLESS_HI_RES_4 = 23 # above 192khz 24 bits HI-RES
@dataclass
provider: str
item_id: str
- quality: MediaQuality = MediaQuality.UNKNOWN
- details: str = None
available: bool = True
+ quality: Optional[MediaQuality] = None
+ details: Optional[str] = None
+ url: Optional[str] = None
def __hash__(self):
"""Return custom hash."""
provider: str
name: str
sort_name: Optional[str] = None
- metadata: Dict[str, Any] = field(default_factory=dict)
+ metadata: Dict[str, MetadataTypes] = field(default_factory=dict)
provider_ids: List[MediaItemProviderId] = field(default_factory=list)
in_library: bool = False
media_type: MediaType = MediaType.UNKNOWN
self.uri = create_uri(self.media_type, self.provider, self.item_id)
if not self.sort_name:
self.sort_name = create_sort_name(self.name)
- if not self.provider_ids:
- self.provider_ids.append(
- MediaItemProviderId(provider=self.provider, item_id=self.item_id)
- )
@classmethod
def from_db_row(cls, db_row: Mapping):
:param limit: Number of items to return in the search (per type).
"""
result = []
- for track in await self.get_library_tracks():
+ for track in await self.get_library_tracks(True):
for search_part in search_query.split(" - "):
if media_types is None or MediaType.TRACK in media_types:
if compare_strings(track.name, search_part):
result.append(track.album.artist)
return result
- async def get_library_artists(self) -> List[Artist]:
+ async def get_library_artists(self, allow_cache=False) -> List[Artist]:
"""Retrieve all library artists."""
+ # pylint: disable = arguments-differ
+ cache_key = f"{self.id}.library_artists"
+ if allow_cache:
+ if cache_result := await self.mass.cache.get(cache_key):
+ return cache_result
result = []
prev_ids = set()
- for track in await self.get_library_tracks():
+ for track in await self.get_library_tracks(allow_cache):
if track.album is not None and track.album.artist is not None:
if track.album.artist.item_id not in prev_ids:
result.append(track.album.artist)
prev_ids.add(track.album.artist.item_id)
+ await self.mass.cache.set(cache_key, result)
return result
- async def get_library_albums(self) -> List[Album]:
+ async def get_library_albums(self, allow_cache=False) -> List[Album]:
"""Get album folders recursively."""
+ # pylint: disable = arguments-differ
+ cache_key = f"{self.id}.library_albums"
+ if allow_cache:
+ if cache_result := await self.mass.cache.get(cache_key):
+ return cache_result
result = []
prev_ids = set()
- for track in await self.get_library_tracks():
+ for track in await self.get_library_tracks(allow_cache):
if track.album is not None:
if track.album.item_id not in prev_ids:
result.append(track.album)
prev_ids.add(track.album.item_id)
+ await self.mass.cache.set(cache_key, result)
return result
- async def get_library_tracks(self) -> List[Track]:
+ async def get_library_tracks(self, allow_cache=False) -> List[Track]:
"""Get all tracks recursively."""
- # TODO: apply caching for very large libraries ?
+ # pylint: disable = arguments-differ
+ cache_key = f"{self.id}.library_tracks"
+ if allow_cache:
+ if cache_result := await self.mass.cache.get(cache_key):
+ return cache_result
result = []
for _root, _dirs, _files in os.walk(self._music_dir):
for file in _files:
if TinyTag.is_supported(filename):
if track := await self._parse_track(filename):
result.append(track)
+ await self.mass.cache.set(cache_key, result)
return result
- async def get_library_playlists(self) -> List[Playlist]:
+ async def get_library_playlists(self, allow_cache=False) -> List[Playlist]:
"""Retrieve playlists from disk."""
+ # pylint: disable = arguments-differ
if not self._playlists_dir:
return []
+ cache_key = f"{self.id}.library_playlists"
+ if allow_cache:
+ if cache_result := await self.mass.cache.get(cache_key):
+ return cache_result
result = []
for filename in os.listdir(self._playlists_dir):
filepath = os.path.join(self._playlists_dir, filename)
playlist = await self.get_playlist(filepath)
if playlist:
result.append(playlist)
+ await self.mass.cache.set(cache_key, result)
return result
async def get_artist(self, prov_artist_id: str) -> Artist:
return next(
(
track.album.artist
- for track in await self.get_library_tracks()
+ for track in await self.get_library_tracks(True)
if track.album is not None
and track.album.artist is not None
and track.album.artist.item_id == prov_artist_id
return next(
(
track.album
- for track in await self.get_library_tracks()
+ for track in await self.get_library_tracks(True)
if track.album is not None and track.album.item_id == prov_album_id
),
None,
"""Get album tracks for given album id."""
return [
track
- for track in await self.get_library_tracks()
+ for track in await self.get_library_tracks(True)
if track.album is not None and track.album.item_id == prov_album_id
]
"""Get a list of albums for the given artist."""
return [
track.album
- for track in await self.get_library_tracks()
+ for track in await self.get_library_tracks(True)
if track.album is not None
and track.album.artist is not None
and track.album.artist.item_id == prov_artist_id
"""Get a list of all tracks as we have no clue about preference."""
return [
track
- for track in await self.get_library_tracks()
+ for track in await self.get_library_tracks(True)
if track.artists is not None
and prov_artist_id in [x.item_id for x in track.provider_ids]
]
}
await self._get_data("/track/reportStreamingEnd", params)
- async def _parse_artist(self, artist_obj):
+ async def _parse_artist(self, artist_obj: dict):
"""Parse qobuz artist object to generic layout."""
artist = Artist(
item_id=str(artist_obj["id"]), provider=self.id, name=artist_obj["name"]
)
artist.provider_ids.append(
- MediaItemProviderId(provider=self.id, item_id=str(artist_obj["id"]))
+ MediaItemProviderId(
+ provider=self.id,
+ item_id=str(artist_obj["id"]),
+ url=artist_obj.get(
+ "url", f'https://open.qobuz.com/artist/{artist_obj["id"]}'
+ ),
+ )
)
artist.metadata["image"] = self.__get_image(artist_obj)
if artist_obj.get("biography"):
artist.metadata["biography"] = artist_obj["biography"].get("content", "")
- if artist_obj.get("url"):
- artist.metadata["qobuz_url"] = artist_obj["url"]
return artist
async def _parse_album(self, album_obj: dict, artist_obj: dict = None):
provider=self.id,
item_id=str(album_obj["id"]),
quality=quality,
+ url=album_obj.get(
+ "url", f'https://open.qobuz.com/album/{album_obj["id"]}'
+ ),
details=f'{album_obj["maximum_sampling_rate"]}kHz {album_obj["maximum_bit_depth"]}bit',
available=album_obj["streamable"] and album_obj["displayable"],
)
album.year = datetime.datetime.fromtimestamp(album_obj["released_at"]).year
if album_obj.get("copyright"):
album.metadata["copyright"] = album_obj["copyright"]
- if album_obj.get("hires"):
- album.metadata["hires"] = "true"
- if album_obj.get("url"):
- album.metadata["qobuz_url"] = album_obj["url"]
if album_obj.get("description"):
album.metadata["description"] = album_obj["description"]
return album
- async def _parse_track(self, track_obj):
+ async def _parse_track(self, track_obj: dict):
"""Parse qobuz track object to generic layout."""
name, version = parse_title_and_version(
track_obj["title"], track_obj.get("version")
track.album = album
if track_obj.get("hires"):
track.metadata["hires"] = "true"
- if track_obj.get("url"):
- track.metadata["qobuz_url"] = track_obj["url"]
if track_obj.get("isrc"):
track.isrc = track_obj["isrc"]
if track_obj.get("performers"):
provider=self.id,
item_id=str(track_obj["id"]),
quality=quality,
+ url=track_obj.get(
+ "url", f'https://open.qobuz.com/track/{track_obj["id"]}'
+ ),
details=f'{track_obj["maximum_sampling_rate"]}kHz {track_obj["maximum_bit_depth"]}bit',
available=track_obj["streamable"] and track_obj["displayable"],
)
owner=playlist_obj["owner"]["name"],
)
playlist.provider_ids.append(
- MediaItemProviderId(provider=self.id, item_id=str(playlist_obj["id"]))
+ MediaItemProviderId(
+ provider=self.id,
+ item_id=str(playlist_obj["id"]),
+ url=playlist_obj.get(
+ "url", f'https://open.qobuz.com/playlist/{playlist_obj["id"]}'
+ ),
+ )
)
playlist.is_editable = (
playlist_obj["owner"]["id"] == self.__user_auth_info["user"]["id"]
or playlist_obj["is_collaborative"]
)
playlist.metadata["image"] = self.__get_image(playlist_obj)
- if playlist_obj.get("url"):
- playlist.metadata["qobuz_url"] = playlist_obj["url"]
playlist.checksum = playlist_obj["updated_at"]
return playlist
item_id=artist_obj["id"], provider=self.id, name=artist_obj["name"]
)
artist.provider_ids.append(
- MediaItemProviderId(provider=self.id, item_id=artist_obj["id"])
+ MediaItemProviderId(
+ provider=self.id,
+ item_id=artist_obj["id"],
+ url=artist_obj["external_urls"]["spotify"],
+ )
)
if "genres" in artist_obj:
artist.metadata["genres"] = artist_obj["genres"]
if "2a96cbd8b46e442fc41c2b86b821562f" not in img_url:
artist.metadata["image"] = img_url
break
- if artist_obj.get("external_urls"):
- artist.metadata["spotify_url"] = artist_obj["external_urls"]["spotify"]
return artist
- async def _parse_album(self, album_obj):
+ async def _parse_album(self, album_obj: dict):
"""Parse spotify album object to generic layout."""
name, version = parse_title_and_version(album_obj["name"])
album = Album(
album.year = int(album_obj["release_date"].split("-")[0])
if album_obj.get("copyrights"):
album.metadata["copyright"] = album_obj["copyrights"][0]["text"]
- if album_obj.get("external_urls"):
- album.metadata["spotify_url"] = album_obj["external_urls"]["spotify"]
if album_obj.get("explicit"):
album.metadata["explicit"] = str(album_obj["explicit"]).lower()
album.provider_ids.append(
provider=self.id,
item_id=album_obj["id"],
quality=MediaQuality.LOSSY_OGG,
+ url=album_obj["external_urls"]["spotify"],
)
)
return album
track.metadata["copyright"] = track_obj["copyright"]
if track_obj.get("explicit"):
track.metadata["explicit"] = True
- if track_obj.get("external_urls"):
- track.metadata["spotify_url"] = track_obj["external_urls"]["spotify"]
if track_obj.get("popularity"):
track.metadata["popularity"] = track_obj["popularity"]
track.provider_ids.append(
provider=self.id,
item_id=track_obj["id"],
quality=MediaQuality.LOSSY_OGG,
+ url=track_obj["external_urls"]["spotify"],
available=not track_obj["is_local"] and track_obj["is_playable"],
)
)
owner=playlist_obj["owner"]["display_name"],
)
playlist.provider_ids.append(
- MediaItemProviderId(provider=self.id, item_id=playlist_obj["id"])
+ MediaItemProviderId(
+ provider=self.id,
+ item_id=playlist_obj["id"],
+ url=playlist_obj["external_urls"]["spotify"],
+ )
)
playlist.is_editable = (
playlist_obj["owner"]["id"] == self._sp_user["id"]
)
if playlist_obj.get("images"):
playlist.metadata["image"] = playlist_obj["images"][0]["url"]
- if playlist_obj.get("external_urls"):
- playlist.metadata["spotify_url"] = playlist_obj["external_urls"]["spotify"]
playlist.checksum = playlist_obj["snapshot_id"]
return playlist