Playlist,
Track,
)
+from music_assistant_models.unique_list import UniqueList
from music_assistant.constants import (
CONF_LANGUAGE,
)
self.manifest.icon = "book-information-variant"
self._lookup_jobs: MetadataLookupQueue = MetadataLookupQueue(100)
- self._lookup_task: asyncio.Task | None = None
+ self._lookup_task: asyncio.Task[None] | None = None
self._throttler = Throttler(1, 30)
- self._missing_metadata_scan_task: asyncio.Task | None = None
+ self._missing_metadata_scan_task: asyncio.Task[None] | None = None
async def get_config_entries(
self,
@property
def providers(self) -> list[MetadataProvider]:
"""Return all loaded/running MetadataProviders."""
- if TYPE_CHECKING:
- return cast("list[MetadataProvider]", self.mass.get_providers(ProviderType.METADATA))
- return self.mass.get_providers(ProviderType.METADATA)
+ return cast("list[MetadataProvider]", self.mass.get_providers(ProviderType.METADATA))
@property
def preferred_language(self) -> str:
@property
def locale(self) -> str:
"""Return preferred language for metadata (as full locale code 'en_EN')."""
- return self.mass.config.get_raw_core_config_value(
+ value = self.mass.config.get_raw_core_config_value(
self.domain, CONF_LANGUAGE, DEFAULT_LANGUAGE
)
+ return str(value)
@api_command("metadata/set_default_preferred_language")
def set_default_preferred_language(self, lang: str) -> None:
) -> MediaItemType:
"""Get/update extra/enhanced metadata for/on given MediaItem."""
if isinstance(item, str):
- item = await self.mass.music.get_item_by_uri(item)
+ retrieved_item = await self.mass.music.get_item_by_uri(item)
+ if isinstance(retrieved_item, BrowseFolder):
+ raise TypeError("Cannot update metadata on a BrowseFolder item.")
+ item = retrieved_item
+
if item.provider != "library":
# this shouldn't happen but just in case.
raise RuntimeError("Metadata can only be updated for library items")
+
# just in case it was in the queue, prevent duplicate lookups
- self._lookup_jobs.pop(item.uri)
+ if item.uri:
+ self._lookup_jobs.pop(item.uri)
async with self._throttler:
if item.media_type == MediaType.ARTIST:
- await self._update_artist_metadata(item, force_refresh=force_refresh)
+ await self._update_artist_metadata(
+ cast("Artist", item), force_refresh=force_refresh
+ )
if item.media_type == MediaType.ALBUM:
- await self._update_album_metadata(item, force_refresh=force_refresh)
+ await self._update_album_metadata(cast("Album", item), force_refresh=force_refresh)
if item.media_type == MediaType.TRACK:
- await self._update_track_metadata(item, force_refresh=force_refresh)
+ await self._update_track_metadata(cast("Track", item), force_refresh=force_refresh)
if item.media_type == MediaType.PLAYLIST:
- await self._update_playlist_metadata(item, force_refresh=force_refresh)
+ await self._update_playlist_metadata(
+ cast("Playlist", item), force_refresh=force_refresh
+ )
return item
def schedule_update_metadata(self, uri: str) -> None:
)
if not img_path:
return None
- return await self.get_thumbnail(img_path, size)
+ thumbnail = await self.get_thumbnail(img_path, provider="builtin", size=size)
+
+ return cast("bytes", thumbnail)
async def get_image_url_for_item(
self,
if isinstance(media_item, ItemMapping):
# Check if the ItemMapping already has an image - avoid expensive API call
if media_item.image and media_item.image.type == img_type:
- if not media_item.image.remotely_accessible and resolve:
+ if media_item.image.remotely_accessible and resolve:
return self.get_image_url(media_item.image)
- return media_item.image.path
+ elif not media_item.image.remotely_accessible:
+ return media_item.image.path
# Only retrieve full item if we don't have the image we need
- assert media_item.uri is not None # guard for type checker
+ if not media_item.uri:
+ return None
retrieved_item = await self.mass.music.get_item_by_uri(media_item.uri)
if isinstance(retrieved_item, BrowseFolder):
return None # can not happen, but guard for type checker
media_item = cast("MediaItemType", retrieved_item)
+ if media_item and media_item.metadata.images:
+ for img in media_item.metadata.images:
+ if img.type != img_type:
+ continue
+ if not img.remotely_accessible and not resolve:
+ # ignore image if its not remotely accessible and we don't allow resolving
+ continue
+ return self.get_image_url(img, prefer_proxy=not img.remotely_accessible)
+
# retry with track's album
- if media_item.media_type == MediaType.TRACK and media_item.album:
+ if isinstance(media_item, Track) and media_item.album:
return await self.get_image_url_for_item(media_item.album, img_type, resolve)
# try artist instead for albums
- if media_item.media_type == MediaType.ALBUM and media_item.artists:
+ if isinstance(media_item, Album) and media_item.artists:
return await self.get_image_url_for_item(media_item.artists[0], img_type, resolve)
# last resort: track artist(s)
- if media_item.media_type == MediaType.TRACK and media_item.artists:
+ if isinstance(media_item, Track) and media_item.artists:
for artist in media_item.artists:
return await self.get_image_url_for_item(artist, img_type, resolve)
if provider == "builtin" and path.startswith("/collage/"):
# special case for collage images
path = os.path.join(self._collage_images_dir, path.split("/collage/")[-1])
- thumbnail = await get_image_thumb(
+ thumbnail_bytes = await get_image_thumb(
self.mass, path, size=size, provider=provider, image_format=image_format
)
if base64:
- enc_image = b64encode(thumbnail).decode()
- thumbnail = f"data:image/{image_format};base64,{enc_image}"
- return thumbnail
+ enc_image = b64encode(thumbnail_bytes).decode()
+ return f"data:image/{image_format};base64,{enc_image}"
+ return thumbnail_bytes
async def handle_imageproxy(self, request: web.Request) -> web.Response:
"""Handle request for image proxy."""
track.metadata.update(prov_item.metadata)
# collect metadata from all [metadata] providers
- # there is only little metadata available for tracks so we only fetch metadata
- # from other sources if the force flag is set
- if force_refresh and self.config.get_value(CONF_ENABLE_ONLINE_METADATA):
+ # Only fetch metadata from these sources if force_refresh is set OR
+ # if the track needs a refresh (based on REFRESH_INTERVAL_TRACKS) AND
+ # online metadata is enabled.
+ if (force_refresh or needs_refresh) and self.config.get_value(CONF_ENABLE_ONLINE_METADATA):
for provider in self.providers:
if ProviderFeature.TRACK_METADATA not in provider.supported_features:
continue
+
if metadata := await provider.get_track_metadata(track):
track.metadata.update(metadata)
self.logger.debug(
await asyncio.sleep(0) # yield to eventloop
playlist_genres_filtered = {genre for genre, count in playlist_genres.items() if count > 5}
- playlist_genres_filtered = list(playlist_genres_filtered)[:8]
+ playlist_genres_filtered = set(list(playlist_genres_filtered)[:8])
playlist.metadata.genres.update(playlist_genres_filtered)
# create collage images
- cur_images = playlist.metadata.images or []
+ cur_images: list[MediaItemImage] = playlist.metadata.images or []
new_images = []
# thumb image
thumb_image = next((x for x in cur_images if x.type == ImageType.THUMB), None)
elif fanart_image:
# just use old image
new_images.append(fanart_image)
- playlist.metadata.images = new_images
+ playlist.metadata.images = UniqueList(new_images) if new_images else None
# set timestamp, used to determine when this function was last called
playlist.metadata.last_refresh = int(time())
# update final item in library database
if compare_strings(artist.name, VARIOUS_ARTISTS_NAME):
return VARIOUS_ARTISTS_MBID
- musicbrainz: MusicbrainzProvider = self.mass.get_provider("musicbrainz")
+ musicbrainz_provider = self.mass.get_provider("musicbrainz")
+ if not musicbrainz_provider:
+ return None
+ musicbrainz: MusicbrainzProvider = cast("MusicbrainzProvider", musicbrainz_provider)
if TYPE_CHECKING:
- musicbrainz = cast("MusicbrainzProvider", musicbrainz)
+ assert isinstance(musicbrainz, MusicbrainzProvider)
# first try with resource URL (e.g. streaming provider share URL)
for prov_mapping in artist.provider_mappings:
if prov_mapping.url and prov_mapping.url.startswith("http"):
item_uri = await self._lookup_jobs.get()
try:
item = await self.mass.music.get_item_by_uri(item_uri)
- await self.update_metadata(item)
+ # Type check to ensure it's a valid MediaItemType
+ if hasattr(item, "provider") and hasattr(item, "media_type"):
+ await self.update_metadata(cast("MediaItemType", item))
+ else:
+ self.logger.warning(
+ "Retrieved item is not a valid MediaItemType: %s", type(item)
+ )
except MediaNotFoundError:
# this can happen when the item is removed from the library
pass
f"OR json_extract({DB_TABLE_ARTISTS}.metadata,'$.images') = '[]')"
)
for artist in await self.mass.music.artists.library_items(extra_query=query):
- self.schedule_update_metadata(artist.uri)
+ if artist.uri:
+ self.schedule_update_metadata(artist.uri)
# Scan for missing album images
self.logger.debug("Start lookup for missing album images...")
for album in await self.mass.music.albums.library_items(
limit=50, order_by="random", extra_query=query
):
- self.schedule_update_metadata(album.uri)
+ if album.uri:
+ self.schedule_update_metadata(album.uri)
# Force refresh playlist metadata every refresh interval
# this will e.g. update the playlist image and genres if the tracks have changed
for playlist in await self.mass.music.playlists.library_items(
limit=10, order_by="random", extra_query=query
):
- self.schedule_update_metadata(playlist.uri)
+ if playlist.uri:
+ self.schedule_update_metadata(playlist.uri)
-class MetadataLookupQueue(asyncio.Queue):
+class MetadataLookupQueue(asyncio.Queue[str]):
"""Representation of a queue for metadata lookups."""
def _init(self, maxlen: int) -> None: