from dataclasses import dataclass
from enum import Enum
-from typing import Any, Optional
+from time import time
+from typing import Any, Coroutine, Optional
class EventType(Enum):
RADIO_ADDED = "radio added"
TASK_UPDATED = "task updated"
PROVIDER_REGISTERED = "provider registered"
- BACKGROUND_JOBS_UPDATED = "background_jobs_updated"
+ BACKGROUND_JOB_UPDATED = "background_job_updated"
@dataclass
data: Optional[Any] = None # optional data (such as the object)
+class JobStatus(Enum):
+ """Enum with Job status."""
+
+ PENDING = "pending"
+ RUNNING = "running"
+ CANCELLED = "cancelled"
+ FINISHED = "success"
+ ERROR = "error"
+
+
+@dataclass
+class BackgroundJob:
+ """Description of a background job/task."""
+
+ id: str
+ coro: Coroutine
+ name: str
+ timestamp: float = time()
+ status: JobStatus = JobStatus.PENDING
+
+ def to_dict(self):
+ """Return serializable dict from object."""
+ return {
+ "id": self.id,
+ "name": self.name,
+ "timestamp": self.status.value,
+ }
+
+
# player attributes
ATTR_PLAYER_ID = "player_id"
ATTR_PROVIDER_ID = "provider_id"
"""All logic for metadata retrieval."""
+from __future__ import annotations
+
+from time import time
+from typing import Optional
-from music_assistant.helpers.cache import cached
from music_assistant.helpers.images import create_thumbnail
from music_assistant.helpers.typing import MusicAssistant
-from music_assistant.helpers.util import merge_dict
+from music_assistant.models.media_items import Album, Artist, Playlist, Radio, Track
+from .audiodb import TheAudioDb
from .fanarttv import FanartTv
from .musicbrainz import MusicBrainz
-# TODO: add more metadata providers such as theaudiodb
-# TODO: add metadata support for albums and other media types
-
TABLE_THUMBS = "thumbnails"
class MetaDataController:
"""Several helpers to search and store metadata for mediaitems."""
- # TODO: create periodic task to search for missing metadata
def __init__(self, mass: MusicAssistant) -> None:
"""Initialize class."""
self.mass = mass
self.logger = mass.logger.getChild("metadata")
self.fanarttv = FanartTv(mass)
self.musicbrainz = MusicBrainz(mass)
+ self.audiodb = TheAudioDb(mass)
+ self._pref_lang: Optional[str] = None
+
+ @property
+ def preferred_language(self) -> str:
+ """
+ Return preferred language for metadata as 2 letter country code (uppercase).
+
+ Defaults to English (EN).
+ """
+ return self._pref_lang or "EN"
+
+ @preferred_language.setter
+ def preferred_language(self, lang: str) -> None:
+ """
+ Set preferred language to 2 letter country code.
+
+ Can only be set once.
+ """
+ if self._pref_lang is None:
+ self._pref_lang = lang.upper()
async def setup(self):
"""Async initialize of module."""
UNIQUE(url, size));"""
)
- async def get_artist_metadata(self, mb_artist_id: str, cur_metadata: dict) -> dict:
- """Get/update rich metadata for an artist by providing the musicbrainz artist id."""
- metadata = cur_metadata
- if "fanart" not in metadata:
- # no need to query (other) metadata providers if we already have a result
- metadata = merge_dict(
- metadata, await self._get_fanarttv_metadata(mb_artist_id)
- )
+ async def get_artist_metadata(self, artist: Artist) -> None:
+ """Get/update rich metadata for an artist."""
+ if not artist.musicbrainz_id:
+ artist.musicbrainz_id = await self.get_artist_musicbrainz_id(artist)
+ if metadata := await self.fanarttv.get_artist_metadata(artist):
+ artist.metadata.update(metadata)
+ if metadata := await self.audiodb.get_artist_metadata(artist):
+ artist.metadata.update(metadata)
+
+ artist.metadata.last_refresh = int(time())
+
+ async def get_album_metadata(self, album: Album) -> None:
+ """Get/update rich metadata for an album."""
+ if metadata := await self.audiodb.get_album_metadata(album):
+ album.metadata.update(metadata)
+ if metadata := await self.fanarttv.get_album_metadata(album):
+ album.metadata.update(metadata)
+
+ album.metadata.last_refresh = int(time())
- return metadata
+ async def get_track_metadata(self, track: Track) -> None:
+ """Get/update rich metadata for a track."""
+ if metadata := await self.audiodb.get_track_metadata(track):
+ track.metadata.update(metadata)
+
+ track.metadata.last_refresh = int(time())
+
+ async def get_playlist_metadata(self, playlist: Playlist) -> None:
+ """Get/update rich metadata for a playlist."""
+ # retrieve genres from tracks
+ # TODO: retrieve style/mood ?
+ playlist.metadata.genres = set()
+ for track in await self.mass.music.playlists.tracks(
+ playlist.item_id, playlist.provider
+ ):
+ if track.metadata.genres:
+ playlist.metadata.genres.update(track.metadata.genres)
+ elif track.album.metadata.genres:
+ playlist.metadata.genres.update(track.album.metadata.genres)
+ # TODO: create mosaic thumb/fanart from playlist tracks
+ playlist.metadata.last_refresh = int(time())
+
+ async def get_radio_metadata(self, radio: Radio) -> None:
+ # pylint: disable=no-self-use
+ """Get/update rich metadata for a radio station."""
+ # NOTE: we do not have any metadata for radiso so consider this future proofing ;-)
+ radio.metadata.last_refresh = int(time())
+
+ async def get_artist_musicbrainz_id(self, artist: Artist) -> str:
+ """Fetch musicbrainz id by performing search using the artist name, albums and tracks."""
+ # try with album first
+ for lookup_album in await self.mass.music.artists.get_provider_artist_albums(
+ artist.item_id, artist.provider
+ ):
+ if artist.name != lookup_album.artist.name:
+ continue
+ musicbrainz_id = await self.musicbrainz.get_mb_artist_id(
+ artist.name,
+ albumname=lookup_album.name,
+ album_upc=lookup_album.upc,
+ )
+ if musicbrainz_id:
+ return musicbrainz_id
+ # fallback to track
+ for lookup_track in await self.mass.music.artists.get_provider_artist_toptracks(
+ artist.item_id, artist.provider
+ ):
+ musicbrainz_id = await self.musicbrainz.get_mb_artist_id(
+ artist.name,
+ trackname=lookup_track.name,
+ track_isrc=lookup_track.isrc,
+ )
+ if musicbrainz_id:
+ return musicbrainz_id
+ # lookup failed, use the shitty workaround to use the name as id.
+ self.logger.warning("Unable to get musicbrainz ID for artist %s !", artist.name)
+ return artist.name
async def get_thumbnail(self, url, size) -> bytes:
"""Get/create thumbnail image for url."""
TABLE_THUMBS, {**match, "img": thumbnail}
)
return thumbnail
-
- async def _get_fanarttv_metadata(self, mb_artist_id: str) -> dict:
- """Get metadata from fanarttv for artist."""
- metadata = {}
- self.logger.info(
- "Fetching metadata for MusicBrainz Artist %s on Fanrt.tv", mb_artist_id
- )
- cache_key = f"fanarttv.artist_metadata.{mb_artist_id}"
- res = await cached(
- self.cache, cache_key, self.fanarttv.get_artist_images, mb_artist_id
- )
- if res:
- metadata = res
- self.logger.debug(
- "Found metadata for MusicBrainz Artist %s on Fanart.tv: %s",
- mb_artist_id,
- ", ".join(res.keys()),
- )
- return metadata
--- /dev/null
+"""TheAudioDb Metadata provider."""
+from __future__ import annotations
+
+from json.decoder import JSONDecodeError
+from typing import Any, Dict, Optional
+
+import aiohttp
+from asyncio_throttle import Throttler
+
+from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module
+ app_var,
+)
+from music_assistant.helpers.cache import use_cache
+from music_assistant.helpers.compare import compare_strings
+from music_assistant.helpers.typing import MusicAssistant
+from music_assistant.models.media_items import (
+ Album,
+ AlbumType,
+ Artist,
+ ImageType,
+ LinkType,
+ MediaItemImage,
+ MediaItemLink,
+ MediaItemMetadata,
+ Track,
+)
+
+IMG_MAPPING = {
+ "strArtistThumb": ImageType.THUMB,
+ "strArtistLogo": ImageType.LOGO,
+ "strArtistCutout": ImageType.CUTOUT,
+ "strArtistClearart": ImageType.CLEARART,
+ "strArtistWideThumb": ImageType.WIDE_THUMB,
+ "strArtistFanart": ImageType.FANART,
+ "strArtistBanner": ImageType.BANNER,
+ "strAlbumThumb": ImageType.THUMB,
+ "strAlbumThumbHQ": ImageType.THUMB,
+ "strAlbumCDart": ImageType.CDART,
+ "strAlbum3DCase": ImageType.OTHER,
+ "strAlbum3DFlat": ImageType.OTHER,
+ "strAlbum3DFace": ImageType.OTHER,
+ "strAlbum3DThumb": ImageType.OTHER,
+ "strTrackThumb": ImageType.THUMB,
+ "strTrack3DCase": ImageType.OTHER,
+}
+
+LINK_MAPPING = {
+ "strWebsite": LinkType.WEBSITE,
+ "strFacebook": LinkType.FACEBOOK,
+ "strTwitter": LinkType.TWITTER,
+ "strLastFMChart": LinkType.LASTFM,
+}
+
+ALBUMTYPE_MAPPING = {
+ "Single": AlbumType.SINGLE,
+ "Compilation": AlbumType.COMPILATION,
+ "Album": AlbumType.ALBUM,
+}
+
+
+class TheAudioDb:
+ """TheAudioDb metadata provider."""
+
+ def __init__(self, mass: MusicAssistant):
+ """Initialize class."""
+ self.mass = mass
+ self.cache = mass.cache
+ self.logger = mass.logger.getChild("audiodb")
+ self.throttler = Throttler(rate_limit=2, period=1)
+
+ async def get_artist_metadata(self, artist: Artist) -> MediaItemMetadata | None:
+ """Retrieve metadata for artist on theaudiodb."""
+ self.logger.debug("Fetching metadata for Artist %s on TheAudioDb", artist.name)
+ if data := await self._get_data("artist-mb.php", i=artist.musicbrainz_id):
+ if data.get("artists"):
+ return self.__parse_artist(data["artists"][0])
+ return None
+
+ async def get_album_metadata(self, album: Album) -> MediaItemMetadata | None:
+ """Retrieve metadata for album on theaudiodb."""
+ adb_album = None
+ if album.musicbrainz_id:
+ result = await self._get_data("album-mb.php", i=album.musicbrainz_id)
+ if result and result.get("album"):
+ adb_album = result["album"][0]
+ else:
+ # lookup by name
+ result = await self._get_data(
+ "searchalbum.php", s=album.artist.name, a=album.name
+ )
+ if result and result.get("album"):
+ for item in result["album"]:
+ if album.artist.musicbrainz_id:
+ if (
+ album.artist.musicbrainz_id
+ != item["strMusicBrainzArtistID"]
+ ):
+ continue
+ elif not compare_strings(
+ album.artist.name, item["strArtistStripped"]
+ ):
+ continue
+ if compare_strings(album.name, item["strAlbumStripped"]):
+ adb_album = item
+ break
+ if adb_album:
+ if not album.year:
+ album.year = int(adb_album.get("intYearReleased", "0"))
+ if not album.musicbrainz_id:
+ album.musicbrainz_id = adb_album["strMusicBrainzID"]
+ if not album.artist.musicbrainz_id:
+ album.artist.musicbrainz_id = adb_album["strMusicBrainzArtistID"]
+ if album.album_type == AlbumType.UNKNOWN:
+ album.album_type = ALBUMTYPE_MAPPING.get(
+ adb_album.get("strReleaseFormat"), AlbumType.UNKNOWN
+ )
+ return self.__parse_album(adb_album)
+ return None
+
+ async def get_track_metadata(self, track: Track) -> MediaItemMetadata | None:
+ """Retrieve metadata for track on theaudiodb."""
+ adb_track = None
+ if track.musicbrainz_id:
+ result = await self._get_data("track-mb.php", i=track.musicbrainz_id)
+ if result and result.get("track"):
+ return self.__parse_track(result["track"][0])
+
+ # lookup by name
+ for track_artist in track.artists:
+ result = await self._get_data(
+ "searchtrack.php?", s=track_artist.name, t=track.name
+ )
+ if result and result.get("track"):
+ for item in result["track"]:
+ if track_artist.musicbrainz_id:
+ if (
+ track_artist.musicbrainz_id
+ != item["strMusicBrainzArtistID"]
+ ):
+ continue
+ elif not compare_strings(track_artist.name, item["strArtist"]):
+ continue
+ if compare_strings(track.name, item["strTrack"]):
+ adb_track = item
+ break
+ if adb_track:
+ if not track.musicbrainz_id:
+ track.musicbrainz_id = adb_track["strMusicBrainzID"]
+ if not track.album.musicbrainz_id:
+ track.album.musicbrainz_id = adb_track["strMusicBrainzAlbumID"]
+ if not track_artist.musicbrainz_id:
+ track_artist.musicbrainz_id = adb_track["strMusicBrainzArtistID"]
+
+ return self.__parse_track(adb_track)
+ return None
+
+ def __parse_artist(self, artist_obj: Dict[str, Any]) -> MediaItemMetadata:
+ """Parse audiodb artist object to MediaItemMetadata."""
+ metadata = MediaItemMetadata()
+ # generic data
+ metadata.label = artist_obj.get("strLabel")
+ metadata.style = artist_obj.get("strStyle")
+ if genre := artist_obj.get("strGenre"):
+ metadata.genres = {genre}
+ metadata.mood = artist_obj.get("strMood")
+ # links
+ metadata.links = set()
+ for key, link_type in LINK_MAPPING.items():
+ if link := artist_obj.get(key):
+ metadata.links.add(MediaItemLink(link_type, link))
+ # description/biography
+ if desc := artist_obj.get(
+ f"strBiography{self.mass.metadata.preferred_language}"
+ ):
+ metadata.description = desc
+ else:
+ metadata.description = artist_obj.get("strBiographyEN")
+ # images
+ metadata.images = set()
+ for key, img_type in IMG_MAPPING.items():
+ for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
+ if img := artist_obj.get(f"{key}{postfix}"):
+ metadata.images.add(MediaItemImage(img_type, img))
+ else:
+ break
+ return metadata
+
+ def __parse_album(self, album_obj: Dict[str, Any]) -> MediaItemMetadata:
+ """Parse audiodb album object to MediaItemMetadata."""
+ metadata = MediaItemMetadata()
+ # generic data
+ metadata.label = album_obj.get("strLabel")
+ metadata.style = album_obj.get("strStyle")
+ if genre := album_obj.get("strGenre"):
+ metadata.genres = {genre}
+ metadata.mood = album_obj.get("strMood")
+ # links
+ metadata.links = set()
+ if link := album_obj.get("strWikipediaID"):
+ metadata.links.add(
+ MediaItemLink(LinkType.WIKIPEDIA, f"https://wikipedia.org/wiki/{link}")
+ )
+ if link := album_obj.get("strAllMusicID"):
+ metadata.links.add(
+ MediaItemLink(
+ LinkType.ALLMUSIC, f"https://www.allmusic.com/album/{link}"
+ )
+ )
+
+ # description
+ if desc := album_obj.get(
+ f"strDescription{self.mass.metadata.preferred_language}"
+ ):
+ metadata.description = desc
+ else:
+ metadata.description = album_obj.get("strDescriptionEN")
+ metadata.review = album_obj.get("strReview")
+ # images
+ metadata.images = set()
+ for key, img_type in IMG_MAPPING.items():
+ for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
+ if img := album_obj.get(f"{key}{postfix}"):
+ metadata.images.add(MediaItemImage(img_type, img))
+ else:
+ break
+ return metadata
+
+ def __parse_track(self, track_obj: Dict[str, Any]) -> MediaItemMetadata:
+ """Parse audiodb track object to MediaItemMetadata."""
+ metadata = MediaItemMetadata()
+ # generic data
+ metadata.lyrics = track_obj.get("strTrackLyrics")
+ metadata.style = track_obj.get("strStyle")
+ if genre := track_obj.get("strGenre"):
+ metadata.genres = {genre}
+ metadata.mood = track_obj.get("strMood")
+ # description
+ if desc := track_obj.get(
+ f"strDescription{self.mass.metadata.preferred_language}"
+ ):
+ metadata.description = desc
+ else:
+ metadata.description = track_obj.get("strDescriptionEN")
+ # images
+ metadata.images = set()
+ for key, img_type in IMG_MAPPING.items():
+ for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
+ if img := track_obj.get(f"{key}{postfix}"):
+ metadata.images.add(MediaItemImage(img_type, img))
+ else:
+ break
+ return metadata
+
+ @use_cache(86400 * 14)
+ async def _get_data(self, endpoint, **kwargs) -> Optional[dict]:
+ """Get data from api."""
+ url = f"https://theaudiodb.com/api/v1/json/{app_var(3)}/{endpoint}"
+ async with self.throttler:
+ async with self.mass.http_session.get(
+ url, params=kwargs, verify_ssl=False
+ ) as response:
+ try:
+ result = await response.json()
+ except (
+ aiohttp.ContentTypeError,
+ JSONDecodeError,
+ ):
+ self.logger.error("Failed to retrieve %s", endpoint)
+ text_result = await response.text()
+ self.logger.debug(text_result)
+ return None
+ except aiohttp.ClientConnectorError:
+ self.logger.error("Failed to retrieve %s", endpoint)
+ return None
+ if "error" in result and "limit" in result["error"]:
+ self.logger.error(result["error"])
+ return None
+ return result
"""FanartTv Metadata provider."""
+from __future__ import annotations
from json.decoder import JSONDecodeError
-from typing import Dict
+from typing import Optional
import aiohttp
from asyncio_throttle import Throttler
+from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module
+ app_var,
+)
+from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.typing import MusicAssistant
+from music_assistant.models.media_items import (
+ Album,
+ Artist,
+ ImageType,
+ MediaItemImage,
+ MediaItemMetadata,
+)
# TODO: add support for personal api keys ?
# TODO: Add support for album artwork ?
+IMG_MAPPING = {
+ "artistthumb": ImageType.THUMB,
+ "hdmusiclogo": ImageType.LOGO,
+ "musicbanner": ImageType.BANNER,
+ "artistbackground": ImageType.FANART,
+}
+
class FanartTv:
"""Fanart.tv metadata provider."""
def __init__(self, mass: MusicAssistant):
"""Initialize class."""
self.mass = mass
+ self.cache = mass.cache
self.logger = mass.logger.getChild("fanarttv")
- self.throttler = Throttler(rate_limit=1, period=2)
+ self.throttler = Throttler(rate_limit=2, period=1)
+
+ async def get_artist_metadata(self, artist: Artist) -> MediaItemMetadata | None:
+ """Retrieve metadata for artist on fanart.tv."""
+ if not artist.musicbrainz_id:
+ return
+ self.logger.debug("Fetching metadata for Artist %s on Fanart.tv", artist.name)
+ if data := await self._get_data(f"music/{artist.musicbrainz_id}"):
+ metadata = MediaItemMetadata()
+ metadata.images = set()
+ for key, img_type in IMG_MAPPING.items():
+ items = data.get(key)
+ if not items:
+ continue
+ for item in items:
+ metadata.images.add(MediaItemImage(img_type, item["url"]))
+ return metadata
+ return None
- async def get_artist_images(self, mb_artist_id: str) -> Dict:
- """Retrieve images by musicbrainz artist id."""
- metadata = {}
- data = await self._get_data(f"music/{mb_artist_id}")
- if data:
- if data.get("hdmusiclogo"):
- metadata["logo"] = data["hdmusiclogo"][0]["url"]
- elif data.get("musiclogo"):
- metadata["logo"] = data["musiclogo"][0]["url"]
- if data.get("artistbackground"):
- count = 0
- for item in data["artistbackground"]:
- key = "fanart" if count == 0 else f"fanart.{count}"
- metadata[key] = item["url"]
- if data.get("artistthumb"):
- url = data["artistthumb"][0]["url"]
- if "2a96cbd8b46e442fc41c2b86b821562f" not in url:
- metadata["image"] = url
- if data.get("musicbanner"):
- metadata["banner"] = data["musicbanner"][0]["url"]
- return metadata
+ async def get_album_metadata(self, album: Album) -> MediaItemMetadata | None:
+ """Retrieve metadata for album on fanart.tv."""
+ if not album.musicbrainz_id:
+ return
+ self.logger.debug("Fetching metadata for Album %s on Fanart.tv", album.name)
+ if data := await self._get_data(f"music/albums/{album.musicbrainz_id}"):
+ if data and data.get("albums"):
+ data = data["albums"][album.musicbrainz_id]
+ metadata = MediaItemMetadata()
+ metadata.images = set()
+ for key, img_type in IMG_MAPPING.items():
+ items = data.get(key)
+ if not items:
+ continue
+ for item in items:
+ metadata.images.add(MediaItemImage(img_type, item["url"]))
+ return metadata
+ return None
- async def _get_data(self, endpoint, params=None):
+ @use_cache(86400 * 14)
+ async def _get_data(self, endpoint, **kwargs) -> Optional[dict]:
"""Get data from api."""
- if params is None:
- params = {}
url = f"http://webservice.fanart.tv/v3/{endpoint}"
- params["api_key"] = "639191cb0774661597f28a47e7e2bad5"
+ kwargs["api_key"] = app_var(4)
async with self.throttler:
async with self.mass.http_session.get(
- url, params=params, verify_ssl=False
+ url, params=kwargs, verify_ssl=False
) as response:
try:
result = await response.json()
"""Handle getting Id's from MusicBrainz."""
+from __future__ import annotations
import re
from json.decoder import JSONDecodeError
-from typing import Optional
import aiohttp
from asyncio_throttle import Throttler
-from music_assistant.helpers.cache import cached
+from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.compare import compare_strings, get_compare_string
from music_assistant.helpers.typing import MusicAssistant
get_compare_string(artistname),
]:
if album_upc:
- endpoint = "release"
- params = {"query": f"barcode:{album_upc}"}
- cache_key = f"{endpoint}.barcode.{album_upc}"
+ query = f"barcode:{album_upc}"
else:
searchalbum = re.sub(LUCENE_SPECIAL, r"\\\1", albumname)
- endpoint = "release"
- params = {
- "query": f'artist:"{searchartist}" AND release:"{searchalbum}"'
- }
- cache_key = f"{endpoint}.{searchartist}.{searchalbum}"
- result = await cached(
- self.mass.cache, cache_key, self.get_data, endpoint, params
- )
+ query = f'artist:"{searchartist}" AND release:"{searchalbum}"'
+ result = await self.get_data("release", query=query)
if result and "releases" in result:
for strictness in [True, False]:
for item in result["releases"]:
async def search_artist_by_track(self, artistname, trackname=None, track_isrc=None):
"""Retrieve artist id by providing the artist name and trackname or track isrc."""
- endpoint = "recording"
searchartist = re.sub(LUCENE_SPECIAL, r"\\\1", artistname)
if track_isrc:
- endpoint = f"isrc/{track_isrc}"
- params = {"inc": "artist-credits"}
- cache_key = endpoint
+ result = await self.get_data(f"isrc/{track_isrc}", inc="artist-credits")
else:
searchtrack = re.sub(LUCENE_SPECIAL, r"\\\1", trackname)
- endpoint = "recording"
- params = {"query": '"{searchtrack}" AND artist:"{searchartist}"'}
- cache_key = f"{endpoint}.{searchtrack}.{searchartist}"
- result = await cached(
- self.mass.cache, cache_key, self.get_data(endpoint, params)
- )
+ result = await self.get_data(
+ "recording", query=f'"{searchtrack}" AND artist:"{searchartist}"'
+ )
if result and "recordings" in result:
for strictness in [True, False]:
for item in result["recordings"]:
return artist["id"]
return ""
- async def get_data(self, endpoint: str, params: Optional[dict] = None):
+ @use_cache(86400 * 30)
+ async def get_data(self, endpoint: str, **kwargs):
"""Get data from api."""
- if params is None:
- params = {}
url = f"http://musicbrainz.org/ws/2/{endpoint}"
- headers = {"User-Agent": "Music Assistant/1.0.0 https://github.com/marcelveldt"}
- params["fmt"] = "json"
+ headers = {
+ "User-Agent": "Music Assistant/1.0.0 https://github.com/music-assistant"
+ }
+ kwargs["fmt"] = "json"
async with self.throttler:
async with self.mass.http_session.get(
- url, headers=headers, params=params, verify_ssl=False
+ url, headers=headers, params=kwargs, verify_ssl=False
) as response:
try:
result = await response.json()
import asyncio
import statistics
-from typing import Dict, List, Tuple
+from typing import Dict, List, Optional, Tuple
+
+from databases import Database as Db
from music_assistant.constants import EventType, MassEvent
from music_assistant.controllers.music.albums import AlbumsController
from music_assistant.controllers.music.playlists import PlaylistController
from music_assistant.controllers.music.radio import RadioController
from music_assistant.controllers.music.tracks import TracksController
-from music_assistant.helpers.cache import cached
+from music_assistant.helpers.database import (
+ TABLE_PLAYLOG,
+ TABLE_PROV_MAPPINGS,
+ TABLE_TRACK_LOUDNESS,
+)
from music_assistant.helpers.datetime import utc_timestamp
from music_assistant.helpers.typing import MusicAssistant
from music_assistant.helpers.util import run_periodic
SetupFailedError,
)
from music_assistant.models.media_items import (
- Album,
MediaItem,
MediaItemProviderId,
MediaItemType,
MediaType,
- Playlist,
)
from music_assistant.models.provider import MusicProvider
-DB_PROV_MAPPINGS = "provider_mappings"
-DB_TRACK_LOUDNESS = "track_loudness"
-DB_PLAYLOG = "playlog"
-
class MusicController:
"""Several helpers around the musicproviders."""
async def setup(self):
"""Async initialize of module."""
- await self.__setup_database_tables()
- # setup generic controllers
- await self.artists.setup()
- await self.albums.setup()
- await self.tracks.setup()
- await self.radio.setup()
- await self.playlists.setup()
self.mass.create_task(self.__periodic_sync)
@property
)
try:
provider.mass = self.mass
+ provider.cache = self.mass.cache
provider.logger = self.logger.getChild(provider.id)
await provider.setup()
except Exception as err: # pylint: disable=broad-except
+ await self.radio.search(search_query, "database", limit)
)
provider = self.get_provider(provider_id)
- media_types_str = ".".join(sorted([x.value for x in media_types]))
- cache_key = f"{provider_id}.search.{search_query}.{media_types_str}.{limit}"
- return await cached(
- self.mass.cache,
- cache_key,
- provider.search,
- search_query,
- media_types,
- limit,
- )
+ return await provider.search(search_query, media_types, limit)
async def get_item_by_uri(
self, uri: str, force_refresh: bool = False, lazy: bool = True
item_id, provider_id, force_refresh=force_refresh, lazy=lazy
)
+ async def add_to_library(
+ self,
+ media_type: MediaType,
+ provider_item_id: str,
+ provider_id: str,
+ ) -> None:
+ """Add an item to the library."""
+ ctrl = self.get_controller(media_type)
+ await ctrl.add_to_library(provider_item_id, provider_id)
+
+ async def remove_from_library(
+ self, media_type: MediaType, provider_item_id: str, provider_id: str
+ ) -> None:
+ """Remove item from the library."""
+ ctrl = self.get_controller(media_type)
+ await ctrl.remove_from_library(provider_item_id, provider_id)
+
+ async def get_provider_mapping(
+ self,
+ media_type: MediaType,
+ provider_id: str,
+ provider_item_id: str,
+ db: Optional[Db] = None, # pylint: disable=invalid-name
+ ) -> int | None:
+ """Lookup database id for media item from provider id."""
+ if result := await self.mass.database.get_row(
+ TABLE_PROV_MAPPINGS,
+ {
+ "media_type": media_type.value,
+ "provider": provider_id,
+ "prov_item_id": provider_item_id,
+ },
+ db=db,
+ ):
+ return result["item_id"]
+ return None
+
+ async def set_provider_mappings(
+ self,
+ item_id: int,
+ media_type: MediaType,
+ prov_ids: List[MediaItemProviderId],
+ db: Optional[Db] = None, # pylint: disable=invalid-name
+ ):
+ """Store provider ids for media item to database."""
+ async with self.mass.database.get_db(db) as _db:
+ # make sure that existing items are deleted first
+ await self.mass.database.delete(
+ TABLE_PROV_MAPPINGS,
+ {"item_id": int(item_id), "media_type": media_type.value},
+ db=_db,
+ )
+ for prov_id in prov_ids:
+ await self.mass.database.insert_or_replace(
+ TABLE_PROV_MAPPINGS,
+ {
+ "item_id": item_id,
+ "media_type": media_type.value,
+ "prov_item_id": prov_id.item_id,
+ "provider": prov_id.provider,
+ "quality": prov_id.quality.value if prov_id.quality else None,
+ "details": prov_id.details,
+ "url": prov_id.url,
+ },
+ db=_db,
+ )
+
async def refresh_items(self, items: List[MediaItem]) -> None:
"""
Refresh MediaItems to force retrieval of full info and matches.
item.item_id, item.provider, item.media_type, lazy=False
)
- async def get_provider_mapping(
- self, media_type: MediaType, provider_id: str, provider_item_id: str
- ) -> int | None:
- """Lookup database id for media item from provider id."""
- if result := await self.mass.database.get_row(
- DB_PROV_MAPPINGS,
- {
- "media_type": media_type.value,
- "provider": provider_id,
- "prov_item_id": provider_item_id,
- },
- ):
- return result["item_id"]
- return None
-
- async def set_provider_mappings(
- self,
- item_id: int,
- media_type: MediaType,
- prov_ids: List[MediaItemProviderId],
- ):
- """Store provider ids for media item to database."""
- # make sure that existing items are deleted first
- await self.mass.database.delete(
- DB_PROV_MAPPINGS, {"item_id": int(item_id), "media_type": media_type.value}
- )
- for prov_id in prov_ids:
- await self.mass.database.insert_or_replace(
- DB_PROV_MAPPINGS,
- {
- "item_id": item_id,
- "media_type": media_type.value,
- "prov_item_id": prov_id.item_id,
- "provider": prov_id.provider,
- "quality": prov_id.quality.value if prov_id.quality else None,
- "details": prov_id.details,
- "url": prov_id.url,
- },
- )
-
- async def add_to_library(
- self, media_type: MediaType, provider_item_id: str, provider_id: str
- ) -> None:
- """Add an item to the library."""
- ctrl = self.get_controller(media_type)
- await ctrl.add_to_library(provider_item_id, provider_id)
-
- async def remove_from_library(
- self, media_type: MediaType, provider_item_id: str, provider_id: str
- ) -> None:
- """Remove item from the library."""
- ctrl = self.get_controller(media_type)
- await ctrl.remove_from_library(provider_item_id, provider_id)
-
async def set_track_loudness(self, item_id: str, provider_id: str, loudness: int):
"""List integrated loudness for a track in db."""
await self.mass.database.insert_or_replace(
- DB_TRACK_LOUDNESS,
+ TABLE_TRACK_LOUDNESS,
{"item_id": item_id, "provider": provider_id, "loudness": loudness},
)
) -> float | None:
"""Get integrated loudness for a track in db."""
if result := await self.mass.database.get_row(
- DB_TRACK_LOUDNESS,
+ TABLE_TRACK_LOUDNESS,
{
"item_id": provider_item_id,
"provider": provider_id,
"""Get average integrated loudness for tracks of given provider."""
all_items = []
for db_row in await self.mass.database.get_rows(
- DB_TRACK_LOUDNESS,
+ TABLE_TRACK_LOUDNESS,
{
"provider": provider_id,
},
"""Mark item as played in playlog."""
timestamp = utc_timestamp()
await self.mass.database.insert_or_replace(
- DB_PLAYLOG,
+ TABLE_PLAYLOG,
{"item_id": item_id, "provider": provider_id, "timestamp": timestamp},
)
db_item = await controller.get(
prov_item.item_id,
prov_item.provider,
- details=prov_item,
lazy=False,
)
- elif db_item and db_item.available != prov_item.available:
- # availability changed
- db_item = await controller.add_db_item(prov_item)
- elif db_item and not db_item.available:
- # use auto matching magic to find a substitute for missing item
- db_item = await controller.get(
- prov_item.item_id,
- prov_item.provider,
- lazy=False,
- details=prov_item,
- )
elif not db_item:
# for other mediatypes its enough to simply dump the item in the db
db_item = await controller.add_db_item(prov_item)
+ elif (
+ media_type == MediaType.PLAYLIST
+ and db_item.checksum != prov_item.checksum
+ ):
+ # playlist checksum changed
+ db_item = await controller.add_db_item(prov_item)
+
cur_ids.add(db_item.item_id)
if not db_item.in_library:
await controller.set_db_library(db_item.item_id, True)
- # sync album tracks
- if media_type == MediaType.ALBUM:
- await self._sync_album_tracks(db_item)
- # sync playlist tracks
- if media_type == MediaType.PLAYLIST:
- await self._sync_playlist_tracks(db_item)
+ # precache playlist/album tracks
+ if media_type in [MediaType.PLAYLIST, MediaType.ALBUM]:
+ await controller.tracks(prov_item.item_id, provider_id)
# process deletions
for item_id in prev_ids:
}
await controller.update_db_item(item_id, db_item, True)
- async def _sync_album_tracks(self, db_album: Album) -> None:
- """Store album tracks of in-library album in database."""
- for prov_id in db_album.provider_ids:
- for album_track in await self.albums.get_provider_album_tracks(
- prov_id.item_id, prov_id.provider
- ):
- db_track = await self.tracks.get_db_item_by_prov_id(
- album_track.provider, album_track.item_id
- )
- if db_track and not db_track.available:
- # use auto matching magic to find a substitute for missing track
- db_track = await self.tracks.get(
- album_track.item_id,
- album_track.provider,
- lazy=False,
- details=album_track,
- )
- elif not db_track:
- db_track = await self.tracks.add_db_item(album_track)
-
- # add track to album_tracks
- await self.mass.music.albums.add_db_album_track(
- db_album.item_id,
- db_track.item_id,
- album_track.disc_number,
- album_track.track_number,
- )
-
- async def _sync_playlist_tracks(self, db_playlist: Playlist) -> None:
- """Store playlist tracks of in-library playlist in database."""
- for prov_id in db_playlist.provider_ids:
- provider = self.get_provider(prov_id.provider)
- if not provider:
- continue
- # clear db first
- await self.playlists.remove_db_playlist_track(db_playlist.item_id)
- for playlist_track in await self.playlists.get_provider_playlist_tracks(
- prov_id.item_id, prov_id.provider
- ):
- db_track = await self.tracks.get_db_item_by_prov_id(
- playlist_track.provider, playlist_track.item_id
- )
- if db_track and not db_track.available:
- # try auto matching magic to find a substitute for missing track
- db_track = await self.tracks.get(
- playlist_track.item_id,
- playlist_track.provider,
- lazy=False,
- details=playlist_track,
- )
- elif not db_track:
- db_track = await self.tracks.add_db_item(playlist_track)
- assert playlist_track.position is not None
- await self.playlists.add_db_playlist_track(
- db_playlist.item_id,
- db_track.item_id,
- playlist_track.position,
- )
-
def get_controller(
self, media_type: MediaType
) -> ArtistsController | AlbumsController | TracksController | RadioController | PlaylistController:
if media_type == MediaType.PLAYLIST:
return self.playlists
- async def __setup_database_tables(self) -> None:
- """Init generic database tables."""
- async with self.mass.database.get_db() as _db:
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {DB_PROV_MAPPINGS}(
- item_id INTEGER NOT NULL,
- media_type TEXT NOT NULL,
- prov_item_id TEXT NOT NULL,
- provider TEXT NOT NULL,
- quality INTEGER NULL,
- details TEXT NULL,
- url TEXT NULL,
- UNIQUE(item_id, media_type, prov_item_id, provider)
- );"""
- )
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {DB_TRACK_LOUDNESS}(
- item_id INTEGER NOT NULL,
- provider TEXT NOT NULL,
- loudness REAL,
- UNIQUE(item_id, provider));"""
- )
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {DB_PLAYLOG}(
- item_id INTEGER NOT NULL,
- provider TEXT NOT NULL,
- timestamp REAL,
- UNIQUE(item_id, provider));"""
- )
-
@run_periodic(3 * 3600, True)
async def __periodic_sync(self):
"""Periodically sync all providers."""
from typing import List
from music_assistant.constants import EventType, MassEvent
-from music_assistant.helpers.cache import cached
from music_assistant.helpers.compare import compare_album, compare_strings
+from music_assistant.helpers.database import TABLE_ALBUMS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict
+from music_assistant.helpers.util import create_sort_name
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
class AlbumsController(MediaControllerBase[Album]):
"""Controller managing MediaItems of type Album."""
- db_table = "albums"
+ db_table = TABLE_ALBUMS
media_type = MediaType.ALBUM
item_cls = Album
- async def setup(self):
- """Async initialize of module."""
- # prepare database
- async with self.mass.database.get_db() as _db:
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {self.db_table}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- sort_name TEXT NOT NULL,
- album_type TEXT,
- year INTEGER,
- version TEXT,
- in_library BOOLEAN DEFAULT 0,
- upc TEXT,
- artist json,
- metadata json,
- provider_ids json
- );"""
- )
- await _db.execute(
- """CREATE TABLE IF NOT EXISTS album_tracks(
- album_id INTEGER NOT NULL,
- track_id INTEGER NOT NULL,
- disc_number INTEGER NOT NULL,
- track_number INTEGER NOT NULL,
- UNIQUE(album_id, disc_number, track_number)
- );"""
- )
+ async def get(self, *args, **kwargs) -> Album:
+ """Return (full) details for a single media item."""
+ album = await super().get(*args, **kwargs)
+ # append full artist details to full album item
+ album.artist = await self.mass.music.artists.get(
+ album.artist.item_id, album.artist.provider
+ )
+ return album
async def tracks(self, item_id: str, provider_id: str) -> List[Track]:
"""Return album tracks for the given provider album id."""
album = await self.get(item_id, provider_id)
- # for in-library albums we have the tracks in db
- if album.in_library and album.provider == "database":
- return await self.get_db_album_tracks(album.item_id)
- # else: simply return the tracks from the first provider
+ # simply return the tracks from the first provider
for prov in album.provider_ids:
if tracks := await self.get_provider_album_tracks(
prov.item_id, prov.provider
"""Add album to local db and return the database item."""
# make sure we have an artist
assert item.artist
+ # grab additional metadata
+ await self.mass.metadata.get_album_metadata(item)
db_item = await self.add_db_item(item)
# also fetch same album on all providers
await self._match(db_item)
provider = self.mass.music.get_provider(provider_id)
if not provider:
return []
- cache_key = f"{provider_id}.albumtracks.{item_id}"
- return await cached(
- self.mass.cache,
- cache_key,
- provider.get_album_tracks,
- item_id,
- )
+ return await provider.get_album_tracks(item_id)
async def add_db_item(self, album: Album) -> Album:
"""Add a new album record to the database."""
if not album.sort_name:
album.sort_name = create_sort_name(album.name)
assert album.provider_ids
- # always try to grab existing item by external_id
- if album.upc:
- match = {"upc": album.upc}
- cur_item = await self.mass.database.get_row(self.db_table, match)
- if not cur_item:
- # fallback to matching
- match = {"sort_name": album.sort_name}
- for row in await self.mass.database.get_rows(self.db_table, match):
- row_album = Album.from_db_row(row)
- if compare_album(row_album, album):
- cur_item = row_album
- break
- if cur_item:
- # update existing
- return await self.update_db_item(cur_item.item_id, album)
-
- # insert new album
- album_artist = ItemMapping.from_item(
- await self.mass.music.artists.get_db_item_by_prov_id(
- album.artist.provider, album.artist.item_id
+ async with self.mass.database.get_db() as _db:
+ # always try to grab existing item by external_id
+ if album.musicbrainz_id:
+ match = {"musicbrainz_id": album.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(
+ self.db_table, match, db=_db
+ )
+ if not cur_item and album.upc:
+ match = {"upc": album.upc}
+ cur_item = await self.mass.database.get_row(
+ self.db_table, match, db=_db
+ )
+ if not cur_item:
+ # fallback to matching
+ match = {"sort_name": album.sort_name}
+ for row in await self.mass.database.get_rows(
+ self.db_table, match, db=_db
+ ):
+ row_album = Album.from_db_row(row)
+ if compare_album(row_album, album):
+ cur_item = row_album
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(cur_item.item_id, album)
+
+ # insert new album
+ if album.artist.musicbrainz_id and album.artist.provider != "database":
+ album_artist = await self.mass.music.artists.add_db_item(album.artist)
+ else:
+ album_artist = (
+ await self.mass.music.artists.get_db_item_by_prov_id(
+ album.artist.provider, album.artist.item_id, db=_db
+ )
+ or album.artist
+ )
+ new_item = await self.mass.database.insert_or_replace(
+ self.db_table,
+ {
+ **album.to_db_row(),
+ "artist": json_serializer(ItemMapping.from_item(album_artist)),
+ },
+ db=_db,
)
- or album.artist
- )
- new_item = await self.mass.database.insert_or_replace(
- self.db_table,
- {**album.to_db_row(), "artist": json_serializer(album_artist)},
- )
- item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.ALBUM, album.provider_ids
- )
- self.logger.debug("added %s to database", album.name)
- # return created object
- return await self.get_db_item(item_id)
+ item_id = new_item["item_id"]
+ # store provider mappings
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.ALBUM, album.provider_ids, db=_db
+ )
+ self.logger.debug("added %s to database", album.name)
+ # return created object
+ return await self.get_db_item(item_id, db=_db)
async def update_db_item(
self, item_id: int, album: Album, overwrite: bool = False
) -> Album:
"""Update Album record in the database."""
- cur_item = await self.get_db_item(item_id)
- if overwrite:
- metadata = album.metadata
- provider_ids = album.provider_ids
- album_artist = ItemMapping.from_item(
- await self.mass.music.artists.get_db_item_by_prov_id(
- album.artist.provider, album.artist.item_id
+ async with self.mass.database.get_db() as _db:
+ cur_item = await self.get_db_item(item_id)
+ if album.artist.musicbrainz_id and album.artist.provider != "database":
+ album_artist = await self.mass.music.artists.add_db_item(album.artist)
+ else:
+ album_artist = (
+ await self.mass.music.artists.get_db_item_by_prov_id(
+ album.artist.provider, album.artist.item_id, db=_db
+ )
+ or album.artist
)
- or album.artist
+ if overwrite:
+ metadata = album.metadata
+ provider_ids = album.provider_ids
+ else:
+ metadata = cur_item.metadata.update(album.metadata)
+ provider_ids = {*cur_item.provider_ids, *album.provider_ids}
+
+ if album.album_type != AlbumType.UNKNOWN:
+ album_type = album.album_type
+ else:
+ album_type = cur_item.album_type
+
+ await self.mass.database.update(
+ self.db_table,
+ {"item_id": item_id},
+ {
+ "name": album.name if overwrite else cur_item.name,
+ "sort_name": album.sort_name if overwrite else cur_item.sort_name,
+ "version": album.version if overwrite else cur_item.version,
+ "year": album.year or cur_item.year,
+ "upc": album.upc or cur_item.upc,
+ "album_type": album_type.value,
+ "artist": json_serializer(ItemMapping.from_item(album_artist)),
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ },
+ db=_db,
)
- else:
- metadata = merge_dict(cur_item.metadata, album.metadata)
- provider_ids = {*cur_item.provider_ids, *album.provider_ids}
- album_artist = ItemMapping.from_item(
- await self.mass.music.artists.get_db_item_by_prov_id(
- cur_item.artist.provider, cur_item.artist.item_id
- )
- or cur_item.artist
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.ALBUM, provider_ids, db=_db
)
-
- if album.album_type != AlbumType.UNKNOWN:
- album_type = album.album_type
- else:
- album_type = cur_item.album_type
-
- await self.mass.database.update(
- self.db_table,
- {"item_id": item_id},
- {
- "name": album.name if overwrite else cur_item.name,
- "sort_name": album.sort_name if overwrite else cur_item.sort_name,
- "version": album.version if overwrite else cur_item.version,
- "year": album.year or cur_item.year,
- "upc": album.upc or cur_item.upc,
- "album_type": album_type.value,
- "artist": json_serializer(album_artist),
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- },
- )
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.ALBUM, album.provider_ids
- )
- self.logger.debug("updated %s in database: %s", album.name, item_id)
- return await self.get_db_item(item_id)
-
- async def get_db_album_tracks(self, item_id) -> List[Track]:
- """Get album tracks for an in-library album."""
- query = (
- "SELECT TRACKS.*, ALBUMTRACKS.disc_number, ALBUMTRACKS.track_number "
- "FROM [tracks] TRACKS "
- "JOIN album_tracks ALBUMTRACKS ON TRACKS.item_id = ALBUMTRACKS.track_id "
- f"WHERE ALBUMTRACKS.album_id = {item_id}"
- )
- return await self.mass.music.tracks.get_db_items(query)
-
- async def add_db_album_track(
- self, album_id: int, track_id: int, disc_number: int, track_number: int
- ) -> None:
- """Add album track for an in-library album."""
- return await self.mass.database.insert_or_replace(
- "album_tracks",
- {
- "album_id": album_id,
- "track_id": track_id,
- "disc_number": disc_number,
- "track_number": track_number,
- },
- )
+ self.logger.debug("updated %s in database: %s", album.name, item_id)
+ return await self.get_db_item(item_id)
async def _match(self, db_album: Album) -> None:
"""
match_found = True
# while we're here, also match the artist
if db_album.artist.provider == "database":
- prov_artist = await self.mass.music.artists.get_provider_item(
- prov_album.artist.item_id, prov_album.artist.provider
- )
await self.mass.music.artists.update_db_item(
- db_album.artist.item_id, prov_artist
+ db_album.artist.item_id, prov_album.artist
)
# no match found
# try to find match on all providers
for provider in self.mass.music.providers:
+ if provider.id == "filesystem":
+ continue
if MediaType.ALBUM in provider.supported_mediatypes:
await find_prov_match(provider)
from typing import List
from music_assistant.constants import EventType, MassEvent
-from music_assistant.helpers.cache import cached
from music_assistant.helpers.compare import (
compare_album,
compare_strings,
compare_track,
)
+from music_assistant.helpers.database import TABLE_ARTISTS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict
+from music_assistant.helpers.util import create_sort_name
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import (
Album,
class ArtistsController(MediaControllerBase[Artist]):
"""Controller managing MediaItems of type Artist."""
- db_table = "artists"
+ db_table = TABLE_ARTISTS
media_type = MediaType.ARTIST
item_cls = Artist
- async def setup(self):
- """Async initialize of module."""
- # prepare database
- async with self.mass.database.get_db() as _db:
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {self.db_table}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- sort_name TEXT NOT NULL,
- musicbrainz_id TEXT NOT NULL UNIQUE,
- in_library BOOLEAN DEFAULT 0,
- metadata json,
- provider_ids json
- );"""
- )
-
async def toptracks(self, item_id: str, provider_id: str) -> List[Track]:
"""Return top tracks for an artist."""
artist = await self.get(item_id, provider_id)
async def add(self, item: Artist) -> Artist:
"""Add artist to local db and return the database item."""
- if not item.musicbrainz_id:
- item.musicbrainz_id = await self.get_artist_musicbrainz_id(item)
- # grab additional metadata
- item.metadata = await self.mass.metadata.get_artist_metadata(
- item.musicbrainz_id, item.metadata
- )
+ # grab musicbrainz id and additional metadata
+ await self.mass.metadata.get_artist_metadata(item)
db_item = await self.add_db_item(item)
# also fetch same artist on all providers
await self.match_artist(db_item)
for provider in self.mass.music.providers:
if provider.id in cur_providers:
continue
+ if provider.id == "filesystem":
+ continue
if MediaType.ARTIST not in provider.supported_mediatypes:
continue
if not await self._match(db_artist, provider):
provider = self.mass.music.get_provider(provider_id)
if not provider:
return []
- cache_key = f"{provider_id}.artist_toptracks.{item_id}"
- return await cached(
- self.mass.cache,
- cache_key,
- provider.get_artist_toptracks,
- item_id,
- )
+ return await provider.get_artist_toptracks(item_id)
async def get_provider_artist_albums(
self, item_id: str, provider_id: str
provider = self.mass.music.get_provider(provider_id)
if not provider:
return []
- cache_key = f"{provider_id}.artistalbums.{item_id}"
- return await cached(
- self.mass.cache,
- cache_key,
- provider.get_artist_albums,
- item_id,
- )
+ return await provider.get_artist_albums(item_id)
async def add_db_item(self, artist: Artist) -> Artist:
"""Add a new artist record to the database."""
# update existing
return await self.update_db_item(cur_item["item_id"], artist)
# insert artist
- if not artist.sort_name:
- artist.sort_name = create_sort_name(artist.name)
- new_item = await self.mass.database.insert_or_replace(
- self.db_table, artist.to_db_row()
- )
- item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.ARTIST, artist.provider_ids
- )
- self.logger.debug("added %s to database", artist.name)
- # return created object
- return await self.get_db_item(item_id)
+ async with self.mass.database.get_db() as _db:
+ if not artist.sort_name:
+ artist.sort_name = create_sort_name(artist.name)
+ new_item = await self.mass.database.insert_or_replace(
+ self.db_table, artist.to_db_row(), db=_db
+ )
+ item_id = new_item["item_id"]
+ # store provider mappings
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.ARTIST, artist.provider_ids, db=_db
+ )
+ self.logger.debug("added %s to database", artist.name)
+ # return created object
+ return await self.get_db_item(item_id, db=_db)
async def update_db_item(
self, item_id: int, artist: Artist, overwrite: bool = False
metadata = artist.metadata
provider_ids = artist.provider_ids
else:
- metadata = merge_dict(cur_item.metadata, artist.metadata)
+ metadata = cur_item.metadata.update(artist.metadata)
provider_ids = {*cur_item.provider_ids, *artist.provider_ids}
- await self.mass.database.update(
- self.db_table,
- {"item_id": item_id},
- {
- "name": artist.name if overwrite else cur_item.name,
- "sort_name": artist.sort_name if overwrite else cur_item.sort_name,
- "musicbrainz_id": artist.musicbrainz_id or cur_item.musicbrainz_id,
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- },
- )
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.ARTIST, artist.provider_ids
- )
- self.logger.debug("updated %s in database: %s", artist.name, item_id)
- return await self.get_db_item(item_id)
-
- async def get_artist_musicbrainz_id(self, artist: Artist) -> str:
- """Fetch musicbrainz id by performing search using the artist name, albums and tracks."""
- # try with album first
- for lookup_album in await self.get_provider_artist_albums(
- artist.item_id, artist.provider
- ):
- if not lookup_album:
- continue
- if artist.name != lookup_album.artist.name:
- continue
- musicbrainz_id = await self.mass.metadata.musicbrainz.get_mb_artist_id(
- artist.name,
- albumname=lookup_album.name,
- album_upc=lookup_album.upc,
+ async with self.mass.database.get_db() as _db:
+ await self.mass.database.update(
+ self.db_table,
+ {"item_id": item_id},
+ {
+ "name": artist.name if overwrite else cur_item.name,
+ "sort_name": artist.sort_name if overwrite else cur_item.sort_name,
+ "musicbrainz_id": artist.musicbrainz_id or cur_item.musicbrainz_id,
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ },
+ db=_db,
)
- if musicbrainz_id:
- return musicbrainz_id
- # fallback to track
- for lookup_track in await self.get_provider_artist_toptracks(
- artist.item_id, artist.provider
- ):
- if not lookup_track:
- continue
- musicbrainz_id = await self.mass.metadata.musicbrainz.get_mb_artist_id(
- artist.name,
- trackname=lookup_track.name,
- track_isrc=lookup_track.isrc,
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.ARTIST, provider_ids, db=_db
)
- if musicbrainz_id:
- return musicbrainz_id
- # lookup failed, use the shitty workaround to use the name as id.
- self.logger.warning("Unable to get musicbrainz ID for artist %s !", artist.name)
- return artist.name
+ self.logger.debug("updated %s in database: %s", artist.name, item_id)
+ return await self.get_db_item(item_id, db=_db)
async def _match(self, db_artist: Artist, provider: MusicProvider) -> bool:
"""Try to find matching artists on given provider for the provided (database) artist."""
"""Manage MediaItems of type Playlist."""
from __future__ import annotations
-from typing import List, Optional
+from time import time
+from typing import List
from music_assistant.constants import EventType, MassEvent
-from music_assistant.helpers.cache import cached
+from music_assistant.helpers.database import TABLE_PLAYLISTS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict
+from music_assistant.helpers.util import create_sort_name
from music_assistant.models.errors import InvalidDataError, MediaNotFoundError
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import MediaType, Playlist, Track
class PlaylistController(MediaControllerBase[Playlist]):
"""Controller managing MediaItems of type Playlist."""
- db_table = "playlists"
+ db_table = TABLE_PLAYLISTS
media_type = MediaType.PLAYLIST
item_cls = Playlist
- async def setup(self):
- """Async initialize of module."""
- # prepare database
- async with self.mass.database.get_db() as _db:
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {self.db_table}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- sort_name TEXT NOT NULL,
- owner TEXT NOT NULL,
- is_editable BOOLEAN NOT NULL,
- checksum TEXT NOT NULL,
- in_library BOOLEAN DEFAULT 0,
- metadata json,
- provider_ids json,
- UNIQUE(name, owner)
- );"""
- )
- await _db.execute(
- """CREATE TABLE IF NOT EXISTS playlist_tracks(
- playlist_id INTEGER NOT NULL,
- track_id INTEGER NOT NULL,
- position INTEGER NOT NULL,
- UNIQUE(playlist_id, position)
- );"""
- )
-
async def get_playlist_by_name(self, name: str) -> Playlist | None:
"""Get in-library playlist by name."""
return await self.mass.database.get_row(self.db_table, {"name": name})
async def tracks(self, item_id: str, provider_id: str) -> List[Track]:
"""Return playlist tracks for the given provider playlist id."""
playlist = await self.get(item_id, provider_id)
- if playlist.in_library and playlist.provider == "database":
- # for in-library playlists we have the tracks in db
- return await self.get_db_playlist_tracks(playlist.item_id)
- # else: simply return the tracks from the first provider
+ # simply return the tracks from the first provider
for prov in playlist.provider_ids:
if tracks := await self.get_provider_playlist_tracks(
prov.item_id, prov.provider
async def add(self, item: Playlist) -> Playlist:
"""Add playlist to local db and return the new database item."""
+ item.metadata.last_refresh = int(time())
+ await self.mass.metadata.get_playlist_metadata(item)
db_item = await self.add_db_item(item)
self.mass.signal_event(
MassEvent(EventType.PLAYLIST_ADDED, object_id=db_item.uri, data=db_item)
provider = self.mass.music.get_provider(playlist_prov.provider)
await provider.add_playlist_tracks(playlist_prov.item_id, [track_id_to_add])
# update local db entry
- await self.add_db_playlist_track(db_playlist_id, track.item_id, count + 1)
self.mass.signal_event(
MassEvent(
type=EventType.PLAYLIST_UPDATED, object_id=db_playlist_id, data=playlist
if track_ids_to_remove:
provider = self.mass.music.get_provider(prov.provider)
await provider.remove_playlist_tracks(prov.item_id, track_ids_to_remove)
- # update db
- for pos in positions:
- await self.remove_db_playlist_track(db_playlist_id, position=pos)
self.mass.signal_event(
MassEvent(
type=EventType.PLAYLIST_UPDATED, object_id=db_playlist_id, data=playlist
provider = self.mass.music.get_provider(provider_id)
if not provider:
return []
- playlist = await provider.get_playlist(item_id)
- cache_key = f"{provider_id}.playlisttracks.{item_id}"
# we need to make sure that position is set on the track
def playlist_track_with_position(track: Track, index: int):
track.position = index
return track
- tracks = await cached(
- self.mass.cache,
- cache_key,
- provider.get_playlist_tracks,
- item_id,
- checksum=playlist.checksum,
- )
-
+ tracks = await provider.get_playlist_tracks(item_id)
return [
playlist_track_with_position(track, index)
for index, track in enumerate(tracks)
async def add_db_item(self, playlist: Playlist) -> Playlist:
"""Add a new playlist record to the database."""
- match = {"name": playlist.name, "owner": playlist.owner}
- if cur_item := await self.mass.database.get_row(self.db_table, match):
- # update existing
- return await self.update_db_item(cur_item["item_id"], playlist)
+ async with self.mass.database.get_db() as _db:
+ match = {"name": playlist.name, "owner": playlist.owner}
+ if cur_item := await self.mass.database.get_row(
+ self.db_table, match, db=_db
+ ):
+ # update existing
+ return await self.update_db_item(cur_item["item_id"], playlist)
- # insert new playlist
- new_item = await self.mass.database.insert_or_replace(
- self.db_table,
- playlist.to_db_row(),
- )
- item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.PLAYLIST, playlist.provider_ids
- )
- self.logger.debug("added %s to database", playlist.name)
- # return created object
- return await self.get_db_item(item_id)
+ # insert new playlist
+ new_item = await self.mass.database.insert_or_replace(
+ self.db_table, playlist.to_db_row(), db=_db
+ )
+ item_id = new_item["item_id"]
+ # store provider mappings
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.PLAYLIST, playlist.provider_ids, db=_db
+ )
+ self.logger.debug("added %s to database", playlist.name)
+ # return created object
+ return await self.get_db_item(item_id, db=_db)
async def update_db_item(
self, item_id: int, playlist: Playlist, overwrite: bool = False
metadata = playlist.metadata
provider_ids = playlist.provider_ids
else:
- metadata = merge_dict(cur_item.metadata, playlist.metadata)
+ metadata = cur_item.metadata.update(playlist.metadata)
provider_ids = {*cur_item.provider_ids, *playlist.provider_ids}
if not playlist.sort_name:
playlist.sort_name = create_sort_name(playlist.name)
- await self.mass.database.update(
- self.db_table,
- {"item_id": item_id},
- {
- "name": playlist.name,
- "sort_name": playlist.sort_name,
- "owner": playlist.owner,
- "is_editable": playlist.is_editable,
- "checksum": playlist.checksum,
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- },
- )
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.PLAYLIST, playlist.provider_ids
- )
- self.logger.debug("updated %s in database: %s", playlist.name, item_id)
- db_item = await self.get_db_item(item_id)
- self.mass.signal_event(
- MassEvent(type=EventType.PLAYLIST_UPDATED, object_id=item_id, data=playlist)
- )
- return db_item
-
- async def get_db_playlist_tracks(self, item_id) -> List[Track]:
- """Get playlist tracks for an in-library playlist."""
- query = (
- "SELECT TRACKS.*, PLAYLISTTRACKS.position "
- "FROM [tracks] TRACKS "
- "JOIN playlist_tracks PLAYLISTTRACKS ON TRACKS.item_id = PLAYLISTTRACKS.track_id "
- f"WHERE PLAYLISTTRACKS.playlist_id = {item_id}"
- )
- return await self.mass.music.tracks.get_db_items(query)
-
- async def add_db_playlist_track(
- self, playlist_id: int, track_id: int, position: int
- ) -> None:
- """Add playlist track for an in-library playlist."""
- return await self.mass.database.insert_or_replace(
- "playlist_tracks",
- {"playlist_id": playlist_id, "track_id": track_id, "position": position},
- )
-
- async def remove_db_playlist_track(
- self,
- playlist_id: int,
- track_id: Optional[int] = None,
- position: Optional[int] = None,
- ) -> None:
- """Remove playlist track from an in-library playlist."""
- match = {"playlist_id": playlist_id}
- if track_id is not None:
- match["track_id"] = track_id
- if position is not None:
- match["position"] = position
- return await self.mass.database.delete(
- "playlist_tracks",
- match,
- )
+ async with self.mass.database.get_db() as _db:
+ await self.mass.database.update(
+ self.db_table,
+ {"item_id": item_id},
+ {
+ "name": playlist.name,
+ "sort_name": playlist.sort_name,
+ "owner": playlist.owner,
+ "is_editable": playlist.is_editable,
+ "checksum": playlist.checksum,
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ },
+ db=_db,
+ )
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.PLAYLIST, provider_ids, db=_db
+ )
+ self.logger.debug("updated %s in database: %s", playlist.name, item_id)
+ db_item = await self.get_db_item(item_id, db=_db)
+ self.mass.signal_event(
+ MassEvent(
+ type=EventType.PLAYLIST_UPDATED, object_id=item_id, data=playlist
+ )
+ )
+ return db_item
"""Manage MediaItems of type Radio."""
from __future__ import annotations
+from time import time
+
from music_assistant.constants import EventType, MassEvent
+from music_assistant.helpers.database import TABLE_RADIOS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict
+from music_assistant.helpers.util import create_sort_name
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import MediaType, Radio
class RadioController(MediaControllerBase[Radio]):
"""Controller managing MediaItems of type Radio."""
- db_table = "radios"
+ db_table = TABLE_RADIOS
media_type = MediaType.RADIO
item_cls = Radio
- async def setup(self):
- """Async initialize of module."""
- # prepare database
- async with self.mass.database.get_db() as _db:
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {self.db_table}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL UNIQUE,
- sort_name TEXT NOT NULL,
- in_library BOOLEAN DEFAULT 0,
- metadata json,
- provider_ids json
- );"""
- )
-
async def get_radio_by_name(self, name: str) -> Radio | None:
"""Get in-library radio by name."""
return await self.mass.database.get_row(self.db_table, {"name": name})
async def add(self, item: Radio) -> Radio:
"""Add radio to local db and return the new database item."""
+ item.metadata.last_refresh = int(time())
+ await self.mass.metadata.get_radio_metadata(item)
db_item = await self.add_db_item(item)
self.mass.signal_event(
MassEvent(EventType.RADIO_ADDED, object_id=db_item.uri, data=db_item)
if not radio.sort_name:
radio.sort_name = create_sort_name(radio.name)
assert radio.provider_ids
- match = {"sort_name": radio.sort_name}
- if cur_item := await self.mass.database.get_row(self.db_table, match):
- # update existing
- return await self.update_db_item(cur_item["item_id"], radio)
+ async with self.mass.database.get_db() as _db:
+ match = {"sort_name": radio.sort_name}
+ if cur_item := await self.mass.database.get_row(
+ self.db_table, match, db=_db
+ ):
+ # update existing
+ return await self.update_db_item(cur_item["item_id"], radio)
- # insert new radio
- new_item = await self.mass.database.insert_or_replace(
- self.db_table, radio.to_db_row()
- )
- item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.RADIO, radio.provider_ids
- )
- self.logger.debug("added %s to database", radio.name)
- # return created object
- return await self.get_db_item(item_id)
+ # insert new radio
+ new_item = await self.mass.database.insert_or_replace(
+ self.db_table, radio.to_db_row(), db=_db
+ )
+ item_id = new_item["item_id"]
+ # store provider mappings
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.RADIO, radio.provider_ids, db=_db
+ )
+ self.logger.debug("added %s to database", radio.name)
+ # return created object
+ return await self.get_db_item(item_id, db=_db)
async def update_db_item(
self, item_id: int, radio: Radio, overwrite: bool = False
) -> Radio:
"""Update Radio record in the database."""
- cur_item = await self.get_db_item(item_id)
- if overwrite:
- metadata = radio.metadata
- provider_ids = radio.provider_ids
- else:
- metadata = merge_dict(cur_item.metadata, radio.metadata)
- provider_ids = {*cur_item.provider_ids, *radio.provider_ids}
- if not radio.sort_name:
- radio.sort_name = create_sort_name(radio.name)
+ async with self.mass.database.get_db() as _db:
+ cur_item = await self.get_db_item(item_id, db=_db)
+ if overwrite:
+ metadata = radio.metadata
+ provider_ids = radio.provider_ids
+ else:
+ metadata = cur_item.metadata.update(radio.metadata)
+ provider_ids = {*cur_item.provider_ids, *radio.provider_ids}
+ if not radio.sort_name:
+ radio.sort_name = create_sort_name(radio.name)
- match = {"item_id": item_id}
- await self.mass.database.update(
- self.db_table,
- match,
- {
- "name": radio.name,
- "sort_name": radio.sort_name,
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- },
- )
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.RADIO, radio.provider_ids
- )
- self.logger.debug("updated %s in database: %s", radio.name, item_id)
- return await self.get_db_item(item_id)
+ match = {"item_id": item_id}
+ await self.mass.database.update(
+ self.db_table,
+ match,
+ {
+ "name": radio.name,
+ "sort_name": radio.sort_name,
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ },
+ db=_db,
+ )
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.RADIO, provider_ids, db=_db
+ )
+ self.logger.debug("updated %s in database: %s", radio.name, item_id)
+ return await self.get_db_item(item_id, db=_db)
compare_strings,
compare_track,
)
+from music_assistant.helpers.database import TABLE_TRACKS
from music_assistant.helpers.json import json_serializer
-from music_assistant.helpers.util import create_sort_name, merge_dict
+from music_assistant.helpers.util import create_sort_name
from music_assistant.models.media_controller import MediaControllerBase
from music_assistant.models.media_items import ItemMapping, MediaType, Track
class TracksController(MediaControllerBase[Track]):
"""Controller managing MediaItems of type Track."""
- db_table = "tracks"
+ db_table = TABLE_TRACKS
media_type = MediaType.TRACK
item_cls = Track
- async def setup(self) -> None:
- """Async initialize of module."""
- # prepare database
- async with self.mass.database.get_db() as _db:
- await _db.execute(
- f"""CREATE TABLE IF NOT EXISTS {self.db_table}(
- item_id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- sort_name TEXT NOT NULL,
- version TEXT,
- duration INTEGER,
- in_library BOOLEAN DEFAULT 0,
- isrc TEXT,
- artists json,
- metadata json,
- provider_ids json
- );"""
+ async def get(self, *args, **kwargs) -> Track:
+ """Return (full) details for a single media item."""
+ track = await super().get(*args, **kwargs)
+ # append full album details to full track item
+ if track.album:
+ track.album = await self.mass.music.albums.get(
+ track.album.item_id, track.album.provider
+ )
+ # append full artist details to full track item
+ full_artists = []
+ for artist in track.artists:
+ full_artists.append(
+ await self.mass.music.artists.get(artist.item_id, artist.provider)
)
+ track.artists = full_artists
+ return track
async def add(self, item: Track) -> Track:
"""Add track to local db and return the new database item."""
# make sure we have artists
assert item.artists
+ # grab additional metadata
+ await self.mass.metadata.get_track_metadata(item)
db_item = await self.add_db_item(item)
# also fetch same track on all providers (will also get other quality versions)
await self._match(db_item)
for provider in self.mass.music.providers:
if MediaType.TRACK not in provider.supported_mediatypes:
continue
+ if provider.id == "filesystem":
+ continue
self.logger.debug(
"Trying to match track %s on provider %s", db_track.name, provider.name
)
if not track.sort_name:
track.sort_name = create_sort_name(track.name)
cur_item = None
- # always try to grab existing item by external_id
- if track.isrc:
- match = {"isrc": track.isrc}
- cur_item = await self.mass.database.get_row(self.db_table, match)
- if not cur_item:
- # fallback to matching
- match = {"sort_name": track.sort_name}
- for row in await self.mass.database.get_rows(self.db_table, match):
- row_track = Track.from_db_row(row)
- if compare_track(row_track, track):
- cur_item = row_track
- break
- if cur_item:
- # update existing
- return await self.update_db_item(cur_item.item_id, track)
-
- # no existing match found: insert new track
- track_artists = await self._get_track_artists(track)
- new_item = await self.mass.database.insert_or_replace(
- self.db_table,
- {
- **track.to_db_row(),
- "artists": json_serializer(track_artists),
- },
- )
- item_id = new_item["item_id"]
- # store provider mappings
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.TRACK, track.provider_ids
- )
-
- # add track to album_tracks
- if track.album is not None:
- album = await self.get_db_item_by_prov_id(
- track.album.provider, track.album.item_id
- ) or await self.mass.music.albums.add_db_item(track.album)
- if (
- album
- and track.disc_number is not None
- and track.track_number is not None
- ):
- await self.mass.music.albums.add_db_album_track(
- album.item_id, item_id, track.disc_number, track.track_number
+ async with self.mass.database.get_db() as _db:
+ if track.album:
+ track.album = ItemMapping.from_item(
+ await self.get_db_item_by_prov_id(
+ track.album.provider, track.album.item_id, db=_db
+ )
+ or await self.mass.music.albums.add_db_item(track.album)
+ )
+ # always try to grab existing item by external_id
+ if track.musicbrainz_id:
+ match = {"musicbrainz_id": track.musicbrainz_id}
+ cur_item = await self.mass.database.get_row(
+ self.db_table, match, db=_db
+ )
+ if not cur_item and track.isrc:
+ match = {"isrc": track.isrc}
+ cur_item = await self.mass.database.get_row(
+ self.db_table, match, db=_db
)
- # return created object
- return await self.get_db_item(item_id)
+ if not cur_item:
+ # fallback to matching
+ match = {"sort_name": track.sort_name}
+ for row in await self.mass.database.get_rows(
+ self.db_table, match, db=_db
+ ):
+ row_track = Track.from_db_row(row)
+ if compare_track(row_track, track):
+ cur_item = row_track
+ break
+ if cur_item:
+ # update existing
+ return await self.update_db_item(cur_item.item_id, track)
+
+ # no existing match found: insert new track
+ track_artists = await self._get_track_artists(track)
+ new_item = await self.mass.database.insert_or_replace(
+ self.db_table,
+ {
+ **track.to_db_row(),
+ "artists": json_serializer(track_artists),
+ },
+ db=_db,
+ )
+ item_id = new_item["item_id"]
+ # store provider mappings
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.TRACK, track.provider_ids, db=_db
+ )
+
+ # return created object
+ return await self.get_db_item(item_id, db=_db)
async def update_db_item(
self, item_id: int, track: Track, overwrite: bool = False
) -> Track:
"""Update Track record in the database, merging data."""
- cur_item = await self.get_db_item(item_id)
- if overwrite:
- metadata = track.metadata
- provider_ids = track.provider_ids
- track_artists = track.artists
- else:
- metadata = merge_dict(cur_item.metadata, track.metadata)
- provider_ids = {*cur_item.provider_ids, *track.provider_ids}
+ async with self.mass.database.get_db() as _db:
+ cur_item = await self.get_db_item(item_id, db=_db)
+ if overwrite:
+ metadata = track.metadata
+ provider_ids = track.provider_ids
+ track_artists = track.artists
+ else:
+ metadata = cur_item.metadata.update(track.metadata)
+ provider_ids = {*cur_item.provider_ids, *track.provider_ids}
+ track_artists = await self._get_track_artists(track, cur_item.artists)
+ if track.album and not isinstance(track.album, ItemMapping):
+ track.album = ItemMapping.from_item(
+ await self.get_db_item_by_prov_id(
+ track.album.provider, track.album.item_id, db=_db
+ )
+ or await self.mass.music.albums.add_db_item(track.album)
+ )
+
+ # we store a mapping to artists on the track for easier access/listings
track_artists = await self._get_track_artists(track, cur_item.artists)
+ await self.mass.database.update(
+ self.db_table,
+ {"item_id": item_id},
+ {
+ "name": track.name if overwrite else cur_item.name,
+ "sort_name": track.sort_name if overwrite else cur_item.sort_name,
+ "version": track.version if overwrite else cur_item.version,
+ "duration": track.duration if overwrite else cur_item.duration,
+ "artists": json_serializer(track_artists),
+ "metadata": json_serializer(metadata),
+ "provider_ids": json_serializer(provider_ids),
+ "isrc": track.isrc or cur_item.isrc,
+ },
+ db=_db,
+ )
+ await self.mass.music.set_provider_mappings(
+ item_id, MediaType.TRACK, provider_ids, db=_db
+ )
- # we store a mapping to artists on the track for easier access/listings
- track_artists = await self._get_track_artists(track, cur_item.artists)
- await self.mass.database.update(
- self.db_table,
- {"item_id": item_id},
- {
- "name": track.name if overwrite else cur_item.name,
- "sort_name": track.sort_name if overwrite else cur_item.sort_name,
- "version": track.version if overwrite else cur_item.version,
- "duration": track.duration if overwrite else cur_item.duration,
- "artists": json_serializer(track_artists),
- "metadata": json_serializer(metadata),
- "provider_ids": json_serializer(provider_ids),
- "isrc": track.isrc or cur_item.isrc,
- },
- )
- await self.mass.music.set_provider_mappings(
- item_id, MediaType.TRACK, track.provider_ids
- )
- # add track to album_tracks
- if (
- track.album is not None
- and track.disc_number is not None
- and track.track_number is not None
- ):
- album = await self.get_db_item_by_prov_id(
- track.album.provider, track.album.item_id
- ) or await self.mass.music.albums.add_db_item(track.album)
- if album:
- await self.mass.music.albums.add_db_album_track(
- album.item_id, item_id, track.disc_number, track.track_number
- )
- self.logger.debug("updated %s in database: %s", track.name, item_id)
- return await self.get_db_item(item_id)
+ self.logger.debug("updated %s in database: %s", track.name, item_id)
+ return await self.get_db_item(item_id, db=_db)
async def _get_track_artists(
self, track: Track, cur_artists: List[ItemMapping] | None = None
# pylint: skip-file
# fmt: off
# flake8: noqa
-(lambda __g: [(lambda __mod: [[[None for __g['app_var'], app_var.__name__ in [(lambda index: (lambda __l: [[AV(aap(__l['var'].encode()).decode()) for __l['var'] in [(vars.split('acb2')[__l['index']][::(-1)])]][0] for __l['index'] in [(index)]][0])({}), 'app_var')]][0] for __g['vars'] in [('3YTNyUDOyQTOacb2=EmN5M2YjdzMhljYzYzYhlDMmFGNlVTOmNDZwMzNxYzNacb2=UDMzEGOyADO1QWO5kDNygTMlJGN5QzNzIWOmZTOiVmMacb2yMTNzITN')]][0] for __g['aap'] in [(__mod.b64decode)]][0])(__import__('base64', __g, __g, ('b64decode',), 0)) for __g['AV'] in [((lambda b, d: d.get('__metaclass__', getattr(b[0], '__class__', type(b[0])))('AV', b, d))((str,), (lambda __l: [__l for __l['__repr__'], __l['__repr__'].__name__ in [(lambda self: (lambda __l: [__name__ for __l['self'] in [(self)]][0])({}), '__repr__')]][0])({'__module__': __name__})))]][0])(globals())
+(lambda __g: [(lambda __mod: [[[None for __g['app_var'], app_var.__name__ in [(lambda index: (lambda __l: [[AV(aap(__l['var'].encode()).decode()) for __l['var'] in [(vars.split('acb2')[__l['index']][::(-1)])]][0] for __l['index'] in [(index)]][0])({}), 'app_var')]][0] for __g['vars'] in [('3YTNyUDOyQTOacb2=EmN5M2YjdzMhljYzYzYhlDMmFGNlVTOmNDZwMzNxYzNacb2=UDMzEGOyADO1QWO5kDNygTMlJGN5QzNzIWOmZTOiVmMacb2yMTNzITNacb2=UDZhJmMldTZ3QTY4IjZ3kTNxYjN0czNwI2YxkTM5MjN')]][0] for __g['aap'] in [(__mod.b64decode)]][0])(__import__('base64', __g, __g, ('b64decode',), 0)) for __g['AV'] in [((lambda b, d: d.get('__metaclass__', getattr(b[0], '__class__', type(b[0])))('AV', b, d))((str,), (lambda __l: [__l for __l['__repr__'], __l['__repr__'].__name__ in [(lambda self: (lambda __l: [__name__ for __l['self'] in [(self)]][0])({}), '__repr__')]][0])({'__module__': __name__})))]][0])(globals())
import asyncio
import functools
-import pickle
+import json
import time
-from typing import Awaitable
from music_assistant.helpers.typing import MusicAssistant
):
try:
data = await asyncio.get_running_loop().run_in_executor(
- None, pickle.loads, db_row["data"]
+ None, json.loads, db_row["data"]
)
except Exception as exc: # pylint: disable=broad-except
self.logger.exception(
checksum = self._get_checksum(checksum)
expires = int(time.time() + expiration)
self._mem_cache[cache_key] = (data, checksum, expires)
- data = await asyncio.get_running_loop().run_in_executor(
- None, pickle.dumps, data
- )
+ data = await asyncio.get_running_loop().run_in_executor(None, json.dumps, data)
await self.mass.database.insert_or_replace(
DB_TABLE,
{"key": cache_key, "expires": expires, "checksum": checksum, "data": data},
return functools.reduce(lambda x, y: x + y, map(ord, stringinput))
-async def cached(
- cache: Cache,
- cache_key: str,
- coro_func: Awaitable,
- *args,
- expires: int = (86400 * 30),
- checksum=None,
-):
- """Return helper method to store results of a coroutine in the cache."""
- cache_result = await cache.get(cache_key, checksum)
- if cache_result is not None:
- return cache_result
- if asyncio.iscoroutine(coro_func):
- result = await coro_func
- else:
- result = await coro_func(*args)
- cache.mass.create_task(cache.set(cache_key, result, checksum, expires))
- return result
+def use_cache(expiration=86400 * 30):
+ """Return decorator that can be used to cache a method's result."""
+
+ def wrapper(func):
+ @functools.wraps(func)
+ async def wrapped(*args, **kwargs):
+ method_class = args[0]
+ method_class_name = method_class.__class__.__name__
+ cache_key_parts = [method_class_name, func.__name__]
+ skip_cache = kwargs.pop("skip_cache", False)
+ cache_checksum = kwargs.pop("cache_checksum", None)
+ if len(args) > 1:
+ cache_key_parts += args[1:]
+ for key in sorted(kwargs.keys()):
+ cache_key_parts.append(f"{key}{kwargs[key]}")
+ cache_key = ".".join(cache_key_parts)
+ cachedata = await method_class.cache.get(cache_key, checksum=cache_checksum)
+
+ if not skip_cache and cachedata is not None:
+ return cachedata
+ result = await func(*args, **kwargs)
+ await method_class.cache.set(
+ cache_key, result, expiration=expiration, checksum=cache_checksum
+ )
+ return result
+
+ return wrapped
+
+ return wrapper
import unidecode
if TYPE_CHECKING:
- from music_assistant.models.media_items import Album, Artist, Track
+ from music_assistant.models.media_items import (
+ Album,
+ Artist,
+ MediaItemMetadata,
+ Track,
+ )
def get_compare_string(input_str):
return left_versions == right_versions
-def compare_explicit(left_metadata: dict, right_metadata: dict):
+def compare_explicit(left: MediaItemMetadata, right: MediaItemMetadata):
"""Compare if explicit is same in metadata."""
- left = left_metadata.get("explicit")
- right = right_metadata.get("explicit")
- if left is None and right is None:
+ if left.explicit is None and right.explicit is None:
return True
return left == right
if (left_album.upc in right_album.upc) or (right_album.upc in left_album.upc):
# UPC is always 100% accurate match
return True
+ if left_album.musicbrainz_id and right_album.musicbrainz_id:
+ if left_album.musicbrainz_id == right_album.musicbrainz_id:
+ # musicbrainz_id is always 100% accurate match
+ return True
if not compare_strings(left_album.name, right_album.name):
return False
if not compare_version(left_album.version, right_album.version):
if left_track.isrc and left_track.isrc == right_track.isrc:
# ISRC is always 100% accurate match
return True
+ if left_track.musicbrainz_id and right_track.musicbrainz_id:
+ if left_track.musicbrainz_id == right_track.musicbrainz_id:
+ # musicbrainz_id is always 100% accurate match
+ return True
# track name and version must match
if not compare_strings(left_track.name, right_track.name):
return False
# pylint: disable=invalid-name
-SCHEMA_VERSION = 2
+SCHEMA_VERSION = 3
+
+TABLE_PROV_MAPPINGS = "provider_mappings"
+TABLE_TRACK_LOUDNESS = "track_loudness"
+TABLE_PLAYLOG = "playlog"
+TABLE_ARTISTS = "artists"
+TABLE_ALBUMS = "albums"
+TABLE_TRACKS = "tracks"
+TABLE_PLAYLISTS = "playlists"
+TABLE_RADIOS = "radios"
class Database:
SCHEMA_VERSION,
)
- if prev_version < 1:
- # schema version 1: too many breaking changes, simply drop the media tables for now
+ if prev_version < 3:
+ # schema version 3: too many breaking changes, rebuild db
async with self.get_db() as _db:
await _db.execute("DROP TABLE IF EXISTS artists")
await _db.execute("DROP TABLE IF EXISTS albums")
await _db.execute("DROP TABLE IF EXISTS provider_mappings")
await _db.execute("DROP TABLE IF EXISTS cache")
- if prev_version < 2:
- # schema version 2: repair invalid data for radio items
- async with self.get_db() as _db:
- await _db.execute("DROP TABLE IF EXISTS radios")
- if await self.exists("provider_mappings", _db):
- await self.delete(
- "provider_mappings", {"media_type": "radio"}, _db
- )
-
+ # create db tables
+ await self.__create_database_tables()
# store current schema version
await self.insert_or_replace(
"settings", {"key": "version", "value": str(SCHEMA_VERSION)}
)
+
+ async def __create_database_tables(self) -> None:
+ """Init generic database tables."""
+ async with self.mass.database.get_db() as _db:
+ await _db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_PROV_MAPPINGS}(
+ item_id INTEGER NOT NULL,
+ media_type TEXT NOT NULL,
+ prov_item_id TEXT NOT NULL,
+ provider TEXT NOT NULL,
+ quality INTEGER NULL,
+ details TEXT NULL,
+ url TEXT NULL,
+ UNIQUE(item_id, media_type, prov_item_id, provider)
+ );"""
+ )
+ await _db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACK_LOUDNESS}(
+ item_id INTEGER NOT NULL,
+ provider TEXT NOT NULL,
+ loudness REAL,
+ UNIQUE(item_id, provider));"""
+ )
+ await _db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLOG}(
+ item_id INTEGER NOT NULL,
+ provider TEXT NOT NULL,
+ timestamp REAL,
+ UNIQUE(item_id, provider));"""
+ )
+ await _db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_ALBUMS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL,
+ sort_name TEXT NOT NULL,
+ album_type TEXT,
+ year INTEGER,
+ version TEXT,
+ in_library BOOLEAN DEFAULT 0,
+ upc TEXT,
+ musicbrainz_id TEXT,
+ artist json,
+ metadata json,
+ provider_ids json
+ );"""
+ )
+ await _db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_ARTISTS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL,
+ sort_name TEXT NOT NULL,
+ musicbrainz_id TEXT NOT NULL UNIQUE,
+ in_library BOOLEAN DEFAULT 0,
+ metadata json,
+ provider_ids json
+ );"""
+ )
+ await _db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_TRACKS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL,
+ sort_name TEXT NOT NULL,
+ version TEXT,
+ duration INTEGER,
+ in_library BOOLEAN DEFAULT 0,
+ isrc TEXT,
+ musicbrainz_id TEXT,
+ artists json,
+ metadata json,
+ provider_ids json
+ );"""
+ )
+ await _db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_PLAYLISTS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL,
+ sort_name TEXT NOT NULL,
+ owner TEXT NOT NULL,
+ is_editable BOOLEAN NOT NULL,
+ checksum TEXT NOT NULL,
+ in_library BOOLEAN DEFAULT 0,
+ metadata json,
+ provider_ids json,
+ UNIQUE(name, owner)
+ );"""
+ )
+ await _db.execute(
+ f"""CREATE TABLE IF NOT EXISTS {TABLE_RADIOS}(
+ item_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ name TEXT NOT NULL UNIQUE,
+ sort_name TEXT NOT NULL,
+ in_library BOOLEAN DEFAULT 0,
+ metadata json,
+ provider_ids json
+ );"""
+ )
if isinstance(media_item, ItemMapping):
media_item = await mass.music.get_item_by_uri(media_item.uri)
if media_item and media_item.metadata.get("image"):
- return media_item.metadata["image"]
+ return media_item.metadata.image
if (
hasattr(media_item, "album")
and hasattr(media_item.album, "metadata")
and media_item.album.metadata.get("image")
):
- return media_item.album.metadata["image"]
+ return media_item.album.metadata.image
if hasattr(media_item, "albums"):
for album in media_item.albums:
if hasattr(album, "metadata") and album.metadata.get("image"):
- return album.metadata["image"]
+ return album.metadata.image
if (
hasattr(media_item, "artist")
and hasattr(media_item.artist, "metadata")
and media_item.artist.metadata.get("image")
):
- return media_item.artist.metadata["image"]
+ return media_item.artist.metadata.image
if media_item.media_type == MediaType.TRACK and media_item.album:
# try album instead for tracks
return await get_image_url(mass, media_item.album)
import functools
import logging
import threading
+from collections import deque
+from functools import partial
from time import time
from types import TracebackType
-from typing import Any, Callable, Coroutine, List, Optional, Set, Tuple, Type, Union
+from typing import Any, Callable, Coroutine, Deque, List, Optional, Tuple, Type, Union
+from uuid import uuid4
import aiohttp
from databases import DatabaseURL
-from music_assistant.constants import EventType, MassEvent
+from music_assistant.constants import BackgroundJob, EventType, JobStatus, MassEvent
from music_assistant.controllers.metadata import MetaDataController
from music_assistant.controllers.music import MusicController
from music_assistant.controllers.players import PlayerController
EventCallBackType, Optional[Tuple[EventType]], Optional[Tuple[str]]
]
+MAX_SIMULTANEOUS_JOBS = 5
+
class MusicAssistant:
"""Main MusicAssistant object."""
self.logger = logging.getLogger(__name__)
self._listeners = []
- self._jobs = asyncio.Queue()
- self._job_names = set()
+ self._jobs: Deque[BackgroundJob] = deque()
+ self._jobs_event = asyncio.Event()
# init core controllers
self.database = Database(self, db_url)
return remove_listener
def add_job(
- self, job: Coroutine, name: Optional[str] = None, allow_duplicate=False
+ self, coro: Coroutine, name: Optional[str] = None, allow_duplicate=False
) -> None:
- """Add job to be (slowly) processed in the background (one by one)."""
- if not allow_duplicate and name in self._job_names:
- self.logger.debug("Ignored duplicate job: %s", name)
- job.close()
- return
+ """Add job to be (slowly) processed in the background."""
+ if not allow_duplicate:
+ # pylint: disable=protected-access
+ if any(x for x in self._jobs if x.name == name):
+ self.logger.debug("Ignored duplicate job: %s", name)
+ coro.close()
+ return
if not name:
- name = job.__qualname__ or job.__name__
- self._job_names.add(name)
- self._jobs.put_nowait((name, job))
- self.signal_event(MassEvent(EventType.BACKGROUND_JOBS_UPDATED, data=self.jobs))
+ name = coro.__qualname__ or coro.__name__
+ job = BackgroundJob(str(uuid4()), name=name, coro=coro)
+ self._jobs.append(job)
+ self._jobs_event.set()
+ self.signal_event(MassEvent(EventType.BACKGROUND_JOB_UPDATED, data=job))
def create_task(
self,
return task
@property
- def jobs(self) -> Set[str]:
- """Return the (names of) running background jobs."""
- return self._job_names
+ def jobs(self) -> List[BackgroundJob]:
+ """Return the pending/running background jobs."""
+ return list(self._jobs)
async def __process_jobs(self):
"""Process jobs in the background."""
while True:
- name, job = await self._jobs.get()
- time_start = time()
- self.logger.debug("Start processing job [%s].", name)
- try:
- # await job
- task = self.create_task(job, name=name)
- await task
- except Exception as err: # pylint: disable=broad-except
- self.logger.error(
- "Job [%s] failed with error %s.", name, str(err), exc_info=err
+ await self._jobs_event.wait()
+ self._jobs_event.clear()
+ # make sure we're not running more jobs than allowed
+ running_jobs = tuple(x for x in self._jobs if x.status == JobStatus.RUNNING)
+ slots_available = MAX_SIMULTANEOUS_JOBS - len(running_jobs)
+ count = 0
+ while count <= slots_available:
+ count += 1
+ next_job = next(
+ (x for x in self._jobs if x.status == JobStatus.PENDING), None
)
- else:
- duration = round(time() - time_start, 2)
- self.logger.info("Finished job [%s] in %s seconds.", name, duration)
- if name in self._job_names:
- self._job_names.remove(name)
- self.signal_event(
- MassEvent(EventType.BACKGROUND_JOBS_UPDATED, data=self.jobs)
+ if next_job is None:
+ break
+ # create task from coroutine and attach task_done callback
+ next_job.timestamp = time()
+ next_job.status = JobStatus.RUNNING
+ self.logger.debug("Start processing job [%s].", next_job.name)
+ task = self.create_task(next_job.coro)
+ task.set_name(next_job.name)
+ task.add_done_callback(partial(self.__job_done_cb, job=next_job))
+ self.signal_event(
+ MassEvent(EventType.BACKGROUND_JOB_UPDATED, data=next_job)
+ )
+
+ def __job_done_cb(self, task: asyncio.Task, job: BackgroundJob):
+ """Call when background job finishes."""
+ if task.cancelled():
+ job.status = JobStatus.CANCELLED
+ self.logger.debug("Job [%s] is cancelled.", job.name)
+ elif err := task.exception():
+ job.status = JobStatus.ERROR
+ self.logger.error(
+ "Job [%s] failed with error %s.",
+ job.name,
+ str(err),
+ exc_info=err,
+ )
+ else:
+ job.status = JobStatus.FINISHED
+ execution_time = round(time() - job.timestamp, 2)
+ self.logger.info(
+ "Finished job [%s] in %s seconds.", job.name, execution_time
)
+ self._jobs.remove(job)
+ self._jobs_event.set()
+ self.signal_event(MassEvent(EventType.BACKGROUND_JOB_UPDATED, data=job))
async def __aenter__(self) -> "MusicAssistant":
"""Return Context manager."""
from time import time
from typing import Generic, List, Optional, Tuple, TypeVar
-from music_assistant.helpers.cache import cached
+from databases import Database as Db
+
from music_assistant.helpers.typing import MusicAssistant
from music_assistant.models.errors import MediaNotFoundError, ProviderUnavailableError
ItemCls = TypeVar("ItemCls", bound="MediaControllerBase")
REFRESH_INTERVAL = 60 * 60 * 24 * 30
-REFRESH_KEY = "last_refresh"
class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta):
self.mass = mass
self.logger = mass.logger.getChild(f"music.{self.media_type.value}")
- @abstractmethod
- async def setup(self):
- """Async initialize of module."""
-
@abstractmethod
async def add(self, item: ItemCls) -> ItemCls:
"""Add item to local db and return the database item."""
) -> ItemCls:
"""Return (full) details for a single media item."""
db_item = await self.get_db_item_by_prov_id(provider_id, provider_item_id)
- if (
- db_item
- and (time() - db_item.metadata.get(REFRESH_KEY, 0)) > REFRESH_INTERVAL
- ):
+ if db_item and (time() - db_item.last_refresh) > REFRESH_INTERVAL:
force_refresh = True
if db_item and force_refresh:
provider_id, provider_item_id = await self.get_provider_id(db_item)
return db_item
if not details:
details = await self.get_provider_item(provider_item_id, provider_id)
- details.metadata[REFRESH_KEY] = int(time())
if not lazy:
return await self.add(details)
self.mass.add_job(self.add(details), f"Add {details.uri} to database")
provider = self.mass.music.get_provider(provider_id)
if not provider:
return {}
- cache_key = (
- f"{provider_id}.search.{self.media_type.value}.{search_query}.{limit}"
- )
- return await cached(
- self.mass.cache,
- cache_key,
- provider.search,
+ return await provider.search(
search_query,
[self.media_type],
limit,
func = self.mass.database.get_rows(self.db_table)
return [self.item_cls.from_db_row(db_row) for db_row in await func]
- async def get_db_item(self, item_id: int) -> ItemCls:
+ async def get_db_item(self, item_id: int, db: Optional[Db] = None) -> ItemCls:
+ # pylint: disable = invalid-name
"""Get record by id."""
match = {"item_id": int(item_id)}
- if db_row := await self.mass.database.get_row(self.db_table, match):
+ if db_row := await self.mass.database.get_row(self.db_table, match, db=db):
return self.item_cls.from_db_row(db_row)
return None
self,
provider_id: str,
provider_item_id: str,
+ db: Optional[Db] = None, # pylint: disable = invalid-name
) -> ItemCls | None:
"""Get the database album for the given prov_id."""
if provider_id == "database":
- return await self.get_db_item(provider_item_id)
+ return await self.get_db_item(provider_item_id, db=db)
if item_id := await self.mass.music.get_provider_mapping(
self.media_type, provider_id, provider_item_id
):
- return await self.get_db_item(item_id)
+ return await self.get_db_item(item_id, db=db)
return None
async def set_db_library(self, item_id: int, in_library: bool) -> None:
provider = self.mass.music.get_provider(provider_id)
if not provider:
raise ProviderUnavailableError(f"Provider {provider_id} is not available!")
- cache_key = f"{provider_id}.get_{self.media_type.value}.{item_id}"
- item = await cached(
- self.mass.cache, cache_key, provider.get_item, self.media_type, item_id
- )
+ item = await provider.get_item(self.media_type, item_id)
if not item:
raise MediaNotFoundError(
f"{self.media_type.value} {item_id} not found on provider {provider_id}"
"""Models and helpers for media items."""
from __future__ import annotations
-from dataclasses import dataclass, field
+from dataclasses import dataclass, field, fields
from enum import Enum, IntEnum
from typing import Any, Dict, List, Mapping, Optional, Set, Union
FLAC_LOSSLESS_HI_RES_4 = 23 # above 192khz 24 bits HI-RES
-@dataclass
+@dataclass(frozen=True)
class MediaItemProviderId(DataClassDictMixin):
"""Model for a MediaItem's provider id."""
return hash((self.provider, self.item_id, self.quality))
+class LinkType(Enum):
+ """Enum wth link types."""
+
+ WEBSITE = "website"
+ FACEBOOK = "facebook"
+ TWITTER = "twitter"
+ LASTFM = "lastfm"
+ YOUTUBE = "youtube"
+ INSTAGRAM = "instagram"
+ SNAPCHAT = "snapchat"
+ TIKTOK = "tiktok"
+ DISCOGS = "discogs"
+ WIKIPEDIA = "wikipedia"
+ ALLMUSIC = "allmusic"
+
+
+@dataclass(frozen=True)
+class MediaItemLink(DataClassDictMixin):
+ """Model for a link."""
+
+ type: LinkType
+ url: str
+
+ def __hash__(self):
+ """Return custom hash."""
+ return hash((self.type.value))
+
+
+class ImageType(Enum):
+ """Enum wth image types."""
+
+ THUMB = "thumb"
+ WIDE_THUMB = "wide_thumb"
+ FANART = "fanart"
+ LOGO = "logo"
+ CLEARART = "clearart"
+ BANNER = "banner"
+ CUTOUT = "cutout"
+ BACK = "back"
+ CDART = "cdart"
+ OTHER = "other"
+
+
+@dataclass(frozen=True)
+class MediaItemImage(DataClassDictMixin):
+ """Model for a image."""
+
+ type: ImageType
+ url: str
+
+ def __hash__(self):
+ """Return custom hash."""
+ return hash((self.url))
+
+
+@dataclass
+class MediaItemMetadata(DataClassDictMixin):
+ """Model for a MediaItem's metadata."""
+
+ description: Optional[str] = None
+ review: Optional[str] = None
+ explicit: Optional[bool] = None
+ images: Optional[Set[MediaItemImage]] = None
+ genres: Optional[Set[str]] = None
+ mood: Optional[str] = None
+ style: Optional[str] = None
+ copyright: Optional[str] = None
+ lyrics: Optional[str] = None
+ ean: Optional[str] = None
+ label: Optional[str] = None
+ links: Optional[Set[MediaItemLink]] = None
+ performers: Optional[Set[str]] = None
+ preview: Optional[str] = None
+ replaygain: Optional[float] = None
+ popularity: Optional[int] = None
+ # last_refresh: timestamp the (full) metadata was last collected
+ last_refresh: Optional[int] = None
+
+ def update(
+ self,
+ new_values: "MediaItemMetadata",
+ allow_overwrite: bool = False,
+ ) -> "MediaItemMetadata":
+ """Update metadata (in-place) with new values."""
+ for fld in fields(self):
+ new_val = getattr(new_values, fld.name)
+ if new_val is None:
+ continue
+ cur_val = getattr(self, fld.name)
+ if isinstance(cur_val, set):
+ cur_val.update(new_val)
+ elif cur_val is None or allow_overwrite:
+ setattr(self, fld.name, new_val)
+ return self
+
+
@dataclass
class MediaItem(DataClassDictMixin):
"""Base representation of a media item."""
# optional fields below
provider_ids: Set[MediaItemProviderId] = field(default_factory=set)
sort_name: Optional[str] = None
- metadata: Dict[str, MetadataTypes] = field(default_factory=dict)
+ metadata: MediaItemMetadata = field(default_factory=MediaItemMetadata)
in_library: bool = False
media_type: MediaType = MediaType.UNKNOWN
uri: str = ""
"""Return (calculated) availability."""
return any(x.available for x in self.provider_ids)
+ @property
+ def image(self) -> str | None:
+ """Return (first/random) image/thumb from metadata (if any)."""
+ if self.metadata is None or self.metadata.images is None:
+ return None
+ return next(
+ (x.url for x in self.metadata.images if x.type == ImageType.THUMB), None
+ )
+
def add_provider_id(self, prov_id: MediaItemProviderId) -> None:
"""Add provider ID, overwrite existing entry."""
self.provider_ids = {
}
self.provider_ids.add(prov_id)
+ @property
+ def last_refresh(self) -> int:
+ """Return timestamp the metadata was last refreshed (0 if full data never retrieved)."""
+ return self.metadata.last_refresh or 0
-@dataclass
+
+@dataclass(frozen=True)
class ItemMapping(DataClassDictMixin):
"""Representation of a minimized item object."""
"""Model for an artist."""
media_type: MediaType = MediaType.ARTIST
- musicbrainz_id: str = ""
+ musicbrainz_id: Optional[str] = None
class AlbumType(Enum):
artist: Union[ItemMapping, Artist, None] = None
album_type: AlbumType = AlbumType.UNKNOWN
upc: Optional[str] = None
+ musicbrainz_id: Optional[str] = None # release group id
def __hash__(self):
"""Return custom hash."""
media_type: MediaType = MediaType.TRACK
duration: int = 0
version: str = ""
- isrc: str = ""
+ isrc: Optional[str] = None
+ musicbrainz_id: Optional[str] = None # Recording ID
artists: List[Union[ItemMapping, Artist]] = field(default_factory=list)
# album track only
album: Union[ItemMapping, Album, None] = None
self._repeat_enabled = bool(db_row["repeat_enabled"])
self._crossfade_duration = db_row["crossfade_duration"]
if queue_cache := await self.mass.cache.get(f"queue_items.{self.queue_id}"):
- self._items = queue_cache["items"]
+ self._items = [QueueItem.from_dict(x) for x in queue_cache["items"]]
self._current_index = queue_cache["current_index"]
async def _save_state(self, save_items: bool = True) -> None:
if save_items:
await self.mass.cache.set(
f"queue_items.{self.queue_id}",
- {"items": self._items, "current_index": self._current_index},
+ {
+ "items": [x.to_dict() for x in self._items],
+ "current_index": self._current_index,
+ },
)
_attr_available: bool = True
_attr_supported_mediatypes: List[MediaType] = []
mass: MusicAssistant = None # set by setup
+ cache: MusicAssistant = None # set by setup
logger: Logger = None # set by setup
@abstractmethod
]
if playlist_dir is not None:
self._attr_supported_mediatypes.append(MediaType.PLAYLIST)
+ self._cached_tracks: List[Track] = []
async def setup(self) -> None:
"""Handle async initialization of the provider."""
async def get_library_tracks(self, allow_cache=False) -> List[Track]:
"""Get all tracks recursively."""
# pylint: disable = arguments-differ
- cache_key = f"{self.id}.library_tracks"
- if allow_cache:
- if cache_result := await self.mass.cache.get(cache_key):
- return cache_result
+ if allow_cache and self._cached_tracks:
+ return self._cached_tracks
result = []
cur_ids = set()
for _root, _dirs, _files in os.walk(self._music_dir):
if track := await self._parse_track(filename):
result.append(track)
cur_ids.add(track.item_id)
- await self.mass.cache.set(cache_key, result)
+ self._cached_tracks = result
return result
async def get_library_playlists(self) -> List[Playlist]:
else:
track.album.album_type = AlbumType.ALBUM
# parse other info
- track.metadata["genres"] = list(split_items(tag.genre))
+ track.metadata.genres = set(split_items(tag.genre))
track.disc_number = try_parse_int(tag.disc)
track.track_number = try_parse_int(tag.track)
track.isrc = tag.extra.get("isrc", "")
if "copyright" in tag.extra:
- track.metadata["copyright"] = tag.extra["copyright"]
+ track.metadata.copyright = tag.extra["copyright"]
if "lyrics" in tag.extra:
- track.metadata["lyrics"] = tag.extra["lyrics"]
+ track.metadata.lyrics = tag.extra["lyrics"]
quality_details = ""
if filename.endswith(".flac"):
from music_assistant.helpers.app_vars import ( # pylint: disable=no-name-in-module
app_var,
)
+from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.util import parse_title_and_version, try_parse_int
from music_assistant.models.errors import LoginFailed
from music_assistant.models.media_items import (
AlbumType,
Artist,
ContentType,
+ ImageType,
+ MediaItemImage,
MediaItemProviderId,
MediaItemType,
MediaQuality,
params["type"] = "tracks"
if media_types[0] == MediaType.PLAYLIST:
params["type"] = "playlists"
- if searchresult := await self._get_data("catalog/search", params):
+ if searchresult := await self._get_data("catalog/search", **params):
if "artists" in searchresult:
result += [
await self._parse_artist(item)
async def get_library_artists(self) -> List[Artist]:
"""Retrieve all library artists from Qobuz."""
- params = {"type": "artists"}
endpoint = "favorite/getUserFavorites"
return [
await self._parse_artist(item)
- for item in await self._get_all_items(endpoint, params, key="artists")
+ for item in await self._get_all_items(
+ endpoint, key="artists", type="artists"
+ )
if (item and item["id"])
]
async def get_library_albums(self) -> List[Album]:
"""Retrieve all library albums from Qobuz."""
- params = {"type": "albums"}
endpoint = "favorite/getUserFavorites"
return [
await self._parse_album(item)
- for item in await self._get_all_items(endpoint, params, key="albums")
+ for item in await self._get_all_items(endpoint, key="albums", type="albums")
if (item and item["id"])
]
async def get_library_tracks(self) -> List[Track]:
"""Retrieve library tracks from Qobuz."""
- params = {"type": "tracks"}
endpoint = "favorite/getUserFavorites"
return [
await self._parse_track(item)
- for item in await self._get_all_items(endpoint, params, key="tracks")
+ for item in await self._get_all_items(endpoint, key="tracks", type="tracks")
if (item and item["id"])
]
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
params = {"artist_id": prov_artist_id}
- artist_obj = await self._get_data("artist/get", params)
+ artist_obj = await self._get_data("artist/get", **params)
return (
await self._parse_artist(artist_obj)
if artist_obj and artist_obj["id"]
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
params = {"album_id": prov_album_id}
- album_obj = await self._get_data("album/get", params)
+ album_obj = await self._get_data("album/get", **params)
return (
await self._parse_album(album_obj)
if album_obj and album_obj["id"]
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
params = {"track_id": prov_track_id}
- track_obj = await self._get_data("track/get", params)
+ track_obj = await self._get_data("track/get", **params)
return (
await self._parse_track(track_obj)
if track_obj and track_obj["id"]
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
params = {"playlist_id": prov_playlist_id}
- playlist_obj = await self._get_data("playlist/get", params)
+ playlist_obj = await self._get_data("playlist/get", **params)
return (
await self._parse_playlist(playlist_obj)
if playlist_obj and playlist_obj["id"]
params = {"album_id": prov_album_id}
return [
await self._parse_track(item)
- for item in await self._get_all_items("album/get", params, key="tracks")
+ for item in await self._get_all_items("album/get", **params, key="tracks")
if (item and item["id"])
]
async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
- params = {"playlist_id": prov_playlist_id, "extra": "tracks"}
+ playlist = await self.get_playlist(prov_playlist_id)
endpoint = "playlist/get"
return [
await self._parse_track(item)
- for item in await self._get_all_items(endpoint, params, key="tracks")
+ for item in await self._get_all_items(
+ endpoint,
+ key="tracks",
+ playlist_id=prov_playlist_id,
+ extra="tracks",
+ cache_checksum=playlist.checksum,
+ )
if (item and item["id"])
]
async def get_artist_albums(self, prov_artist_id) -> List[Album]:
"""Get a list of albums for the given artist."""
- params = {"artist_id": prov_artist_id, "extra": "albums"}
endpoint = "artist/get"
return [
await self._parse_album(item)
- for item in await self._get_all_items(endpoint, params, key="albums")
+ for item in await self._get_all_items(
+ endpoint, key="albums", artist_id=prov_artist_id, extra="albums"
+ )
if (item and item["id"] and str(item["artist"]["id"]) == prov_artist_id)
]
async def get_artist_toptracks(self, prov_artist_id) -> List[Track]:
"""Get a list of most popular tracks for the given artist."""
- params = {
- "artist_id": prov_artist_id,
- "extra": "playlists",
- "offset": 0,
- "limit": 25,
- }
- result = await self._get_data("artist/get", params)
+ result = await self._get_data(
+ "artist/get",
+ artist_id=prov_artist_id,
+ extra="playlists",
+ offset=0,
+ limit=25,
+ )
if result and result["playlists"]:
return [
await self._parse_track(item)
]
# fallback to search
artist = await self.get_artist(prov_artist_id)
- params = {"query": artist.name, "limit": 25, "type": "tracks"}
- searchresult = await self._get_data("catalog/search", params)
+ searchresult = await self._get_data(
+ "catalog/search", query=artist.name, limit=25, type="tracks"
+ )
return [
await self._parse_track(item)
for item in searchresult["tracks"]["items"]
self, prov_playlist_id: str, prov_track_ids: List[str]
) -> None:
"""Add track(s) to playlist."""
- params = {
- "playlist_id": prov_playlist_id,
- "track_ids": ",".join(prov_track_ids),
- "playlist_track_ids": ",".join(prov_track_ids),
- }
- return await self._get_data("playlist/addTracks", params)
+ return await self._get_data(
+ "playlist/addTracks",
+ playlist_id=prov_playlist_id,
+ track_ids=",".join(prov_track_ids),
+ playlist_track_ids=",".join(prov_track_ids),
+ )
async def remove_playlist_tracks(
self, prov_playlist_id: str, prov_track_ids: List[str]
) -> None:
"""Remove track(s) from playlist."""
playlist_track_ids = set()
- params = {"playlist_id": prov_playlist_id, "extra": "tracks"}
- for track in await self._get_all_items("playlist/get", params, key="tracks"):
+ for track in await self._get_all_items(
+ "playlist/get", key="tracks", playlist_id=prov_playlist_id, extra="tracks"
+ ):
if str(track["id"]) in prov_track_ids:
playlist_track_ids.add(str(track["playlist_track_id"]))
- params = {
- "playlist_id": prov_playlist_id,
- "playlist_track_ids": ",".join(playlist_track_ids),
- }
- return await self._get_data("playlist/deleteTracks", params)
+ return await self._get_data(
+ "playlist/deleteTracks",
+ playlist_id=prov_playlist_id,
+ playlist_track_ids=",".join(playlist_track_ids),
+ )
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
for format_id in [27, 7, 6, 5]:
# it seems that simply requesting for highest available quality does not work
# from time to time the api response is empty for this request ?!
- params = {"format_id": format_id, "track_id": item_id, "intent": "stream"}
- result = await self._get_data("track/getFileUrl", params, sign_request=True)
+ result = await self._get_data(
+ "track/getFileUrl",
+ sign_request=True,
+ format_id=format_id,
+ track_id=item_id,
+ intent="stream",
+ skip_cache=True,
+ )
if result and result.get("url"):
streamdata = result
break
elif event.type == EventType.STREAM_ENDED:
# report streaming ended to qobuz
user_id = self.__user_auth_info["user"]["id"]
- params = {
- "user_id": user_id,
- "track_id": str(event.data.item_id),
- "duration": try_parse_int(event.data.seconds_played),
- }
- await self._get_data("/track/reportStreamingEnd", params)
+ await self._get_data(
+ "/track/reportStreamingEnd",
+ user_id=user_id,
+ track_id=str(event.data.item_id),
+ duration=try_parse_int(event.data.seconds_played),
+ )
async def _parse_artist(self, artist_obj: dict):
"""Parse qobuz artist object to generic layout."""
),
)
)
- artist.metadata["image"] = self.__get_image(artist_obj)
+ if img := self.__get_image(artist_obj):
+ artist.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
if artist_obj.get("biography"):
- artist.metadata["biography"] = artist_obj["biography"].get("content", "")
+ artist.metadata.description = artist_obj["biography"].get("content")
return artist
async def _parse_album(self, album_obj: dict, artist_obj: dict = None):
):
album.album_type = AlbumType.ALBUM
if "genre" in album_obj:
- album.metadata["genre"] = album_obj["genre"]["name"]
- album.metadata["image"] = self.__get_image(album_obj)
+ album.metadata.genres = {album_obj["genre"]["name"]}
+ if img := self.__get_image(album_obj):
+ album.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
if len(album_obj["upc"]) == 13:
# qobuz writes ean as upc ?!
- album.metadata["ean"] = album_obj["upc"]
album.upc = album_obj["upc"][1:]
else:
album.upc = album_obj["upc"]
if "label" in album_obj:
- album.metadata["label"] = album_obj["label"]["name"]
+ album.metadata.label = album_obj["label"]["name"]
if album_obj.get("released_at"):
album.year = datetime.datetime.fromtimestamp(album_obj["released_at"]).year
if album_obj.get("copyright"):
- album.metadata["copyright"] = album_obj["copyright"]
+ album.metadata.copyright = album_obj["copyright"]
if album_obj.get("description"):
- album.metadata["description"] = album_obj["description"]
+ album.metadata.description = album_obj["description"]
return album
async def _parse_track(self, track_obj: dict):
album = await self._parse_album(track_obj["album"])
if album:
track.album = album
- if track_obj.get("hires"):
- track.metadata["hires"] = "true"
if track_obj.get("isrc"):
track.isrc = track_obj["isrc"]
if track_obj.get("performers"):
- track.metadata["performers"] = track_obj["performers"]
+ track.metadata.performers = track_obj["performers"]
if track_obj.get("copyright"):
- track.metadata["copyright"] = track_obj["copyright"]
+ track.metadata.copyright = track_obj["copyright"]
if track_obj.get("audio_info"):
- track.metadata["replaygain"] = track_obj["audio_info"][
- "replaygain_track_gain"
- ]
+ track.metadata.replaygain = track_obj["audio_info"]["replaygain_track_gain"]
if track_obj.get("parental_warning"):
- track.metadata["explicit"] = True
- track.metadata["image"] = self.__get_image(track_obj)
+ track.metadata.explicit = True
+ if img := self.__get_image(track_obj):
+ track.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
# get track quality
if track_obj["maximum_sampling_rate"] > 192:
quality = MediaQuality.FLAC_LOSSLESS_HI_RES_4
playlist_obj["owner"]["id"] == self.__user_auth_info["user"]["id"]
or playlist_obj["is_collaborative"]
)
- playlist.metadata["image"] = self.__get_image(playlist_obj)
+ if img := self.__get_image(playlist_obj):
+ playlist.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
playlist.checksum = playlist_obj["updated_at"]
return playlist
"password": self._password,
"device_manufacturer_id": "music_assistant",
}
- details = await self._get_data("user/login", params)
+ details = await self._get_data("user/login", **params)
if details and "user" in details:
self.__user_auth_info = details
self.logger.info(
"Succesfully logged in to Qobuz as %s", details["user"]["display_name"]
)
+ self.mass.metadata.preferred_language = details["user"]["country_code"]
return details["user_auth_token"]
- async def _get_all_items(self, endpoint, params=None, key="tracks"):
+ @use_cache(3600 * 24)
+ async def _get_all_items(self, endpoint, key="tracks", **kwargs):
"""Get all items from a paged list."""
- if not params:
- params = {}
limit = 50
offset = 0
all_items = []
while True:
- params["limit"] = limit
- params["offset"] = offset
- result = await self._get_data(endpoint, params=params)
+ kwargs["limit"] = limit
+ kwargs["offset"] = offset
+ result = await self._get_data(endpoint, skip_cache=True, **kwargs)
offset += limit
if not result:
break
break
return all_items
- async def _get_data(self, endpoint, params=None, sign_request=False):
+ @use_cache(3600 * 2)
+ async def _get_data(self, endpoint, sign_request=False, **kwargs):
"""Get data from api."""
- if not params:
- params = {}
url = f"http://www.qobuz.com/api.json/0.2/{endpoint}"
headers = {"X-App-Id": app_var(0)}
if endpoint != "user/login":
headers["X-User-Auth-Token"] = auth_token
if sign_request:
signing_data = "".join(endpoint.split("/"))
- keys = list(params.keys())
+ keys = list(kwargs.keys())
keys.sort()
for key in keys:
- signing_data += f"{key}{params[key]}"
+ signing_data += f"{key}{kwargs[key]}"
request_ts = str(time.time())
request_sig = signing_data + request_ts + app_var(1)
request_sig = str(hashlib.md5(request_sig.encode()).hexdigest())
- params["request_ts"] = request_ts
- params["request_sig"] = request_sig
- params["app_id"] = app_var(0)
- params["user_auth_token"] = await self._auth_token()
+ kwargs["request_ts"] = request_ts
+ kwargs["request_sig"] = request_sig
+ kwargs["app_id"] = app_var(0)
+ kwargs["user_auth_token"] = await self._auth_token()
async with self._throttler:
async with self.mass.http_session.get(
- url, headers=headers, params=params, verify_ssl=False
+ url, headers=headers, params=kwargs, verify_ssl=False
) as response:
try:
result = await response.json()
from music_assistant.helpers.app_vars import ( # noqa # pylint: disable=no-name-in-module
app_var,
)
+from music_assistant.helpers.cache import use_cache
from music_assistant.helpers.util import parse_title_and_version
from music_assistant.models.errors import LoginFailed
from music_assistant.models.media_items import (
AlbumType,
Artist,
ContentType,
+ ImageType,
+ MediaItemImage,
MediaItemProviderId,
MediaItemType,
MediaQuality,
if MediaType.PLAYLIST in media_types:
searchtypes.append("playlist")
searchtype = ",".join(searchtypes)
- params = {"q": search_query, "type": searchtype, "limit": limit}
- if searchresult := await self._get_data("search", params=params):
+ if searchresult := await self._get_data(
+ "search", q=search_query, type=searchtype, limit=limit
+ ):
if "artists" in searchresult:
result += [
await self._parse_artist(item)
async def get_library_artists(self) -> List[Artist]:
"""Retrieve library artists from spotify."""
- spotify_artists = await self._get_data("me/following?type=artist&limit=50")
+ spotify_artists = await self._get_data(
+ "me/following", type="artist", limit=50, skip_cache=True
+ )
return [
await self._parse_artist(item)
for item in spotify_artists["artists"]["items"]
"""Retrieve library albums from the provider."""
return [
await self._parse_album(item["album"])
- for item in await self._get_all_items("me/albums")
+ for item in await self._get_all_items("me/albums", skip_cache=True)
if (item["album"] and item["album"]["id"])
]
"""Retrieve library tracks from the provider."""
return [
await self._parse_track(item["track"])
- for item in await self._get_all_items("me/tracks")
+ for item in await self._get_all_items("me/tracks", skip_cache=True)
if (item and item["track"]["id"])
]
"""Retrieve playlists from the provider."""
return [
await self._parse_playlist(item)
- for item in await self._get_all_items("me/playlists")
+ for item in await self._get_all_items("me/playlists", skip_cache=True)
if (item and item["id"])
]
async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
+ playlist = await self.get_playlist(prov_playlist_id)
return [
await self._parse_track(item["track"])
for item in await self._get_all_items(
- f"playlists/{prov_playlist_id}/tracks"
+ f"playlists/{prov_playlist_id}/tracks", cache_checksum=playlist.checksum
)
if (item and item["track"] and item["track"]["id"])
]
)
)
if "genres" in artist_obj:
- artist.metadata["genres"] = artist_obj["genres"]
+ artist.metadata.genres = set(artist_obj["genres"])
if artist_obj.get("images"):
for img in artist_obj["images"]:
img_url = img["url"]
if "2a96cbd8b46e442fc41c2b86b821562f" not in img_url:
- artist.metadata["image"] = img_url
+ artist.metadata.images = {MediaItemImage(ImageType.THUMB, img_url)}
break
return artist
elif album_obj["album_type"] == "album":
album.album_type = AlbumType.ALBUM
if "genres" in album_obj:
- album.metadata["genres"] = album_obj["genres"]
+ album.metadata.genre = set(album_obj["genres"])
if album_obj.get("images"):
- album.metadata["image"] = album_obj["images"][0]["url"]
+ album.metadata.images = {
+ MediaItemImage(ImageType.THUMB, album_obj["images"][0]["url"])
+ }
if "external_ids" in album_obj and album_obj["external_ids"].get("upc"):
album.upc = album_obj["external_ids"]["upc"]
if "label" in album_obj:
- album.metadata["label"] = album_obj["label"]
+ album.metadata.label = album_obj["label"]
if album_obj.get("release_date"):
album.year = int(album_obj["release_date"].split("-")[0])
if album_obj.get("copyrights"):
- album.metadata["copyright"] = album_obj["copyrights"][0]["text"]
+ album.metadata.copyright = album_obj["copyrights"][0]["text"]
if album_obj.get("explicit"):
- album.metadata["explicit"] = str(album_obj["explicit"]).lower()
+ album.metadata.explicit = album_obj["explicit"]
album.add_provider_id(
MediaItemProviderId(
provider=self.id,
if artist and artist.item_id not in {x.item_id for x in track.artists}:
track.artists.append(artist)
- track.metadata["explicit"] = str(track_obj["explicit"]).lower()
+ track.metadata.explicit = track_obj["explicit"]
if "preview_url" in track_obj:
- track.metadata["preview"] = track_obj["preview_url"]
+ track.metadata.preview = track_obj["preview_url"]
if "external_ids" in track_obj and "isrc" in track_obj["external_ids"]:
track.isrc = track_obj["external_ids"]["isrc"]
if "album" in track_obj:
track.album = await self._parse_album(track_obj["album"])
if track_obj["album"].get("images"):
- track.metadata["image"] = track_obj["album"]["images"][0]["url"]
+ track.metadata.images = {
+ MediaItemImage(
+ ImageType.THUMB, track_obj["album"]["images"][0]["url"]
+ )
+ }
if track_obj.get("copyright"):
- track.metadata["copyright"] = track_obj["copyright"]
+ track.metadata.copyright = track_obj["copyright"]
if track_obj.get("explicit"):
- track.metadata["explicit"] = True
+ track.metadata.explicit = True
if track_obj.get("popularity"):
- track.metadata["popularity"] = track_obj["popularity"]
+ track.metadata.popularity = track_obj["popularity"]
track.add_provider_id(
MediaItemProviderId(
provider=self.id,
or playlist_obj["collaborative"]
)
if playlist_obj.get("images"):
- playlist.metadata["image"] = playlist_obj["images"][0]["url"]
+ playlist.metadata.images = {
+ MediaItemImage(ImageType.THUMB, playlist_obj["images"][0]["url"])
+ }
playlist.checksum = playlist_obj["snapshot_id"]
return playlist
if tokeninfo:
self._auth_token = tokeninfo
self._sp_user = await self._get_data("me")
+ self.mass.metadata.preferred_language = self._sp_user["country"]
self.logger.info(
"Succesfully logged in to Spotify as %s", self._sp_user["id"]
)
return tokeninfo
return None
- async def _get_all_items(self, endpoint, params=None, key="items"):
+ @use_cache(3600 * 24)
+ async def _get_all_items(self, endpoint, key="items", **kwargs):
"""Get all items from a paged list."""
- if not params:
- params = {}
limit = 50
offset = 0
all_items = []
while True:
- params["limit"] = limit
- params["offset"] = offset
- result = await self._get_data(endpoint, params=params)
+ kwargs["limit"] = limit
+ kwargs["offset"] = offset
+ result = await self._get_data(endpoint, skip_cache=True, **kwargs)
offset += limit
if not result or key not in result or not result[key]:
break
break
return all_items
- async def _get_data(self, endpoint, params=None):
+ @use_cache(3600 * 2)
+ async def _get_data(self, endpoint, **kwargs):
"""Get data from api."""
- if not params:
- params = {}
url = f"https://api.spotify.com/v1/{endpoint}"
- params["market"] = "from_token"
- params["country"] = "from_token"
+ kwargs["market"] = "from_token"
+ kwargs["country"] = "from_token"
token = await self.get_token()
if not token:
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self._throttler:
async with self.mass.http_session.get(
- url, headers=headers, params=params, verify_ssl=False
+ url, headers=headers, params=kwargs, verify_ssl=False
) as response:
try:
result = await response.json()
return None
return result
- async def _delete_data(self, endpoint, params=None, data=None):
+ async def _delete_data(self, endpoint, data=None, **kwargs):
"""Delete data from api."""
- if not params:
- params = {}
url = f"https://api.spotify.com/v1/{endpoint}"
token = await self.get_token()
if not token:
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self.mass.http_session.delete(
- url, headers=headers, params=params, json=data, verify_ssl=False
+ url, headers=headers, params=kwargs, json=data, verify_ssl=False
) as response:
return await response.text()
- async def _put_data(self, endpoint, params=None, data=None):
+ async def _put_data(self, endpoint, data=None, **kwargs):
"""Put data on api."""
- if not params:
- params = {}
url = f"https://api.spotify.com/v1/{endpoint}"
token = await self.get_token()
if not token:
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self.mass.http_session.put(
- url, headers=headers, params=params, json=data, verify_ssl=False
+ url, headers=headers, params=kwargs, json=data, verify_ssl=False
) as response:
return await response.text()
- async def _post_data(self, endpoint, params=None, data=None):
+ async def _post_data(self, endpoint, data=None, **kwargs):
"""Post data on api."""
- if not params:
- params = {}
url = f"https://api.spotify.com/v1/{endpoint}"
token = await self.get_token()
if not token:
return None
headers = {"Authorization": f'Bearer {token["accessToken"]}'}
async with self.mass.http_session.post(
- url, headers=headers, params=params, json=data, verify_ssl=False
+ url, headers=headers, params=kwargs, json=data, verify_ssl=False
) as response:
return await response.text()
from asyncio_throttle import Throttler
+from music_assistant.helpers.cache import use_cache
from music_assistant.models.media_items import (
ContentType,
+ ImageType,
+ MediaItemImage,
MediaItemProviderId,
MediaItemType,
MediaQuality,
async def get_library_radios(self) -> List[Radio]:
"""Retrieve library/subscribed radio stations from the provider."""
result = []
- params = {"c": "presets"}
- radios = await self._get_data("Browse.ashx", params)
+ radios = await self._get_data("Browse.ashx", c="presets")
if radios and "body" in radios:
for radio in radios["body"]:
if radio.get("type", "") != "audio":
continue
# each radio station can have multiple streams add each one as different quality
- params = {"id": radio["preset_id"]}
- stream_info = await self._get_data("Tune.ashx", params)
+ stream_info = await self._get_data("Tune.ashx", id=radio["preset_id"])
for stream in stream_info["body"]:
result.append(await self._parse_radio(radio, stream))
return result
"""Get radio station details."""
prov_radio_id, media_type = prov_radio_id.split("--", 1)
params = {"c": "composite", "detail": "listing", "id": prov_radio_id}
- result = await self._get_data("Describe.ashx", params)
+ result = await self._get_data("Describe.ashx", **params)
if result and result.get("body") and result["body"][0].get("children"):
item = result["body"][0]["children"][0]
- stream_info = await self._get_data("Tune.ashx", {"id": prov_radio_id})
+ stream_info = await self._get_data("Tune.ashx", id=prov_radio_id)
for stream in stream_info["body"]:
if stream["media_type"] != media_type:
continue
details=stream["url"],
)
)
- # image
- if "image" in details:
- radio.metadata["image"] = details["image"]
- elif "logo" in details:
- radio.metadata["image"] = details["logo"]
+ # images
+ if img := details.get("image"):
+ radio.metadata.images = {MediaItemImage(ImageType.THUMB, img)}
+ if img := details.get("logo"):
+ radio.metadata.images = {MediaItemImage(ImageType.LOGO, img)}
return radio
async def get_stream_details(self, item_id: str) -> StreamDetails:
"""Get streamdetails for a radio station."""
item_id, media_type = item_id.split("--", 1)
- stream_info = await self._get_data("Tune.ashx", {"id": item_id})
+ stream_info = await self._get_data("Tune.ashx", id=item_id)
for stream in stream_info["body"]:
if stream["media_type"] == media_type:
return StreamDetails(
)
return None
- async def _get_data(self, endpoint, params=None):
+ @use_cache(3600 * 2)
+ async def _get_data(self, endpoint, **kwargs):
"""Get data from api."""
- if not params:
- params = {}
url = f"https://opml.radiotime.com/{endpoint}"
- params["render"] = "json"
- params["formats"] = "ogg,aac,wma,mp3"
- params["username"] = self._username
- params["partnerId"] = "1"
+ kwargs["render"] = "json"
+ kwargs["formats"] = "ogg,aac,wma,mp3"
+ kwargs["username"] = self._username
+ kwargs["partnerId"] = "1"
async with self._throttler:
async with self.mass.http_session.get(
- url, params=params, verify_ssl=False
+ url, params=kwargs, verify_ssl=False
) as response:
result = await response.json()
if not result or "error" in result:
self.logger.error(url)
- self.logger.error(params)
+ self.logger.error(kwargs)
result = None
return result
PROJECT_REQ_PYTHON_VERSION = "3.9"
PROJECT_LICENSE = "Apache License 2.0"
PROJECT_AUTHOR = "Marcel van der Veldt"
-PROJECT_URL = "https://music-assistant.github.io/"
PROJECT_EMAIL = "marcelveldt@users.noreply.github.com"
PROJECT_GITHUB_USERNAME = "music-assistant"
DOWNLOAD_URL = f"{GITHUB_URL}/archive/{PROJECT_VERSION}.zip"
PROJECT_URLS = {
"Bug Reports": f"{GITHUB_URL}/issues",
- "Website": "https://music-assistant.github.io/",
- "Discord": "https://discord.gg/9xHYFY",
+ "Website": GITHUB_URL,
+ "Discord": "https://discord.gg/AmDBM6QCAs",
}
PROJECT_DIR = Path(__file__).parent.resolve()
README_FILE = PROJECT_DIR / "README.md"
setup(
name=PROJECT_PACKAGE_NAME,
version=PROJECT_VERSION,
- url=PROJECT_URL,
+ url=GITHUB_URL,
download_url=DOWNLOAD_URL,
project_urls=PROJECT_URLS,
author=PROJECT_AUTHOR,