media_type: MediaType,
item_id: str,
provider_instance_id_or_domain: str,
- force_refresh: bool = False,
- lazy: bool = True,
- add_to_library: bool = False,
) -> MediaItemType | ItemMapping:
"""Get single music item by id and media type."""
return media_from_dict(
media_type=media_type,
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
- force_refresh=force_refresh,
- lazy=lazy,
- add_to_library=add_to_library,
)
)
from random import choice, random
from typing import TYPE_CHECKING
-from music_assistant.common.helpers.global_cache import get_global_cache_value
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import ProviderFeature
from music_assistant.common.models.errors import (
self,
item_id: str,
provider_instance_id_or_domain: str,
- force_refresh: bool = False,
- lazy: bool = True,
- details: Album | ItemMapping = None,
- add_to_library: bool = False,
+ recursive: bool = True,
) -> Album:
"""Return (full) details for a single media item."""
album = await super().get(
item_id,
provider_instance_id_or_domain,
- force_refresh=force_refresh,
- lazy=lazy,
- details=details,
- add_to_library=add_to_library,
)
- # append artist details to full track item (resolve ItemMappings)
+ if not recursive:
+ return album
+
+ # append artist details to full album item (resolve ItemMappings)
album_artists = UniqueList()
for artist in album.artists:
if not isinstance(artist, ItemMapping):
await self.mass.music.artists.get(
artist.item_id,
artist.provider,
- lazy=lazy,
- details=artist,
- add_to_library=False,
)
)
album.artists = album_artists
- if not force_refresh:
- return album
- # if force refresh, we need to ensure that we also refresh all album tracks
- # in case of a filebased (non streaming) provider to ensure we catch changes the user
- # made on track level and then pressed the refresh button on album level.
- file_provs = get_global_cache_value("non_streaming_providers", [])
- for album_provider_mapping in album.provider_mappings:
- if album_provider_mapping.provider_instance not in file_provs:
- continue
- for prov_album_track in await self._get_provider_album_tracks(
- album_provider_mapping.item_id, album_provider_mapping.provider_instance
- ):
- if prov_album_track.provider != "library":
- continue
- for track_prov_map in prov_album_track.provider_mappings:
- if track_prov_map.provider_instance != album_provider_mapping.provider_instance:
- continue
- prov_track = await self.mass.music.tracks.get_provider_item(
- track_prov_map.item_id, track_prov_map.provider_instance, force_refresh=True
- )
- await self.mass.music.tracks._update_library_item(
- prov_album_track.item_id, prov_track, True
- )
- break
return album
async def remove_item_from_library(self, item_id: str | int) -> None:
)
return ItemMapping.from_item(db_artist)
- async def _match(self, db_album: Album) -> None:
+ async def match_providers(self, db_album: Album) -> None:
"""Try to find match on all (streaming) providers for the provided (database) album.
This is used to link objects of different providers/qualities together.
match_found = True
for provider_mapping in search_result_item.provider_mappings:
await self.add_provider_mapping(db_album.item_id, provider_mapping)
+ db_album.provider_mappings.add(provider_mapping)
return match_found
# try to find match on all providers
msg = "No Music Provider found that supports requesting similar tracks."
raise UnsupportedFeaturedException(msg)
- async def _match(self, db_artist: Artist) -> None:
+ async def match_providers(self, db_artist: Artist) -> None:
"""Try to find matching artists on all providers for the provided (database) item_id.
This is used to link objects of different providers together.
# 100% match, we update the db with the additional provider mapping(s)
for provider_mapping in prov_artist.provider_mappings:
await self.add_provider_mapping(db_artist.item_id, provider_mapping)
+ db_artist.provider_mappings.add(provider_mapping)
return True
# try to get a match with some reference albums of this artist
ref_albums = await self.mass.music.artists.albums(db_artist.item_id, db_artist.provider)
from abc import ABCMeta, abstractmethod
from collections.abc import Iterable
from contextlib import suppress
-from time import time
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from music_assistant.common.helpers.json import json_loads, serialize_to_json
ItemCls = TypeVar("ItemCls", bound="MediaItemType")
-REFRESH_INTERVAL = 60 * 60 * 24 * 30
JSON_KEYS = ("artists", "album", "metadata", "provider_mappings", "external_ids")
SORT_KEYS = {
self._db_add_lock = asyncio.Lock()
async def add_item_to_library(
- self, item: ItemCls, metadata_lookup: bool = True, overwrite_existing: bool = False
+ self,
+ item: ItemCls,
+ metadata_lookup: bool = True,
+ overwrite_existing: bool = False,
) -> ItemCls:
"""Add item to library and return the new (or updated) database item."""
new_item = False
- # grab additional metadata
- if metadata_lookup:
- await self.mass.metadata.get_metadata(item)
# check for existing item first
library_id = await self._get_library_item_by_match(item, overwrite_existing)
-
if library_id is None:
# actually add a new item in the library db
async with self._db_add_lock:
library_id = await self._add_library_item(item)
new_item = True
- # also fetch same track on all providers (will also get other quality versions)
+ # grab additional metadata
if metadata_lookup:
library_item = await self.get_library_item(library_id)
- await self._match(library_item)
+ await self.mass.metadata.update_metadata(library_item)
# return final library_item after all match/metadata actions
library_item = await self.get_library_item(library_id)
self.mass.signal_event(
async def update_item_in_library(
self, item_id: str | int, update: ItemCls, overwrite: bool = False
) -> ItemCls:
- """Update existing library record in the database."""
+ """Update existing library record in the library database."""
await self._update_library_item(item_id, update, overwrite=overwrite)
# return the updated object
library_item = await self.get_library_item(item_id)
self,
item_id: str,
provider_instance_id_or_domain: str,
- force_refresh: bool = False,
- lazy: bool = True,
- details: ItemCls = None,
- add_to_library: bool = False,
) -> ItemCls:
"""Return (full) details for a single media item."""
- metadata_lookup = False
# always prefer the full library item if we have it
- library_item = await self.get_library_item_by_prov_id(
+ if library_item := await self.get_library_item_by_prov_id(
item_id,
provider_instance_id_or_domain,
- )
- if library_item and (time() - (library_item.metadata.last_refresh or 0)) > REFRESH_INTERVAL:
- # it's been too long since the full metadata was last retrieved (or never at all)
- # NOTE: do not attempt metadata refresh on unavailable items as it has side effects
- metadata_lookup = library_item.available
-
- if library_item and not (force_refresh or metadata_lookup or add_to_library):
- # we have a library item and no refreshing is needed, return the results!
+ ):
return library_item
-
- if force_refresh:
- # get (first) provider item id belonging to this library item
- add_to_library = True
- metadata_lookup = True
- if library_item:
- # resolve library item into a provider item to get the source details
- provider_instance_id_or_domain, item_id = await self.get_provider_mapping(
- library_item
- )
-
# grab full details from the provider
- details = await self.get_provider_item(
+ return await self.get_provider_item(
item_id,
provider_instance_id_or_domain,
- force_refresh=force_refresh,
- fallback=details,
)
- if not details and library_item:
- # something went wrong while trying to fetch/refresh this item
- # return the existing (unavailable) library item and leave this for another day
- return library_item
-
- if not details:
- # we couldn't get a match from any of the providers, raise error
- msg = f"Item not found: {provider_instance_id_or_domain}/{item_id}"
- raise MediaNotFoundError(msg)
-
- if not (add_to_library or metadata_lookup):
- # return the provider item as-is
- return details
-
- # create task to add the item to the library,
- # including matching metadata etc. takes some time
- # in 99% of the cases we just return lazy because we want the details as fast as possible
- # only if we really need to wait for the result (e.g. to prevent race conditions),
- # we can set lazy to false and we await the job to complete.
- overwrite_existing = force_refresh and library_item is not None
- task_id = f"add_{self.media_type.value}.{details.provider}.{details.item_id}"
- add_task = self.mass.create_task(
- self.add_item_to_library,
- item=details,
- metadata_lookup=metadata_lookup,
- overwrite_existing=overwrite_existing,
- task_id=task_id,
- )
- if not lazy:
- await add_task
- return add_task.result()
-
- return library_item or details
async def search(
self,
) -> None:
"""Update existing library record in the database."""
- async def _match(self, db_item: ItemCls) -> None:
+ async def match_providers(self, db_item: ItemCls) -> None:
"""
Try to find match on all (streaming) providers for the provided (database) item.
from __future__ import annotations
import random
+import time
from typing import Any
from music_assistant.common.helpers.json import serialize_to_json
playlist = await self.get(
item_id,
provider_instance_id_or_domain,
- force_refresh=force_refresh,
- lazy=not force_refresh,
)
+ # a playlist can only have one provider so simply pick the first one
prov_map = next(x for x in playlist.provider_mappings)
cache_checksum = playlist.cache_checksum
+ # playlist tracks ar enot stored in the db,
+ # we always fetched them (cached) from the provider
tracks = await self._get_provider_playlist_tracks(
prov_map.item_id,
prov_map.provider_instance,
cache_checksum=cache_checksum,
offset=offset,
limit=limit,
+ force_refresh=force_refresh,
)
if prefer_library_items:
final_tracks = []
continue
# ensure we have a full library track
- db_track = await self.mass.music.tracks.get(
- item_id, provider_instance_id_or_domain, lazy=False, add_to_library=True
+ full_track = await self.mass.music.tracks.get(
+ item_id,
+ provider_instance_id_or_domain,
+ recursive=provider_instance_id_or_domain != "library",
)
+ if full_track.provider == "library":
+ db_track = full_track
+ else:
+ db_track = await self.mass.music.tracks.add_item_to_library(full_track)
# a track can contain multiple versions on the same provider
# simply sort by quality and just add the first available version
for track_version in sorted(
# actually add the tracks to the playlist on the provider
await playlist_prov.add_playlist_tracks(playlist_prov_map.item_id, list(ids_to_add))
# invalidate cache so tracks get refreshed
- await self.get(
- playlist.item_id,
- playlist.provider,
- force_refresh=True,
- )
+ playlist.cache_checksum = str(time.time())
+ await self.update_item_in_library(db_playlist_id, playlist)
async def add_playlist_track(self, db_playlist_id: str | int, track_uri: str) -> None:
"""Add (single) track to playlist."""
continue
await provider.remove_playlist_tracks(prov_mapping.item_id, positions_to_remove)
# invalidate cache so tracks get refreshed
- await self.get(
- playlist.item_id,
- playlist.provider,
- force_refresh=True,
- )
+ playlist.cache_checksum = str(time.time())
+ await self.update_item_in_library(db_playlist_id, playlist)
async def get_all_playlist_tracks(
self, playlist: Playlist, prefer_library_items: bool = False
cache_checksum: Any = None,
offset: int = 0,
limit: int = 50,
+ force_refresh: bool = False,
) -> list[PlaylistTrack]:
"""Return playlist tracks for the given provider playlist id."""
assert provider_instance_id_or_domain != "library"
return []
# prefer cache items (if any)
cache_key = f"{provider.lookup_key}.playlist.{item_id}.tracks.{offset}.{limit}"
- if (cache := await self.mass.cache.get(cache_key, checksum=cache_checksum)) is not None:
+ if (
+ not force_refresh
+ and (cache := await self.mass.cache.get(cache_key, checksum=cache_checksum)) is not None
+ ):
return [PlaylistTrack.from_dict(x) for x in cache]
- # no items in cache - get listing from provider
+ # no items in cache (or force_refresh) - get listing from provider
result: list[Track] = []
for item in await provider.get_playlist_tracks(item_id, offset=offset, limit=limit):
# double check if position set
self,
item_id: str,
provider_instance_id_or_domain: str,
- force_refresh: bool = False,
- lazy: bool = True,
- details: Track = None,
+ recursive: bool = True,
album_uri: str | None = None,
- add_to_library: bool = False,
) -> Track:
"""Return (full) details for a single media item."""
track = await super().get(
item_id,
provider_instance_id_or_domain,
- force_refresh=force_refresh,
- lazy=lazy,
- details=details,
- add_to_library=add_to_library,
)
+ if not recursive and album_uri is None:
+ # return early if we do not want recursive full details and no album uri is provided
+ return track
+
# append full album details to full track item (resolve ItemMappings)
try:
if album_uri and (album := await self.mass.music.get_item_by_uri(album_uri)):
)
elif isinstance(track.album, ItemMapping) or (track.album and not track.album.image):
track.album = await self.mass.music.albums.get(
- track.album.item_id,
- track.album.provider,
- lazy=lazy,
- details=None if isinstance(track.album, ItemMapping) else track.album,
- add_to_library=False, # TODO: make this configurable
+ track.album.item_id, track.album.provider, recursive=False
)
except MusicAssistantError as err:
# edge case where playlist track has invalid albumdetails
self.logger.warning("Unable to fetch album details %s - %s", track.album.uri, str(err))
+ if not recursive:
+ return track
+
# append artist details to full track item (resolve ItemMappings)
track_artists = []
for artist in track.artists:
await self.mass.music.artists.get(
artist.item_id,
artist.provider,
- lazy=lazy,
- add_to_library=False, # TODO: make this configurable
)
)
except MusicAssistantError as err:
provider_instance_id_or_domain: str,
) -> UniqueList[Track]:
"""Return all versions of a track we can find on all providers."""
- track = await self.get(item_id, provider_instance_id_or_domain, add_to_library=False)
+ track = await self.get(item_id, provider_instance_id_or_domain)
search_query = f"{track.artist_str} - {track.name}"
result: UniqueList[Track] = UniqueList()
for provider_id in self.mass.music.get_unique_providers():
query = f"WHERE {DB_TABLE_ALBUMS}.item_id in ({subquery})"
return await self.mass.music.albums._get_library_items_by_query(extra_query=query)
- async def _match(self, db_track: Track) -> None:
+ async def match_providers(self, db_track: Track) -> None:
"""Try to find matching track on all providers for the provided (database) track_id.
This is used to link objects of different providers/qualities together.
match_found = True
for provider_mapping in search_result_item.provider_mappings:
await self.add_provider_mapping(db_track.item_id, provider_mapping)
+ db_track.provider_mappings.add(provider_mapping)
if not match_found:
self.logger.debug(
ConfigValueType,
)
from music_assistant.common.models.enums import (
+ AlbumType,
ConfigEntryType,
ImageType,
MediaType,
ProviderFeature,
ProviderType,
)
-from music_assistant.common.models.errors import ProviderUnavailableError
+from music_assistant.common.models.errors import MediaNotFoundError, ProviderUnavailableError
from music_assistant.common.models.media_items import (
Album,
Artist,
}
DEFAULT_LANGUAGE = "en_US"
+REFRESH_INTERVAL = 60 * 60 * 24 * 90
+MAX_ONLINE_CALLS_PER_DAY = 30
class MetaDataController(CoreController):
"Music Assistant's core controller which handles all metadata for music."
)
self.manifest.icon = "book-information-variant"
+ self._reset_online_slots()
async def get_config_entries(
self,
await asyncio.to_thread(os.mkdir, self._collage_images_dir)
self.mass.streams.register_dynamic_route("/imageproxy", self.handle_imageproxy)
+ self.mass.call_later(60, self._metadata_scanner())
async def close(self) -> None:
"""Handle logic on server stop."""
# if we reach this point, we couldn't match the language
self.logger.warning("%s is not a valid language", lang)
- async def get_artist_metadata(self, artist: Artist) -> None:
- """Get/update rich metadata for an artist."""
- if not artist.mbid:
- # The musicbrainz ID is mandatory for all metadata lookups
- artist.mbid = await self.get_artist_mbid(artist)
- if not artist.mbid:
- return
- # collect metadata from all providers
- for provider in self.providers:
- if ProviderFeature.ARTIST_METADATA not in provider.supported_features:
- continue
- if metadata := await provider.get_artist_metadata(artist):
- artist.metadata.update(metadata)
- self.logger.debug(
- "Fetched metadata for Artist %s on provider %s",
- artist.name,
- provider.name,
- )
- # set timestamp, used to determine when this function was last called
- artist.metadata.last_refresh = int(time())
-
- async def get_album_metadata(self, album: Album) -> None:
- """Get/update rich metadata for an album."""
- # ensure the album has a musicbrainz id or artist(s)
- if not (album.mbid or album.artists):
- return
- # collect metadata from all providers
- for provider in self.providers:
- if ProviderFeature.ALBUM_METADATA not in provider.supported_features:
- continue
- if metadata := await provider.get_album_metadata(album):
- album.metadata.update(metadata)
- self.logger.debug(
- "Fetched metadata for Album %s on provider %s",
- album.name,
- provider.name,
- )
- # set timestamp, used to determine when this function was last called
- album.metadata.last_refresh = int(time())
-
- async def get_track_metadata(self, track: Track) -> None:
- """Get/update rich metadata for a track."""
- if not (track.album and track.artists):
- return
- # collect metadata from all providers
- for provider in self.providers:
- if ProviderFeature.TRACK_METADATA not in provider.supported_features:
- continue
- if metadata := await provider.get_track_metadata(track):
- track.metadata.update(metadata)
- self.logger.debug(
- "Fetched metadata for Track %s on provider %s",
- track.name,
- provider.name,
- )
- # set timestamp, used to determine when this function was last called
- track.metadata.last_refresh = int(time())
-
- async def get_playlist_metadata(self, playlist: Playlist) -> None:
- """Get/update rich metadata for a playlist."""
- playlist.metadata.genres = set()
- all_playlist_tracks_images = set()
- playlist_genres: dict[str, int] = {}
- # retrieve metedata for the playlist from the tracks (such as genres etc.)
- # TODO: retrieve style/mood ?
- playlist_items = await self.mass.music.playlists.tracks(playlist.item_id, playlist.provider)
- for track in playlist_items:
- if track.image:
- all_playlist_tracks_images.add(track.image)
- if track.metadata.genres:
- genres = track.metadata.genres
- elif track.album and isinstance(track.album, Album) and track.album.metadata.genres:
- genres = track.album.metadata.genres
- else:
- genres = set()
- for genre in genres:
- if genre not in playlist_genres:
- playlist_genres[genre] = 0
- playlist_genres[genre] += 1
- await asyncio.sleep(0) # yield to eventloop
-
- playlist_genres_filtered = {genre for genre, count in playlist_genres.items() if count > 5}
- playlist.metadata.genres.update(playlist_genres_filtered)
- # create collage images
- cur_images = playlist.metadata.images or []
- new_images = []
- # thumb image
- thumb_image = next((x for x in cur_images if x.type == ImageType.THUMB), None)
- if not thumb_image or self._collage_images_dir in thumb_image.path:
- thumb_image_path = (
- thumb_image.path
- if thumb_image
- else os.path.join(self._collage_images_dir, f"{uuid4().hex}_thumb.jpg")
- )
- if collage_thumb_image := await self.create_collage_image(
- all_playlist_tracks_images, thumb_image_path
- ):
- new_images.append(collage_thumb_image)
- elif thumb_image:
- # just use old image
- new_images.append(thumb_image)
- # fanart image
- fanart_image = next((x for x in cur_images if x.type == ImageType.FANART), None)
- if not fanart_image or self._collage_images_dir in fanart_image.path:
- fanart_image_path = (
- fanart_image.path
- if fanart_image
- else os.path.join(self._collage_images_dir, f"{uuid4().hex}_fanart.jpg")
- )
- if collage_fanart_image := await self.create_collage_image(
- all_playlist_tracks_images, fanart_image_path, fanart=True
- ):
- new_images.append(collage_fanart_image)
- elif fanart_image:
- # just use old image
- new_images.append(fanart_image)
- playlist.metadata.images = new_images
- # set timestamp, used to determine when this function was last called
- playlist.metadata.last_refresh = int(time())
-
- async def get_radio_metadata(self, radio: Radio) -> None:
- """Get/update rich metadata for a radio station."""
- # NOTE: we do not have any metadata for radio so consider this future proofing ;-)
- radio.metadata.last_refresh = int(time())
-
- async def get_metadata(self, item: MediaItemType) -> None:
- """Get/update rich metadata for/on given MediaItem."""
+ @api_command("metadata/update_metadata")
+ async def update_metadata(self, item: str | MediaItemType, force_refresh: bool = False) -> None:
+ """Get/update extra/enhanced metadata for/on given MediaItem."""
+ if isinstance(item, str):
+ item = await self.mass.music.get_item_by_uri(item)
+ if item.provider != "library":
+ # this shouldn't happen but just in case.
+ raise RuntimeError("Metadata can only be updated for library items")
if item.media_type == MediaType.ARTIST:
- await self.get_artist_metadata(item)
+ await self._update_artist_metadata(item)
if item.media_type == MediaType.ALBUM:
- await self.get_album_metadata(item)
+ await self._update_album_metadata(item)
if item.media_type == MediaType.TRACK:
- await self.get_track_metadata(item)
+ await self._update_track_metadata(item)
if item.media_type == MediaType.PLAYLIST:
- await self.get_playlist_metadata(item)
+ await self._update_playlist_metadata(item)
if item.media_type == MediaType.RADIO:
- await self.get_radio_metadata(item)
-
- async def get_artist_mbid(self, artist: Artist) -> str | None:
- """Fetch musicbrainz id by performing search using the artist name, albums and tracks."""
- if compare_strings(artist.name, VARIOUS_ARTISTS_NAME):
- return VARIOUS_ARTISTS_ID_MBID
- ref_albums = await self.mass.music.artists.albums(
- artist.item_id, artist.provider, in_library_only=False
- )
- ref_tracks = await self.mass.music.artists.tracks(
- artist.item_id, artist.provider, in_library_only=False
- )
- # start lookup of musicbrainz id
- musicbrainz: MusicbrainzProvider = self.mass.get_provider("musicbrainz")
- assert musicbrainz
- if mbid := await musicbrainz.get_musicbrainz_artist_id(
- artist, ref_albums=ref_albums, ref_tracks=ref_tracks
- ):
- return mbid
-
- # lookup failed
- ref_albums_str = "/".join(x.name for x in ref_albums) or "none"
- ref_tracks_str = "/".join(x.name for x in ref_tracks) or "none"
- self.logger.debug(
- "Unable to get musicbrainz ID for artist %s\n"
- " - using lookup-album(s): %s\n"
- " - using lookup-track(s): %s\n",
- artist.name,
- ref_albums_str,
- ref_tracks_str,
- )
- return None
+ await self._update_radio_metadata(item)
async def get_image_data_for_item(
self,
exc_info=err if self.logger.isEnabledFor(10) else None,
)
return None
+
+ async def _update_artist_metadata(self, artist: Artist, force_refresh: bool = False) -> None:
+ """Get/update rich metadata for an artist."""
+ # ensure the item is matched to all providers
+ await self.mass.music.artists.match_providers(artist)
+ # collect metadata from all music providers first
+ unique_keys: set[str] = set()
+ for prov_mapping in artist.provider_mappings:
+ if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
+ continue
+ if prov.lookup_key in unique_keys:
+ continue
+ unique_keys.add(prov.lookup_key)
+ with suppress(MediaNotFoundError):
+ prov_item = await self.mass.music.artists.get_provider_item(
+ prov_mapping.item_id, prov_mapping.provider_instance
+ )
+ artist.metadata.update(prov_item.metadata)
+
+ # collect metadata from all (online) metadata providers
+ # NOTE: we only allow this every REFRESH_INTERVAL and a max amount of calls per day
+ # to not overload the (free) metadata providers with api calls
+ # TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
+
+ if self._online_slots_available and (
+ force_refresh or (time() - (artist.metadata.last_refresh or 0)) > REFRESH_INTERVAL
+ ):
+ self._online_slots_available -= 1
+ # set timestamp, used to determine when this function was last called
+ artist.metadata.last_refresh = int(time())
+
+ # TODO: Use a global cache/proxy for the MB lookups to save on API calls
+ artist.mbid = artist.mbid or await self._get_artist_mbid(artist)
+ if artist.mbid:
+ # The musicbrainz ID is mandatory for all metadata lookups
+ for provider in self.providers:
+ if ProviderFeature.ARTIST_METADATA not in provider.supported_features:
+ continue
+ if metadata := await provider.get_artist_metadata(artist):
+ artist.metadata.update(metadata)
+ self.logger.debug(
+ "Fetched metadata for Artist %s on provider %s",
+ artist.name,
+ provider.name,
+ )
+ # update final item in library database
+ await self.mass.music.artists.update_item_in_library(artist.item_id, artist)
+
+ async def _update_album_metadata(self, album: Album, force_refresh: bool = False) -> None:
+ """Get/update rich metadata for an album."""
+ # ensure the item is matched to all providers (will also get other quality versions)
+ await self.mass.music.albums.match_providers(album)
+ # collect metadata from all music providers first
+ unique_keys: set[str] = set()
+ for prov_mapping in album.provider_mappings:
+ if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
+ continue
+ if prov.lookup_key in unique_keys:
+ continue
+ unique_keys.add(prov.lookup_key)
+ with suppress(MediaNotFoundError):
+ prov_item = await self.mass.music.albums.get_provider_item(
+ prov_mapping.item_id, prov_mapping.provider_instance
+ )
+ album.metadata.update(prov_item.metadata)
+ if album.year is None and prov_item.year:
+ album.year = prov_item
+ if album.album_type == AlbumType.UNKNOWN:
+ album.album_type = prov_item.album_type
+
+ # collect metadata from all (online) metadata providers
+ # NOTE: we only allow this every REFRESH_INTERVAL and a max amount of calls per day
+ # to not overload the (free) metadata providers with api calls
+ # TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
+ if (
+ self._online_slots_available
+ and (force_refresh or (time() - (album.metadata.last_refresh or 0)) > REFRESH_INTERVAL)
+ and (album.mbid or album.artists)
+ ):
+ self._online_slots_available -= 1
+ # set timestamp, used to determine when this function was last called
+ album.metadata.last_refresh = int(time())
+
+ # collect metadata from all providers
+ for provider in self.providers:
+ if ProviderFeature.ALBUM_METADATA not in provider.supported_features:
+ continue
+ if metadata := await provider.get_album_metadata(album):
+ album.metadata.update(metadata)
+ self.logger.debug(
+ "Fetched metadata for Album %s on provider %s",
+ album.name,
+ provider.name,
+ )
+ # update final item in library database
+ await self.mass.music.artists.update_item_in_library(album.item_id, album)
+
+ async def _update_track_metadata(self, track: Track, force_refresh: bool = False) -> None:
+ """Get/update rich metadata for a track."""
+ # ensure the item is matched to all providers (will also get other quality versions)
+ await self.mass.music.tracks.match_providers(track)
+ # collect metadata from all music providers first
+ unique_keys: set[str] = set()
+ for prov_mapping in track.provider_mappings:
+ if (prov := self.mass.get_provider(prov_mapping.provider_instance)) is None:
+ continue
+ if prov.lookup_key in unique_keys:
+ continue
+ unique_keys.add(prov.lookup_key)
+ with suppress(MediaNotFoundError):
+ prov_item = await self.mass.music.albums.get_provider_item(
+ prov_mapping.item_id, prov_mapping.provider_instance
+ )
+ track.metadata.update(prov_item.metadata)
+
+ # collect metadata from all (online) metadata providers
+ # NOTE: we only allow this every REFRESH_INTERVAL and a max amount of calls per day
+ # to not overload the (free) metadata providers with api calls
+ # TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
+ if (
+ self._online_slots_available
+ and (force_refresh or (time() - (track.metadata.last_refresh or 0)) > REFRESH_INTERVAL)
+ and (track.mbid or track.artists or track.album)
+ ):
+ self._online_slots_available -= 1
+ # set timestamp, used to determine when this function was last called
+ track.metadata.last_refresh = int(time())
+
+ # collect metadata from all providers
+ for provider in self.providers:
+ if ProviderFeature.TRACK_METADATA not in provider.supported_features:
+ continue
+ if metadata := await provider.get_track_metadata(track):
+ track.metadata.update(metadata)
+ self.logger.debug(
+ "Fetched metadata for Track %s on provider %s",
+ track.name,
+ provider.name,
+ )
+ # update final item in library database
+ await self.mass.music.artists.update_item_in_library(track.item_id, track)
+
+ async def _update_playlist_metadata(
+ self, playlist: Playlist, force_refresh: bool = False
+ ) -> None:
+ """Get/update rich metadata for a playlist."""
+ if not force_refresh and (time() - (playlist.metadata.last_refresh or 0)) < (
+ 60 * 60 * 24 * 5
+ ):
+ return
+ playlist.metadata.genres = set()
+ all_playlist_tracks_images = set()
+ playlist_genres: dict[str, int] = {}
+ # retrieve metadata for the playlist from the tracks (such as genres etc.)
+ # TODO: retrieve style/mood ?
+ playlist_items = await self.mass.music.playlists.tracks(playlist.item_id, playlist.provider)
+ for track in playlist_items:
+ if track.image:
+ all_playlist_tracks_images.add(track.image)
+ if track.metadata.genres:
+ genres = track.metadata.genres
+ elif track.album and isinstance(track.album, Album) and track.album.metadata.genres:
+ genres = track.album.metadata.genres
+ else:
+ genres = set()
+ for genre in genres:
+ if genre not in playlist_genres:
+ playlist_genres[genre] = 0
+ playlist_genres[genre] += 1
+ await asyncio.sleep(0) # yield to eventloop
+
+ playlist_genres_filtered = {genre for genre, count in playlist_genres.items() if count > 5}
+ playlist.metadata.genres.update(playlist_genres_filtered)
+ # create collage images
+ cur_images = playlist.metadata.images or []
+ new_images = []
+ # thumb image
+ thumb_image = next((x for x in cur_images if x.type == ImageType.THUMB), None)
+ if not thumb_image or self._collage_images_dir in thumb_image.path:
+ thumb_image_path = (
+ thumb_image.path
+ if thumb_image
+ else os.path.join(self._collage_images_dir, f"{uuid4().hex}_thumb.jpg")
+ )
+ if collage_thumb_image := await self.create_collage_image(
+ all_playlist_tracks_images, thumb_image_path
+ ):
+ new_images.append(collage_thumb_image)
+ elif thumb_image:
+ # just use old image
+ new_images.append(thumb_image)
+ # fanart image
+ fanart_image = next((x for x in cur_images if x.type == ImageType.FANART), None)
+ if not fanart_image or self._collage_images_dir in fanart_image.path:
+ fanart_image_path = (
+ fanart_image.path
+ if fanart_image
+ else os.path.join(self._collage_images_dir, f"{uuid4().hex}_fanart.jpg")
+ )
+ if collage_fanart_image := await self.create_collage_image(
+ all_playlist_tracks_images, fanart_image_path, fanart=True
+ ):
+ new_images.append(collage_fanart_image)
+ elif fanart_image:
+ # just use old image
+ new_images.append(fanart_image)
+ playlist.metadata.images = new_images
+ # set timestamp, used to determine when this function was last called
+ playlist.metadata.last_refresh = int(time())
+
+ async def _update_radio_metadata(self, radio: Radio, force_refresh: bool = False) -> None:
+ """Get/update rich metadata for a radio station."""
+ # NOTE: we do not have any metadata for radio so consider this future proofing ;-)
+ radio.metadata.last_refresh = int(time())
+
+ async def _get_artist_mbid(self, artist: Artist) -> str | None:
+ """Fetch musicbrainz id by performing search using the artist name, albums and tracks."""
+ if compare_strings(artist.name, VARIOUS_ARTISTS_NAME):
+ return VARIOUS_ARTISTS_ID_MBID
+ ref_albums = await self.mass.music.artists.albums(
+ artist.item_id, artist.provider, in_library_only=False
+ )
+ ref_tracks = await self.mass.music.artists.tracks(
+ artist.item_id, artist.provider, in_library_only=False
+ )
+ # start lookup of musicbrainz id
+ musicbrainz: MusicbrainzProvider = self.mass.get_provider("musicbrainz")
+ assert musicbrainz
+ if mbid := await musicbrainz.get_musicbrainz_artist_id(
+ artist, ref_albums=ref_albums, ref_tracks=ref_tracks
+ ):
+ return mbid
+
+ # lookup failed
+ ref_albums_str = "/".join(x.name for x in ref_albums) or "none"
+ ref_tracks_str = "/".join(x.name for x in ref_tracks) or "none"
+ self.logger.debug(
+ "Unable to get musicbrainz ID for artist %s\n"
+ " - using lookup-album(s): %s\n"
+ " - using lookup-track(s): %s\n",
+ artist.name,
+ ref_albums_str,
+ ref_tracks_str,
+ )
+ return None
+
+ def _reset_online_slots(self) -> None:
+ self._online_slots_available = MAX_ONLINE_CALLS_PER_DAY
+ # reschedule self in 24 hours
+ self.mass.loop.call_later(60 * 60 * 24, self._reset_online_slots)
+
+ async def _metadata_scanner(self) -> None:
+ """Continuously (slow) background scanner for (missing) metadata."""
+ while True:
+ for artist in await self.mass.music.artists.library_items(order_by="random"):
+ if (time() - (artist.metadata.last_refresh or 0)) < REFRESH_INTERVAL:
+ await asyncio.sleep(2)
+ continue
+ await self._update_artist_metadata(artist)
+ await asyncio.sleep(60)
+ for album in await self.mass.music.albums.library_items(order_by="random"):
+ if (time() - (album.metadata.last_refresh or 0)) < REFRESH_INTERVAL:
+ await asyncio.sleep(2)
+ continue
+ await self._update_album_metadata(album)
+ await asyncio.sleep(60)
+ for track in await self.mass.music.tracks.library_items(order_by="random"):
+ if (time() - (track.metadata.last_refresh or 0)) < REFRESH_INTERVAL:
+ await asyncio.sleep(2)
+ continue
+ await self._update_track_metadata(track)
+ await asyncio.sleep(60)
+ for playlist in await self.mass.music.playlists.library_items(order_by="random"):
+ await self._update_playlist_metadata(playlist)
+ await asyncio.sleep(60)
)
from music_assistant.server.helpers.api import api_command
from music_assistant.server.helpers.database import DatabaseConnection
+from music_assistant.server.helpers.util import TaskManager
from music_assistant.server.models.core_controller import CoreController
from .media.albums import AlbumsController
item = await ctrl.get(
db_row["item_id"],
db_row["provider"],
- add_to_library=False,
- lazy=True,
- force_refresh=False,
)
result.append(item)
return result
@api_command("music/item_by_uri")
- async def get_item_by_uri(
- self, uri: str, lazy: bool = True, add_to_library: bool = False
- ) -> MediaItemType:
+ async def get_item_by_uri(self, uri: str) -> MediaItemType:
"""Fetch MediaItem by uri."""
media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
return await self.get_item(
media_type=media_type,
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
- lazy=lazy,
- add_to_library=add_to_library,
)
@api_command("music/item")
media_type: MediaType,
item_id: str,
provider_instance_id_or_domain: str,
- force_refresh: bool = False,
- lazy: bool = True,
- add_to_library: bool = False,
) -> MediaItemType:
"""Get single music item by id and media type."""
if provider_instance_id_or_domain == "database":
return await ctrl.get(
item_id=item_id,
provider_instance_id_or_domain=provider_instance_id_or_domain,
- force_refresh=force_refresh,
- lazy=lazy,
- add_to_library=add_to_library,
)
@api_command("music/favorites/add_item")
item.media_type,
item.item_id,
item.provider,
- lazy=False,
- add_to_library=True,
)
+ if full_item.provider != "library":
+ full_item = await self.add_item_to_library(full_item)
# set favorite in library db
ctrl = self.get_controller(item.media_type)
await ctrl.set_favorite(
provider = self.mass.get_provider(item.provider)
if provider.library_edit_supported(item.media_type):
await provider.library_add(item)
- return await ctrl.get(
- item_id=item.item_id,
- provider_instance_id_or_domain=item.provider,
- details=item,
- add_to_library=True,
- )
+ return await ctrl.add_item_to_library(item)
async def refresh_items(self, items: list[MediaItemType]) -> None:
"""Refresh MediaItems to force retrieval of full info and matches.
Creates background tasks to process the action.
"""
- for media_item in items:
- self.mass.create_task(self.refresh_item(media_item))
+ async with TaskManager(self.mass) as tg:
+ for media_item in items:
+ tg.create_task(self.refresh_item(media_item))
@api_command("music/refresh_item")
async def refresh_item(
self,
- media_item: MediaItemType,
+ media_item: str | MediaItemType,
) -> MediaItemType | None:
"""Try to refresh a mediaitem by requesting it's full object or search for substitutes."""
- try:
- return await self.get_item(
- media_item.media_type,
- media_item.item_id,
- media_item.provider,
- force_refresh=True,
- lazy=False,
- add_to_library=True,
- )
- except MusicAssistantError:
- pass
+ if isinstance(media_item, str):
+ # media item uri given
+ media_item = await self.get_item_by_uri(media_item)
- searchresult = await self.search(media_item.name, [media_item.media_type], 20)
- if media_item.media_type == MediaType.ARTIST:
- result = searchresult.artists
- elif media_item.media_type == MediaType.ALBUM:
- result = searchresult.albums
- elif media_item.media_type == MediaType.TRACK:
- result = searchresult.tracks
- elif media_item.media_type == MediaType.PLAYLIST:
- result = searchresult.playlists
+ media_type = media_item.media_type
+ ctrl = self.get_controller(media_type)
+ is_library_item = media_item.provider == "library"
+
+ # fetch the first (available) provider item
+ for prov_mapping in media_item.provider_mappings:
+ provider = prov_mapping.provider_instance
+ item_id = prov_mapping.item_id
+ if prov_mapping.available:
+ break
else:
- result = searchresult.radio
- for item in result:
- if item.available:
- return await self.get_item(
- item.media_type,
- item.item_id,
- item.provider,
- lazy=False,
- add_to_library=True,
- )
- return None
+ # try to find a substitute
+ searchresult = await self.search(media_item.name, [media_item.media_type], 20)
+ if media_item.media_type == MediaType.ARTIST:
+ result = searchresult.artists
+ elif media_item.media_type == MediaType.ALBUM:
+ result = searchresult.albums
+ elif media_item.media_type == MediaType.TRACK:
+ result = searchresult.tracks
+ elif media_item.media_type == MediaType.PLAYLIST:
+ result = searchresult.playlists
+ else:
+ result = searchresult.radio
+ for item in result:
+ if item.available:
+ provider = item.provider
+ item_id = item.item_id
+ break
+ else:
+ # raise if we didn't find a substitute
+ raise MediaNotFoundError(f"Could not find a substitute for {media_item.name}")
+ # fetch full (provider) item
+ media_item = await ctrl.get_provider_item(item_id, provider, force_refresh=True)
+ # update library item if needed (including refresh of the metadata etc.)
+ if is_library_item:
+ return await ctrl.add_item_to_library(media_item, metadata_lookup=True)
+
+ return media_item
async def set_track_loudness(
self, item_id: str, provider_instance_id_or_domain: str, loudness: LoudnessMeasurement
"""Mark item as played in playlog."""
timestamp = utc_timestamp()
- if provider_instance_id_or_domain == "builtin":
+ if provider_instance_id_or_domain == "builtin" and media_type != MediaType.PLAYLIST:
# we deliberately skip builtin provider items as those are often
# one-off items like TTS or some sound effect etc.
return
# also update playcount in library table
ctrl = self.get_controller(media_type)
- if self.mass.config.get_raw_core_config_value(self.domain, CONF_ADD_LIBRARY_ON_PLAY):
+ db_item = await ctrl.get_library_item_by_prov_id(item_id, provider_instance_id_or_domain)
+ if (
+ not db_item
+ and media_type in (MediaType.TRACK, MediaType.RADIO)
+ and self.mass.config.get_raw_core_config_value(self.domain, CONF_ADD_LIBRARY_ON_PLAY)
+ ):
# handle feature to add to the lib on playback
- db_item = await ctrl.get(
- item_id, provider_instance_id_or_domain, lazy=False, add_to_library=True
- )
- else:
- db_item = await ctrl.get_library_item_by_prov_id(
- item_id, provider_instance_id_or_domain
- )
+ full_item = await ctrl.get(item_id, provider_instance_id_or_domain)
+ db_item = await ctrl.add_item_to_library(full_item)
+
if db_item:
await self.database.execute(
f"UPDATE {ctrl.db_table} SET play_count = play_count + 1, "
radio_source.append(media_item)
elif media_item.media_type == MediaType.PLAYLIST:
tracks += await self.mass.music.playlists.get_all_playlist_tracks(media_item)
- await self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
)
elif media_item.media_type == MediaType.ARTIST:
tracks += await self.get_artist_tracks(media_item)
- await self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
)
elif media_item.media_type == MediaType.ALBUM:
tracks += await self.get_album_tracks(media_item)
- await self.mass.music.mark_item_played(
- media_item.media_type, media_item.item_id, media_item.provider
+ self.mass.create_task(
+ self.mass.music.mark_item_played(
+ media_item.media_type, media_item.item_id, media_item.provider
+ )
)
else:
# single track or radio item