def __hash__(self) -> int:
"""Return custom hash."""
- return hash((self.type.value, self.path))
+ return hash((self.type.value, self.provider, self.path))
def __eq__(self, other: object) -> bool:
"""Check equality of two items."""
continue
cur_val = getattr(self, fld.name)
if isinstance(cur_val, list) and isinstance(new_val, list):
- new_val = merge_lists(cur_val, new_val)
+ new_val = UniqueList(merge_lists(cur_val, new_val))
setattr(self, fld.name, new_val)
elif isinstance(cur_val, set) and isinstance(new_val, set | list | tuple):
cur_val.update(new_val)
)
self.manifest.icon = "book-information-variant"
self._scanner_task: asyncio.Task | None = None
+ self._online_slots_available = MAX_ONLINE_CALLS_PER_RUN
async def get_config_entries(
self,
self.logger.warning("%s is not a valid language", lang)
@api_command("metadata/update_metadata")
- async def update_metadata(self, item: str | MediaItemType, force_refresh: bool = False) -> None:
+ async def update_metadata(
+ self, item: str | MediaItemType, force_refresh: bool = False
+ ) -> MediaItemType:
"""Get/update extra/enhanced metadata for/on given MediaItem."""
if isinstance(item, str):
item = await self.mass.music.get_item_by_uri(item)
await self._update_playlist_metadata(item, force_refresh=force_refresh)
if item.media_type == MediaType.RADIO:
await self._update_radio_metadata(item, force_refresh=force_refresh)
+ return item
@api_command("metadata/start_scan")
def start_metadata_scanner(self) -> None:
async def _update_artist_metadata(self, artist: Artist, force_refresh: bool = False) -> None:
"""Get/update rich metadata for an artist."""
+ self.logger.debug("Updating metadata for Artist %s", artist.name)
unique_keys: set[str] = set()
# collect (local) metadata from all local providers
local_provs = get_global_cache_value("non_streaming_providers")
# to not overload the music/metadata providers with api calls
# TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
- if self.config.get_value(CONF_ENABLE_ONLINE_METADATA) and (
- force_refresh
- or (
- self._online_slots_available
- and ((time() - (artist.metadata.last_refresh or 0)) > REFRESH_INTERVAL)
- )
+ if force_refresh or (
+ self._online_slots_available
+ and ((time() - (artist.metadata.last_refresh or 0)) > REFRESH_INTERVAL)
):
self._online_slots_available -= 1
# set timestamp, used to determine when this function was last called
)
artist.metadata.update(prov_item.metadata)
- # TODO: Use a global cache/proxy for the MB lookups to save on API calls
+ # The musicbrainz ID is mandatory for all metadata lookups
if not artist.mbid:
+ # TODO: Use a global cache/proxy for the MB lookups to save on API calls
if mbid := await self._get_artist_mbid(artist):
artist.mbid = mbid
- if artist.mbid:
- # The musicbrainz ID is mandatory for all metadata lookups
+ # collect metadata from all (online) metadata providers
+ if self.config.get_value(CONF_ENABLE_ONLINE_METADATA) and artist.mbid:
for provider in self.providers:
if ProviderFeature.ARTIST_METADATA not in provider.supported_features:
continue
async def _update_album_metadata(self, album: Album, force_refresh: bool = False) -> None:
"""Get/update rich metadata for an album."""
+ self.logger.debug("Updating metadata for Album %s", album.name)
unique_keys: set[str] = set()
# collect (local) metadata from all local music providers
local_provs = get_global_cache_value("non_streaming_providers")
# NOTE: we only allow this every REFRESH_INTERVAL and a max amount of calls per day
# to not overload the (free) metadata providers with api calls
# TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
- if self.config.get_value(CONF_ENABLE_ONLINE_METADATA) and (
- force_refresh
- or (
- self._online_slots_available
- and ((time() - (album.metadata.last_refresh or 0)) > REFRESH_INTERVAL)
- and (album.mbid or album.artists)
- )
+ if force_refresh or (
+ self._online_slots_available
+ and ((time() - (album.metadata.last_refresh or 0)) > REFRESH_INTERVAL)
+ and (album.mbid or album.artists)
):
self._online_slots_available -= 1
# set timestamp, used to determine when this function was last called
if album.album_type == AlbumType.UNKNOWN:
album.album_type = prov_item.album_type
- # collect metadata from all providers
- for provider in self.providers:
- if ProviderFeature.ALBUM_METADATA not in provider.supported_features:
- continue
- if metadata := await provider.get_album_metadata(album):
- album.metadata.update(metadata)
- self.logger.debug(
- "Fetched metadata for Album %s on provider %s",
- album.name,
- provider.name,
- )
+ # collect metadata from all (online) metadata providers
+ if self.config.get_value(CONF_ENABLE_ONLINE_METADATA):
+ for provider in self.providers:
+ if ProviderFeature.ALBUM_METADATA not in provider.supported_features:
+ continue
+ if metadata := await provider.get_album_metadata(album):
+ album.metadata.update(metadata)
+ self.logger.debug(
+ "Fetched metadata for Album %s on provider %s",
+ album.name,
+ provider.name,
+ )
# update final item in library database
await self.mass.music.albums.update_item_in_library(album.item_id, album)
async def _update_track_metadata(self, track: Track, force_refresh: bool = False) -> None:
"""Get/update rich metadata for a track."""
+ self.logger.debug("Updating metadata for Track %s", track.name)
unique_keys: set[str] = set()
# collect metadata from all (online) music/metadata providers
# NOTE: we only allow this every REFRESH_INTERVAL and a max amount of calls per day
# to not overload the (free) metadata providers with api calls
# TODO: Utilize a global (cloud) cache for metadata lookups to save on API calls
- if self.config.get_value(CONF_ENABLE_ONLINE_METADATA) and (
- force_refresh
- or (
- self._online_slots_available
- and ((time() - (track.metadata.last_refresh or 0)) > REFRESH_INTERVAL)
- and (track.mbid or track.artists or track.album)
- )
+ if force_refresh or (
+ self._online_slots_available
+ and ((time() - (track.metadata.last_refresh or 0)) > REFRESH_INTERVAL)
+ and (track.mbid or track.artists or track.album)
):
self._online_slots_available -= 1
# set timestamp, used to determine when this function was last called
track.metadata.update(prov_item.metadata)
# collect metadata from all metadata providers
- for provider in self.providers:
- if ProviderFeature.TRACK_METADATA not in provider.supported_features:
- continue
- if metadata := await provider.get_track_metadata(track):
- track.metadata.update(metadata)
- self.logger.debug(
- "Fetched metadata for Track %s on provider %s",
- track.name,
- provider.name,
- )
+ if self.config.get_value(CONF_ENABLE_ONLINE_METADATA):
+ for provider in self.providers:
+ if ProviderFeature.TRACK_METADATA not in provider.supported_features:
+ continue
+ if metadata := await provider.get_track_metadata(track):
+ track.metadata.update(metadata)
+ self.logger.debug(
+ "Fetched metadata for Track %s on provider %s",
+ track.name,
+ provider.name,
+ )
# update final item in library database
await self.mass.music.tracks.update_item_in_library(track.item_id, track)
self, playlist: Playlist, force_refresh: bool = False
) -> None:
"""Get/update rich metadata for a playlist."""
+ self.logger.debug("Updating metadata for Playlist %s", playlist.name)
if not force_refresh and (time() - (playlist.metadata.last_refresh or 0)) < (
60 * 60 * 24 * 5
):
async def _get_artist_mbid(self, artist: Artist) -> str | None:
"""Fetch musicbrainz id by performing search using the artist name, albums and tracks."""
+ if artist.mbid:
+ return artist.mbid
if compare_strings(artist.name, VARIOUS_ARTISTS_NAME):
return VARIOUS_ARTISTS_MBID
+
+ musicbrainz: MusicbrainzProvider = self.mass.get_provider("musicbrainz")
+ if TYPE_CHECKING:
+ musicbrainz = cast(MusicbrainzProvider, musicbrainz)
+ # first try with resource URL (e.g. streaming provider share URL)
+ for prov_mapping in artist.provider_mappings:
+ if prov_mapping.url and prov_mapping.url.startswith("http"):
+ if mb_artist := await musicbrainz.get_artist_details_by_resource_url(
+ prov_mapping.url
+ ):
+ return mb_artist.id
+
+ # start lookup of musicbrainz id using artist name, albums and tracks
ref_albums = await self.mass.music.artists.albums(
artist.item_id, artist.provider, in_library_only=False
)
ref_tracks = await self.mass.music.artists.tracks(
artist.item_id, artist.provider, in_library_only=False
)
- # start lookup of musicbrainz id
- musicbrainz: MusicbrainzProvider = self.mass.get_provider("musicbrainz")
- assert musicbrainz
- if mbid := await musicbrainz.get_musicbrainz_artist_id(
- artist, ref_albums=ref_albums, ref_tracks=ref_tracks
- ):
- return mbid
+ # try with (strict) ref track(s), using recording id
+ for ref_track in ref_tracks:
+ if mb_artist := await musicbrainz.get_artist_details_by_track(artist.name, ref_track):
+ return mb_artist.id
+ # try with (strict) ref album(s), using releasegroup id
+ for ref_album in ref_albums:
+ if mb_artist := await musicbrainz.get_artist_details_by_album(artist.name, ref_album):
+ return mb_artist.id
+ # last restort: track matching by name
+ for ref_track in ref_tracks:
+ if not ref_track.album:
+ continue
+ if result := await musicbrainz.search(
+ artistname=artist.name,
+ albumname=ref_track.album.name,
+ trackname=ref_track.name,
+ trackversion=ref_track.version,
+ ):
+ return result[0].id
# lookup failed
ref_albums_str = "/".join(x.name for x in ref_albums) or "none"
async def _metadata_scanner(self) -> None:
"""Scanner for (missing) metadata."""
+ self.logger.info("Starting metadata scanner")
self._online_slots_available = MAX_ONLINE_CALLS_PER_RUN
timestamp = int(time() - 60 * 60 * 24 * 30)
query = (
limit=2500, order_by="random", extra_query=query
):
await self._update_track_metadata(track)
+ self.logger.info("Metadata scanner finished.")
Destructive! Will remove the item and all dependants.
"""
+ self.mass.metadata.stop_metadata_scanner()
ctrl = self.get_controller(media_type)
item = await ctrl.get_library_item(library_item_id)
# remove from all providers
async def cleanup_provider(self, provider_instance: str) -> None:
"""Cleanup provider records from the database."""
+ self.mass.metadata.stop_metadata_scanner()
if provider_instance.startswith(("filesystem", "jellyfin", "plex", "opensubsonic")):
# removal of a local provider can become messy very fast due to the relations
# such as images pointing at the files etc. so we just reset the whole db
def _set_logger(self, log_level: str | None = None) -> None:
"""Set the logger settings."""
- self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.{self.domain}")
+ mass_logger = logging.getLogger(MASS_LOGGER_NAME)
+ self.logger = mass_logger.getChild(self.domain)
if log_level is None:
log_level = self.mass.config.get_raw_core_config_value(
self.domain, CONF_LOG_LEVEL, "GLOBAL"
)
if log_level == "GLOBAL":
- mass_logger = logging.getLogger(MASS_LOGGER_NAME)
self.logger.setLevel(mass_logger.level)
else:
self.logger.setLevel(log_level)
self.mass = mass
self.manifest = manifest
self.config = config
- self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.providers.{self.domain}")
+ mass_logger = logging.getLogger(MASS_LOGGER_NAME)
+ self.logger = mass_logger.getChild(self.domain)
log_level = config.get_value(CONF_LOG_LEVEL)
if log_level == "GLOBAL":
- mass_logger = logging.getLogger(MASS_LOGGER_NAME)
self.logger.setLevel(mass_logger.level)
+ else:
+ self.logger.setLevel(log_level)
if logging.getLogger().level > self.logger.level:
# if the root logger's level is higher, we need to adjust that too
logging.getLogger().setLevel(self.logger.level)
VARIOUS_ARTISTS_MBID,
VARIOUS_ARTISTS_NAME,
)
-from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.helpers.compare import compare_strings, create_safe_string
from music_assistant.server.helpers.playlists import parse_m3u, parse_pls
from music_assistant.server.helpers.tags import AudioTags, parse_tags, split_items
from music_assistant.server.models.music_provider import MusicProvider
-from .helpers import get_album_dir, get_artist_dir, get_disc_dir
+from .helpers import get_album_dir, get_artist_dir
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
for index, track_artist_str in enumerate(tags.artists):
artist = await self._create_artist_itemmapping(
track_artist_str,
+ album_or_track_dir=file_item.path,
sort_name=(
tags.artist_sort_names[index] if index < len(tags.artist_sort_names) else None
),
track.metadata.chapters = UniqueList(tags.chapters)
return track
- @use_cache(300)
+ # @use_cache(300)
async def _create_artist_itemmapping(
self,
name: str,
- album_path: str | None = None,
+ album_or_track_dir: str | None = None,
sort_name: str | None = None,
mbid: str | None = None,
) -> ItemMapping:
"""Create ItemMapping for a track/album artist."""
artist_path = None
- if album_path:
- # try to find (album)artist folder based on album path
- artist_path = get_artist_dir(album_path=album_path, artist_name=name)
+ if album_or_track_dir:
+ # try to find (album)artist folder based on track or album path
+ artist_path = get_artist_dir(album_or_track_dir=album_or_track_dir, artist_name=name)
if not artist_path:
# check if we have an artist folder for this artist at root level
safe_artist_name = create_safe_string(name, lowercase=False, replace_space=False)
if prov_mapping.url:
artist_path = prov_mapping.url
break
+ if artist_path:
+ break
return ItemMapping(
media_type=MediaType.ARTIST,
"""Parse Album metadata from Track tags."""
assert track_tags.album
# work out if we have an album and/or disc folder
- # disc_dir is the folder level where the tracks are located
+ # track_dir is the folder level where the tracks are located
# this may be a separate disc folder (Disc 1, Disc 2 etc) underneath the album folder
# or this is an album folder with the disc attached
- disc_dir = get_disc_dir(track_path, track_tags.album, track_tags.disc)
- album_dir = get_album_dir(track_path, track_tags.album, disc_dir)
+ track_dir = os.path.dirname(track_path)
+ album_dir = get_album_dir(track_dir, track_tags.album)
# album artist(s)
album_artists: UniqueList[Artist | ItemMapping] = UniqueList()
for index, album_artist_str in enumerate(track_tags.album_artists):
artist = await self._create_artist_itemmapping(
album_artist_str,
- album_path=album_dir,
+ album_or_track_dir=album_dir,
sort_name=(
track_tags.album_artist_sort_names[index]
if index < len(track_tags.album_artist_sort_names)
)
album_artists = UniqueList(
[
- await self._create_artist_itemmapping(name=track_artist_str)
+ await self._create_artist_itemmapping(
+ name=track_artist_str, album_or_track_dir=album_dir
+ )
for track_artist_str in track_tags.artists
]
)
if not full_metadata:
return album
- extra_path = os.path.dirname(track_path) if (track_path and not album_dir) else None
- for folder_path in (disc_dir, album_dir, extra_path):
+ for folder_path in (track_dir, album_dir):
if not folder_path or not await self.exists(folder_path):
continue
nfo_file = os.path.join(folder_path, "album.nfo")
return album
- async def _get_local_images(self, folder: str) -> list[MediaItemImage]:
+ async def _get_local_images(self, folder: str) -> UniqueList[MediaItemImage]:
"""Return local images found in a given folderpath."""
- images = []
+ images: UniqueList[MediaItemImage] = UniqueList()
async for item in self.listdir(folder):
if "." not in item.path or item.is_dir:
continue
from music_assistant.server.helpers.compare import compare_strings
-def get_artist_dir(album_path: str, artist_name: str) -> str | None:
- """Look for (Album)Artist directory in path of album."""
- parentdir = os.path.dirname(album_path)
- dirname = parentdir.rsplit(os.sep)[-1]
- if compare_strings(artist_name, dirname, False):
- return parentdir
- return None
-
-
-def get_disc_dir(track_path: str, album_name: str, disc_number: int | None) -> str | None:
- """Look for disc directory in path of album/tracks."""
- parentdir = os.path.dirname(track_path)
- dirname = parentdir.rsplit(os.sep)[-1]
- dirname_lower = dirname.lower()
- if disc_number and compare_strings(f"disc {disc_number}", dirname, False):
- return parentdir
- if dirname_lower.startswith(album_name.lower()) and "disc" in dirname_lower:
- return parentdir
- if dirname_lower.startswith(album_name.lower()) and dirname_lower.endswith(str(disc_number)):
- return parentdir
+def get_artist_dir(album_or_track_dir: str, artist_name: str) -> str | None:
+ """Look for (Album)Artist directory in path of a track (or album)."""
+ parentdir = os.path.dirname(album_or_track_dir)
+ # account for disc or album sublevel by ignoring (max) 2 levels if needed
+ for _ in range(3):
+ dirname = parentdir.rsplit(os.sep)[-1]
+ if compare_strings(artist_name, dirname, False):
+ # literal match
+ return parentdir
+ parentdir = os.path.dirname(parentdir)
return None
-def get_album_dir(track_path: str, album_name: str, disc_dir: str | None) -> str | None:
+def get_album_dir(track_dir: str, album_name: str) -> str | None:
"""Return album/parent directory of a track."""
- parentdir = os.path.dirname(track_path)
+ parentdir = track_dir
# account for disc sublevel by ignoring 1 level if needed
- for _ in range(2 if disc_dir else 1):
+ for _ in range(2):
dirname = parentdir.rsplit(os.sep)[-1]
- dirname_lower = dirname.lower()
if compare_strings(album_name, dirname, False):
+ # literal match
+ return parentdir
+ if compare_strings(album_name, dirname.split(" - ")[-1], False):
+ # account for ArtistName - AlbumName format in the directory name
+ return parentdir
+ if compare_strings(album_name, dirname.split("(")[0], False):
+ # account for ArtistName - AlbumName (Version) format in the directory name
+ return parentdir
+ if compare_strings(album_name.split("(")[0], dirname, False):
+ # account for AlbumName (Version) format in the album name
return parentdir
- if album_name in dirname_lower:
+ if compare_strings(album_name.split("(")[0], dirname.split(" - ")[-1], False):
+ # account for ArtistName - AlbumName (Version) format
return parentdir
- if dirname_lower in album_name:
+ if len(album_name) > 8 and album_name in dirname:
+ # dirname contains album name
+ # (could potentially lead to false positives, hence the length check)
return parentdir
parentdir = os.path.dirname(parentdir)
return None
from music_assistant.common.helpers.json import json_loads
from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.enums import ExternalID, ProviderFeature
-from music_assistant.common.models.errors import (
- InvalidDataError,
- MediaNotFoundError,
- ResourceTemporarilyUnavailable,
-)
+from music_assistant.common.models.errors import InvalidDataError, ResourceTemporarilyUnavailable
from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.server.models.metadata_provider import MetadataProvider
if TYPE_CHECKING:
- from collections.abc import Iterable
-
from music_assistant.common.models.config_entries import (
ConfigEntry,
ConfigValueType,
ProviderConfig,
)
- from music_assistant.common.models.media_items import Album, Artist, Track
+ from music_assistant.common.models.media_items import Album, Track
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
from music_assistant.server.models import ProviderInstanceType
"""Return the features supported by this Provider."""
return SUPPORTED_FEATURES
- async def get_musicbrainz_artist_id(
- self, artist: Artist, ref_albums: Iterable[Album], ref_tracks: Iterable[Track]
- ) -> str | None:
- """Discover MusicBrainzArtistId for an artist given some reference albums/tracks."""
- if artist.mbid:
- return artist.mbid
- # try with (strict) ref track(s), using recording id
- for ref_track in ref_tracks:
- if mb_artist := await self.get_artist_details_by_track(artist.name, ref_track):
- return mb_artist.id
- # try with (strict) ref album(s), using releasegroup id
- for ref_album in ref_albums:
- if mb_artist := await self.get_artist_details_by_album(artist.name, ref_album):
- return mb_artist.id
- # last restort: track matching by name
- for ref_track in ref_tracks:
- if not ref_track.album:
- continue
- if result := await self.search(
- artistname=artist.name,
- albumname=ref_track.album.name,
- trackname=ref_track.name,
- trackversion=ref_track.version,
- ):
- return result[0].id
- return None
-
async def search(
self, artistname: str, albumname: str, trackname: str, trackversion: str | None = None
) -> tuple[MusicBrainzArtist, MusicBrainzReleaseGroup, MusicBrainzRecording] | None:
if not ref_track.mbid:
return None
result = None
- with suppress(InvalidDataError, MediaNotFoundError):
+ with suppress(InvalidDataError):
result = await self.get_recording_details(ref_track.mbid)
if not (result and result.artist_credit):
return None
return artist_credit.artist
return None
+ async def get_artist_details_by_resource_url(
+ self, resource_url: str
+ ) -> MusicBrainzArtist | None:
+ """
+ Get musicbrainz artist details by providing a resource URL (e.g. Spotify share URL).
+
+ MusicBrainzArtist object that is returned does not contain the optional data.
+ """
+ if result := await self.get_data("url", resource=resource_url, inc="artist-rels"):
+ for relation in result.get("relations", []):
+ if not (artist := relation.get("artist")):
+ continue
+ return MusicBrainzArtist.from_dict(replace_hyphens(artist))
+ return None
+
@use_cache(86400 * 30)
@throttle_with_retries
async def get_data(self, endpoint: str, **kwargs: dict[str, Any]) -> Any:
# handle temporary server error
if response.status in (502, 503):
raise ResourceTemporarilyUnavailable(backoff_time=30)
- # handle 404 not found, convert to MediaNotFoundError
+ # handle 404 not found
if response.status in (400, 401, 404):
- raise MediaNotFoundError(f"{endpoint} not found")
+ return None
response.raise_for_status()
return await response.json(loads=json_loads)
"""Retrieve metadata for album on theaudiodb."""
if not self.config.get_value(CONF_ENABLE_ALBUM_METADATA):
return None
- if (mbid := album.get_external_id(ExternalID.MB_RELEASEGROUP)) is None:
+ if mbid := album.get_external_id(ExternalID.MB_RELEASEGROUP):
+ result = await self._get_data("album-mb.php", i=mbid)
+ if result and result.get("album"):
+ adb_album = result["album"][0]
+ return await self.__parse_album(album, adb_album)
+ # if there was no match on mbid, there will certainly be no match by name
return None
- result = await self._get_data("album-mb.php", i=mbid)
- if result and result.get("album"):
- adb_album = result["album"][0]
- # fill in some missing album info if needed
- if not album.year:
- album.year = int(adb_album.get("intYearReleased", "0"))
- if album.artists and not album.artists[0].mbid:
- album.artists[0].mbid = adb_album["strMusicBrainzArtistID"]
- if album.album_type == AlbumType.UNKNOWN:
- album.album_type = ALBUMTYPE_MAPPING.get(
- adb_album.get("strReleaseFormat"), AlbumType.UNKNOWN
- )
- return self.__parse_album(adb_album)
+ # fallback if no musicbrainzid: lookup by name
+ for album_artist in album.artists:
+ # make sure to include the version in the album name
+ album_name = f"{album.name} {album.version}" if album.version else album.name
+ result = await self._get_data("searchalbum.php?", s=album_artist.name, a=album)
+ if result and result.get("album"):
+ for item in result["album"]:
+ # some safety checks
+ if album_artist.mbid:
+ if album_artist.mbid != item["strMusicBrainzArtistID"]:
+ continue
+ elif not compare_strings(album_artist.name, item["strArtist"]):
+ continue
+ if compare_strings(album_name, item["strAlbum"], strict=False):
+ # match found !
+ return await self.__parse_album(album, item)
return None
async def get_track_metadata(self, track: Track) -> MediaItemMetadata | None:
if track.mbid:
result = await self._get_data("track-mb.php", i=track.mbid)
if result and result.get("track"):
- return self.__parse_track(result["track"][0])
+ return await self.__parse_track(track, result["track"][0])
# if there was no match on mbid, there will certainly be no match by name
return None
# fallback if no musicbrainzid: lookup by name
for track_artist in track.artists:
- # make sure to include the version in the track name
+ # make sure to include the version in the album name
track_name = f"{track.name} {track.version}" if track.version else track.name
result = await self._get_data("searchtrack.php?", s=track_artist.name, t=track_name)
if result and result.get("track"):
for item in result["track"]:
# some safety checks
- if track_artist.mbid and track_artist.mbid != item["strMusicBrainzArtistID"]:
+ if track_artist.mbid:
+ if track_artist.mbid != item["strMusicBrainzArtistID"]:
+ continue
+ elif not compare_strings(track_artist.name, item["strArtist"]):
continue
- if (
+ if ( # noqa: SIM114
track.album
and (mb_rgid := track.album.get_external_id(ExternalID.MB_RELEASEGROUP))
# AudioDb swapped MB Album ID and ReleaseGroup ID ?!
and mb_rgid != item["strMusicBrainzAlbumID"]
):
continue
- if not compare_strings(track_artist.name, item["strArtist"]):
+ elif track.album and not compare_strings(
+ track.album.name, item["strAlbum"], strict=False
+ ):
continue
- if compare_strings(track_name, item["strTrack"]):
- return self.__parse_track(item)
+ if not compare_strings(track_name, item["strTrack"], strict=False):
+ continue
+ return await self.__parse_track(track, item)
return None
def __parse_artist(self, artist_obj: dict[str, Any]) -> MediaItemMetadata:
break
return metadata
- def __parse_album(self, album_obj: dict[str, Any]) -> MediaItemMetadata:
+ async def __parse_album(self, album: Album, adb_album: dict[str, Any]) -> MediaItemMetadata:
"""Parse audiodb album object to MediaItemMetadata."""
metadata = MediaItemMetadata()
# generic data
- metadata.label = album_obj.get("strLabel")
- metadata.style = album_obj.get("strStyle")
- if genre := album_obj.get("strGenre"):
+ metadata.label = adb_album.get("strLabel")
+ metadata.style = adb_album.get("strStyle")
+ if genre := adb_album.get("strGenre"):
metadata.genres = {genre}
- metadata.mood = album_obj.get("strMood")
+ metadata.mood = adb_album.get("strMood")
# links
metadata.links = set()
- if link := album_obj.get("strWikipediaID"):
+ if link := adb_album.get("strWikipediaID"):
metadata.links.add(
MediaItemLink(type=LinkType.WIKIPEDIA, url=f"https://wikipedia.org/wiki/{link}")
)
- if link := album_obj.get("strAllMusicID"):
+ if link := adb_album.get("strAllMusicID"):
metadata.links.add(
MediaItemLink(type=LinkType.ALLMUSIC, url=f"https://www.allmusic.com/album/{link}")
)
# description
lang_code, lang_country = self.mass.metadata.locale.split("_")
- if desc := album_obj.get(f"strDescription{lang_country}") or (
- desc := album_obj.get(f"strDescription{lang_code.upper()}")
+ if desc := adb_album.get(f"strDescription{lang_country}") or (
+ desc := adb_album.get(f"strDescription{lang_code.upper()}")
):
metadata.description = desc
else:
- metadata.description = album_obj.get("strDescriptionEN")
- metadata.review = album_obj.get("strReview")
+ metadata.description = adb_album.get("strDescriptionEN")
+ metadata.review = adb_album.get("strReview")
# images
if not self.config.get_value(CONF_ENABLE_IMAGES):
return metadata
metadata.images = UniqueList()
for key, img_type in IMG_MAPPING.items():
for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
- if img := album_obj.get(f"{key}{postfix}"):
+ if img := adb_album.get(f"{key}{postfix}"):
metadata.images.append(
MediaItemImage(
type=img_type,
)
else:
break
+ # fill in some missing album info if needed
+ if not album.year:
+ album.year = int(adb_album.get("intYearReleased", "0"))
+ if album.album_type == AlbumType.UNKNOWN and adb_album.get("strReleaseFormat"):
+ releaseformat = cast(str, adb_album.get("strReleaseFormat"))
+ album.album_type = ALBUMTYPE_MAPPING.get(releaseformat, AlbumType.UNKNOWN)
+ # update the artist mbid while at it
+ for album_artist in album.artists:
+ if not compare_strings(album_artist.name, adb_album["strArtist"]):
+ continue
+ if not album_artist.mbid and album_artist.provider == "library":
+ album_artist.mbid = adb_album["strMusicBrainzArtistID"]
+ await self.mass.music.artists.update_item_in_library(
+ album_artist.item_id,
+ album_artist, # type: ignore[arg-type]
+ )
return metadata
- def __parse_track(self, track_obj: dict[str, Any]) -> MediaItemMetadata:
+ async def __parse_track(self, track: Track, adb_track: dict[str, Any]) -> MediaItemMetadata:
"""Parse audiodb track object to MediaItemMetadata."""
metadata = MediaItemMetadata()
# generic data
- metadata.lyrics = track_obj.get("strTrackLyrics")
- metadata.style = track_obj.get("strStyle")
- if genre := track_obj.get("strGenre"):
+ metadata.lyrics = adb_track.get("strTrackLyrics")
+ metadata.style = adb_track.get("strStyle")
+ if genre := adb_track.get("strGenre"):
metadata.genres = {genre}
- metadata.mood = track_obj.get("strMood")
+ metadata.mood = adb_track.get("strMood")
# description
lang_code, lang_country = self.mass.metadata.locale.split("_")
- if desc := track_obj.get(f"strDescription{lang_country}") or (
- desc := track_obj.get(f"strDescription{lang_code.upper()}")
+ if desc := adb_track.get(f"strDescription{lang_country}") or (
+ desc := adb_track.get(f"strDescription{lang_code.upper()}")
):
metadata.description = desc
else:
- metadata.description = track_obj.get("strDescriptionEN")
+ metadata.description = adb_track.get("strDescriptionEN")
# images
if not self.config.get_value(CONF_ENABLE_IMAGES):
return metadata
metadata.images = UniqueList([])
for key, img_type in IMG_MAPPING.items():
for postfix in ("", "2", "3", "4", "5", "6", "7", "8", "9", "10"):
- if img := track_obj.get(f"{key}{postfix}"):
+ if img := adb_track.get(f"{key}{postfix}"):
metadata.images.append(
MediaItemImage(
type=img_type,
)
else:
break
+ # update the artist mbid while at it
+ for album_artist in track.artists:
+ if not compare_strings(album_artist.name, adb_track["strArtist"]):
+ continue
+ if not album_artist.mbid and album_artist.provider == "library":
+ album_artist.mbid = adb_track["strMusicBrainzArtistID"]
+ await self.mass.music.artists.update_item_in_library(
+ album_artist.item_id,
+ album_artist, # type: ignore[arg-type]
+ )
+ # update the album mbid while at it
+ if (
+ track.album
+ and not track.album.get_external_id(ExternalID.MB_RELEASEGROUP)
+ and track.album.provider == "library"
+ and isinstance(track.album, Album)
+ ):
+ track.album.add_external_id(
+ ExternalID.MB_RELEASEGROUP, adb_track["strMusicBrainzAlbumID"]
+ )
+ await self.mass.music.albums.update_item_in_library(track.album.item_id, track.album)
return metadata
@use_cache(86400 * 30)
item_id=str(artist_id),
provider_domain=self.domain,
provider_instance=self.instance_id,
- url=f"{BROWSE_URL}/artist/{artist_id}",
+ # NOTE: don't use the /browse endpoint as it's
+ # not working for musicbrainz lookups
+ url=f"https://tidal.com/artist/{artist_id}",
)
},
)
audio_format=AudioFormat(
content_type=ContentType.FLAC,
),
- url=f"{BROWSE_URL}/album/{album_id}",
+ url=f"https://tidal.com/album/{album_id}",
available=album_obj.available,
)
},
content_type=ContentType.FLAC,
bit_depth=24 if track_obj.is_HiRes else 16,
),
- url=f"{BROWSE_URL}/track/{track_id}",
+ url=f"https://tidal.com/track/{track_id}",
available=track_obj.available,
)
},
from tidalapi.media import Stream as TidalStream
from music_assistant.common.models.enums import MediaType
-from music_assistant.common.models.errors import (
- MediaNotFoundError,
- ResourceTemporarilyUnavailable,
-)
+from music_assistant.common.models.errors import MediaNotFoundError, ResourceTemporarilyUnavailable
DEFAULT_LIMIT = 50
LOGGER = logging.getLogger(__name__)
msg = "Tidal API rate limit reached"
raise ResourceTemporarilyUnavailable(msg)
else:
- all_albums = []
- albums = artist_obj.get_albums(limit=DEFAULT_LIMIT)
- eps_singles = artist_obj.get_ep_singles(limit=DEFAULT_LIMIT)
- compilations = artist_obj.get_other(limit=DEFAULT_LIMIT)
- all_albums.extend(albums)
- all_albums.extend(eps_singles)
- all_albums.extend(compilations)
+ all_albums = artist_obj.get_albums(limit=DEFAULT_LIMIT)
+ # extend with EPs and singles
+ all_albums.extend(artist_obj.get_ep_singles(limit=DEFAULT_LIMIT))
+ # extend with compilations
+ # note that the Tidal API gives back really strange results here so
+ # filter on either various artists or the artist id
+ for album in artist_obj.get_other(limit=DEFAULT_LIMIT):
+ if album.artist.id == artist_obj.id or album.artist.name == "Various Artists":
+ all_albums.append(album)
return all_albums
return await asyncio.to_thread(inner)
if prov := self._providers.get(provider_instance_or_domain):
if return_unavailable or prov.available:
return prov
- if not prov.is_streaming_provider:
+ if not getattr(prov, "is_streaming_provider", None):
# no need to lookup other instances because this provider has unique data
return None
provider_instance_or_domain = prov.domain