"""All enums used by the Music Assistant models."""
from __future__ import annotations
-from enum import Enum
-from typing import Any, TypeVar
-
-# pylint:disable=ungrouped-imports
-try:
- from enum import StrEnum
-except (AttributeError, ImportError):
- # Python 3.10 compatibility for strenum
- _StrEnumSelfT = TypeVar("_StrEnumSelfT", bound="StrEnum")
-
- class StrEnum(str, Enum):
- """Partial backport of Python 3.11's StrEnum for our basic use cases."""
-
- def __new__(
- cls: type[_StrEnumSelfT], value: str, *args: Any, **kwargs: Any
- ) -> _StrEnumSelfT:
- """Create a new StrEnum instance."""
- if not isinstance(value, str):
- raise TypeError(f"{value!r} is not a string")
- return super().__new__(cls, value, *args, **kwargs)
-
- def __str__(self) -> str:
- """Return self."""
- return str(self)
-
- @staticmethod
- def _generate_next_value_(
- name: str, start: int, count: int, last_values: list[Any] # noqa
- ) -> Any:
- """Make `auto()` explicitly unsupported.
-
- We may revisit this when it's very clear that Python 3.11's
- `StrEnum.auto()` behavior will no longer change.
- """
- raise TypeError("auto() is not supported by this implementation")
+from enum import StrEnum
class MediaType(StrEnum):
- """StrEnum for MediaType."""
+ """Enum for MediaType."""
ARTIST = "artist"
ALBUM = "album"
)
+class ExternalID(StrEnum):
+ """Enum with External ID types."""
+
+ # musicbrainz:
+ # for tracks this is the RecordingID
+ # for albums this is the ReleaseGroupID (NOT the release ID!)
+ # for artists this is the ArtistID
+ MUSICBRAINZ = "musicbrainz"
+ ISRC = "isrc" # used to identify unique recordings
+ BARCODE = "barcode" # EAN-13 barcode for identifying albums
+ ACOUSTID = "acoustid" # unique fingerprint (id) for a recording
+ ASIN = "asin" # amazon unique number to identify albums
+ DISCOGS = "discogs" # id for media item on discogs
+ TADB = "tadb" # the audio db id
+
+
class LinkType(StrEnum):
- """StrEnum with link types."""
+ """Enum with link types."""
WEBSITE = "website"
FACEBOOK = "facebook"
class ImageType(StrEnum):
- """StrEnum with image types."""
+ """Enum with image types."""
THUMB = "thumb"
LANDSCAPE = "landscape"
class AlbumType(StrEnum):
- """StrEnum for Album type."""
+ """Enum for Album type."""
ALBUM = "album"
SINGLE = "single"
class QueueOption(StrEnum):
- """StrEnum representation of the queue (play) options.
+ """Enum representation of the queue (play) options.
- PLAY -> Insert new item(s) in queue at the current position and start playing.
- REPLACE -> Replace entire queue contents with the new items and start playing from index 0.
class PlayerState(StrEnum):
- """StrEnum for the (playback)state of a player."""
+ """Enum for the (playback)state of a player."""
IDLE = "idle"
PAUSED = "paused"
ARTIST_METADATA = "artist_metadata"
ALBUM_METADATA = "album_metadata"
TRACK_METADATA = "track_metadata"
- GET_ARTIST_MBID = "get_artist_mbid"
#
# PLUGIN FEATURES
from dataclasses import dataclass, field, fields
from time import time
-from typing import Any
+from typing import Any, Self
from mashumaro import DataClassDictMixin
from music_assistant.common.models.enums import (
AlbumType,
ContentType,
+ ExternalID,
ImageType,
LinkType,
MediaType,
audio_format: AudioFormat = field(default_factory=AudioFormat)
# url = link to provider details page if exists
url: str | None = None
- # isrc (tracks only) - isrc identifier if known
- isrc: str | None = None
- # barcode (albums only) - barcode identifier if known
- barcode: str | None = None
# optional details to store provider specific details
details: str | None = None
mood: str | None = None
style: str | None = None
copyright: str | None = None
- lyrics: str | None = None
- ean: str | None = None
+ lyrics: str | None = None # tracks only
label: str | None = None
links: set[MediaItemLink] | None = None
chapters: list[MediaItemChapter] | None = None
performers: set[str] | None = None
preview: str | None = None
- replaygain: float | None = None
popularity: int | None = None
# last_refresh: timestamp the (full) metadata was last collected
last_refresh: int | None = None
item_id: str
provider: str # provider instance id or provider domain
name: str
- metadata: MediaItemMetadata
provider_mappings: set[ProviderMapping]
# optional fields below
- # provider_mappings: set[ProviderMapping] = field(default_factory=set)
+ external_ids: set[tuple[ExternalID, str]] = field(default_factory=set)
metadata: MediaItemMetadata = field(default_factory=MediaItemMetadata)
favorite: bool = False
media_type: MediaType = MediaType.UNKNOWN
return None
return next((x for x in self.metadata.images if x.type == ImageType.THUMB), None)
+ @property
+ def mbid(self) -> str | None:
+ """Return MusicBrainz ID."""
+ return self.get_external_id(ExternalID.MUSICBRAINZ)
+
+ @mbid.setter
+ def mbid(self, value: str) -> None:
+ """Set MusicBrainz External ID."""
+ if not value:
+ return
+ if len(value.split("-")) != 5:
+ raise RuntimeError("Invalid MusicBrainz identifier")
+ if existing := next((x for x in self.external_ids if x[0] == ExternalID.MUSICBRAINZ), None):
+ # Musicbrainz ID is unique so remove existing entry
+ self.external_ids.remove(existing)
+ self.external_ids.add((ExternalID.MUSICBRAINZ, value))
+
+ def get_external_id(self, external_id_type: ExternalID) -> str | None:
+ """Get (the first instance) of given External ID or None if not found."""
+ for ext_id in self.external_ids:
+ if ext_id[0] != external_id_type:
+ continue
+ return ext_id[1]
+ return None
+
def __hash__(self) -> int:
"""Return custom hash."""
return hash(self.uri)
- def __eq__(self, other: ItemMapping) -> bool:
+ def __eq__(self, other: MediaItem | ItemMapping) -> bool:
"""Check equality of two items."""
return self.uri == other.uri
+ @classmethod
+ def from_item_mapping(cls: type, item: ItemMapping) -> Self:
+ """Instantiate MediaItem from ItemMapping."""
+ # NOTE: This will not work for albums and tracks!
+ return cls.from_dict(
+ {
+ **item.to_dict(),
+ "provider_mappings": {
+ "item_id": item.item_id,
+ "provider_domain": item.provider,
+ "provider_instance": item.provider,
+ "available": item.available,
+ },
+ }
+ )
+
@dataclass(kw_only=True)
class ItemMapping(DataClassDictMixin):
sort_name: str | None = None
uri: str | None = None
available: bool = True
+ external_ids: set[tuple[ExternalID, str]] = field(default_factory=set)
@classmethod
def from_item(cls, item: MediaItem):
"""Create ItemMapping object from regular item."""
- result = cls.from_dict(item.to_dict())
- result.available = item.available
- return result
+ return cls.from_dict(item.to_dict())
def __post_init__(self):
"""Call after init."""
"""Model for an artist."""
media_type: MediaType = MediaType.ARTIST
- mbid: str | None = None
@dataclass(kw_only=True)
year: int | None = None
artists: list[Artist | ItemMapping] = field(default_factory=list)
album_type: AlbumType = AlbumType.UNKNOWN
- mbid: str | None = None # release group id
@dataclass(kw_only=True)
media_type: MediaType = MediaType.TRACK
duration: int = 0
version: str = ""
- mbid: str | None = None # Recording ID
artists: list[Artist | ItemMapping] = field(default_factory=list)
album: Album | ItemMapping | None = None # optional
API_SCHEMA_VERSION: Final[int] = 23
MIN_SCHEMA_VERSION: Final[int] = 23
-DB_SCHEMA_VERSION: Final[int] = 25
+DB_SCHEMA_VERSION: Final[int] = 26
ROOT_LOGGER_NAME: Final[str] = "music_assistant"
# grab additional metadata
if metadata_lookup:
await self.mass.metadata.get_album_metadata(item)
- # actually add (or update) the item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
+ # check for existing item first
+ library_item = None
+ if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
+ # existing item match by provider id
+ library_item = await self.update_item_in_library(cur_item.item_id, item) # noqa: SIM114
+ elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
+ # existing item match by external id
+ library_item = await self.update_item_in_library(cur_item.item_id, item)
+ else:
+ # search by name
+ async for db_item in self.iter_library_items(search=item.name):
+ if compare_album(db_item, item):
+ # existing item found: update it
+ library_item = await self.update_item_in_library(db_item.item_id, item)
+ break
+ if not library_item:
+ # actually add a new item in the library db
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ library_item = await self._add_library_item(item)
# also fetch the same album on all providers
if metadata_lookup:
await self._match(library_item)
else:
album_type = cur_item.album_type
sort_artist = album_artists[0].sort_name
+ cur_item.external_ids.update(update.external_ids)
await self.mass.music.database.update(
self.db_table,
{"item_id": db_id},
"artists": serialize_to_json(album_artists),
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
- "mbid": update.mbid or cur_item.mbid,
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
"timestamp_modified": int(utc_timestamp()),
},
)
async def _add_library_item(self, item: Album) -> Album:
"""Add a new record to the database."""
- # safety guard: check for existing item first
- if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
- # existing item found: update it
- return await self.update_item_in_library(cur_item.item_id, item)
- if item.mbid:
- match = {"mbid": item.mbid}
- if db_row := await self.mass.music.database.get_row(self.db_table, match):
- cur_item = Album.from_dict(self._parse_db_row(db_row))
- # existing item found: update it
- return await self.update_item_in_library(cur_item.item_id, item)
- # fallback to search and match
- match = {"sort_name": item.sort_name}
- for db_row in await self.mass.music.database.get_rows(self.db_table, match):
- row_album = Album.from_dict(self._parse_db_row(db_row))
- if compare_album(row_album, item):
- cur_item = row_album
- # existing item found: update it
- return await self.update_item_in_library(cur_item.item_id, item)
-
- # insert new item
- album_artists = await self._get_artist_mappings(item, cur_item)
- sort_artist = album_artists[0].sort_name
+ album_artists = await self._get_artist_mappings(item)
+ sort_artist = album_artists[0].sort_name if album_artists else ""
new_item = await self.mass.music.database.insert(
self.db_table,
{
"favorite": item.favorite,
"album_type": item.album_type,
"year": item.year,
- "mbid": item.mbid,
"metadata": serialize_to_json(item.metadata),
"provider_mappings": serialize_to_json(item.provider_mappings),
"artists": serialize_to_json(album_artists),
"sort_artist": sort_artist,
+ "external_ids": serialize_to_json(item.external_ids),
"timestamp_added": int(utc_timestamp()),
"timestamp_modified": int(utc_timestamp()),
},
"""Add artist to library and return the database item."""
if isinstance(item, ItemMapping):
metadata_lookup = False
+ item = Artist.from_item_mapping(item)
# grab musicbrainz id and additional metadata
if metadata_lookup:
await self.mass.metadata.get_artist_metadata(item)
- # actually add (or update) the item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
+ # check for existing item first
+ library_item = None
+ if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
+ # existing item match by provider id
+ library_item = await self.update_item_in_library(cur_item.item_id, item) # noqa: SIM114
+ elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
+ # existing item match by external id
+ library_item = await self.update_item_in_library(cur_item.item_id, item)
+ else:
+ # search by name
+ async for db_item in self.iter_library_items(search=item.name):
+ if compare_artist(db_item, item):
+ # existing item found: update it
+ # NOTE: if we matched an artist by name this could theoretically lead to
+ # collisions but the chance is so small it is not worth the additional
+ # overhead of grabbing the musicbrainz id upfront
+ library_item = await self.update_item_in_library(db_item.item_id, item)
+ break
+ if not library_item:
+ # actually add (or update) the item in the library db
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ library_item = await self._add_library_item(item)
# also fetch same artist on all providers
if metadata_lookup:
await self.match_artist(library_item)
cur_item = await self.get_library_item(db_id)
metadata = cur_item.metadata.update(getattr(update, "metadata", None), overwrite)
provider_mappings = self._get_provider_mappings(cur_item, update, overwrite)
-
+ cur_item.external_ids.update(update.external_ids)
# enforce various artists name + id
mbid = cur_item.mbid
if (not mbid or overwrite) and getattr(update, "mbid", None):
{
"name": update.name if overwrite else cur_item.name,
"sort_name": update.sort_name if overwrite else cur_item.sort_name,
- "mbid": mbid,
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
"timestamp_modified": int(utc_timestamp()),
paged_list = await self.mass.music.albums.library_items(extra_query=query)
return paged_list.items
- async def _add_library_item(self, item: Artist | ItemMapping) -> Artist:
+ async def _add_library_item(self, item: Artist) -> Artist:
"""Add a new item record to the database."""
# enforce various artists name + id
- if not isinstance(item, ItemMapping):
- if compare_strings(item.name, VARIOUS_ARTISTS_NAME):
- item.mbid = VARIOUS_ARTISTS_ID_MBID
- if item.mbid == VARIOUS_ARTISTS_ID_MBID:
- item.name = VARIOUS_ARTISTS_NAME
- # safety guard: check for existing item first
- if isinstance(item, ItemMapping) and (
- cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider)
- ):
- # existing item found: update it
- return await self.update_item_in_library(cur_item.item_id, item)
- if not isinstance(item, ItemMapping) and (
- cur_item := await self.get_library_item_by_prov_mappings(item.provider_mappings)
- ):
- return await self.update_item_in_library(cur_item.item_id, item)
- if mbid := getattr(item, "mbid", None):
- match = {"mbid": mbid}
- if db_row := await self.mass.music.database.get_row(self.db_table, match):
- # existing item found: update it
- cur_item = Artist.from_dict(self._parse_db_row(db_row))
- return await self.update_item_in_library(cur_item.item_id, item)
- # fallback to exact name match
- # NOTE: we match an artist by name which could theoretically lead to collisions
- # but the chance is so small it is not worth the additional overhead of grabbing
- # the musicbrainz id upfront
- match = {"sort_name": item.sort_name}
- for db_row in await self.mass.music.database.get_rows(self.db_table, match):
- row_artist = Artist.from_dict(self._parse_db_row(db_row))
- if row_artist.sort_name == item.sort_name:
- cur_item = row_artist
- # existing item found: update it
- return await self.update_item_in_library(cur_item.item_id, item)
-
+ if compare_strings(item.name, VARIOUS_ARTISTS_NAME):
+ item.mbid = VARIOUS_ARTISTS_ID_MBID
+ if item.mbid == VARIOUS_ARTISTS_ID_MBID:
+ item.name = VARIOUS_ARTISTS_NAME
# no existing item matched: insert item
item.timestamp_added = int(utc_timestamp())
item.timestamp_modified = int(utc_timestamp())
- # edge case: item is an ItemMapping,
- # try to construct (a half baken) Artist object from it
- if isinstance(item, ItemMapping):
- item = Artist.from_dict(item.to_dict())
new_item = await self.mass.music.database.insert(
self.db_table,
{
"name": item.name,
"sort_name": item.sort_name,
"favorite": item.favorite,
- "mbid": item.mbid,
+ "external_ids": serialize_to_json(item.external_ids),
"metadata": serialize_to_json(item.metadata),
"provider_mappings": serialize_to_json(item.provider_mappings),
"timestamp_added": int(utc_timestamp()),
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from music_assistant.common.helpers.json import json_loads, serialize_to_json
-from music_assistant.common.models.enums import EventType, MediaType, ProviderFeature
+from music_assistant.common.models.enums import EventType, ExternalID, MediaType, ProviderFeature
from music_assistant.common.models.errors import InvalidDataError, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
ItemCls = TypeVar("ItemCls", bound="MediaItemType")
REFRESH_INTERVAL = 60 * 60 * 24 * 30
-JSON_KEYS = ("artists", "metadata", "provider_mappings")
+JSON_KEYS = ("artists", "metadata", "provider_mappings", "external_ids")
class MediaControllerBase(Generic[ItemCls], metaclass=ABCMeta):
return item
return None
+ async def get_library_item_by_external_id(
+ self, external_id: str, external_id_type: ExternalID | None = None
+ ) -> ItemCls | None:
+ """Get the library item for the given external id."""
+ query = self.base_query + f" WHERE {self.db_table}.external_ids LIKE :external_id_str"
+ if external_id_type:
+ external_id_str = f'%("{external_id_type}", "{external_id}")%'
+ else:
+ external_id_str = f'%"{external_id}"%'
+ for item in await self._get_library_items_by_query(
+ query=query, query_params={"external_id_str": external_id_str}
+ ):
+ return item
+ return None
+
+ async def get_library_item_by_external_ids(
+ self, external_ids: set[tuple[ExternalID, str]]
+ ) -> ItemCls | None:
+ """Get the library item for (one of) the given external ids."""
+ for external_id_type, external_id in external_ids:
+ if match := await self.get_library_item_by_external_id(external_id, external_id_type):
+ return match
+ return None
+
async def get_library_items_by_prov_id(
self,
provider_instance_id_or_domain: str,
ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.media_items import Playlist, PlaylistTrack, Track
+from music_assistant.common.models.media_items import ItemMapping, Playlist, PlaylistTrack, Track
from music_assistant.constants import DB_TABLE_PLAYLISTS
+from music_assistant.server.helpers.compare import compare_strings
from .base import MediaControllerBase
async def add_item_to_library(self, item: Playlist, metadata_lookup: bool = True) -> Playlist:
"""Add playlist to library and return the new database item."""
+ if isinstance(item, ItemMapping):
+ metadata_lookup = False
+ item = Playlist.from_item_mapping(item)
if not isinstance(item, Playlist):
raise InvalidDataError(
"Not a valid Playlist object (ItemMapping can not be added to db)"
)
if not item.provider_mappings:
raise InvalidDataError("Playlist is missing provider mapping(s)")
-
- # actually add (or update) the item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
+ # check for existing item first
+ library_item = None
+ if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
+ # existing item match by provider id
+ library_item = await self.update_item_in_library(cur_item.item_id, item) # noqa: SIM114
+ elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
+ # existing item match by external id
+ library_item = await self.update_item_in_library(cur_item.item_id, item)
+ else:
+ # search by name
+ async for db_item in self.iter_library_items(search=item.name):
+ if compare_strings(db_item.name, item.name):
+ # existing item found: update it
+ library_item = await self.update_item_in_library(db_item.item_id, item)
+ break
+ if not library_item:
+ # actually add a new item in the library db
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ library_item = await self._add_library_item(item)
# preload playlist tracks listing (do not load them in the db)
async for _ in self.tracks(item.item_id, item.provider):
pass
cur_item = await self.get_library_item(db_id)
metadata = cur_item.metadata.update(getattr(update, "metadata", None), overwrite)
provider_mappings = self._get_provider_mappings(cur_item, update, overwrite)
+ cur_item.external_ids.update(update.external_ids)
await self.mass.music.database.update(
self.db_table,
{"item_id": db_id},
"is_editable": update.is_editable,
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
"timestamp_modified": int(utc_timestamp()),
},
)
async def _add_library_item(self, item: Playlist) -> Playlist:
"""Add a new record to the database."""
- # safety guard: check for existing item first
- if cur_item := await self.get_library_item_by_prov_mappings(item.provider_mappings):
- # existing item found: update it
- return await self.update_item_in_library(cur_item.item_id, item)
- # try name matching
- match = {"name": item.name, "owner": item.owner}
- if db_row := await self.mass.music.database.get_row(self.db_table, match):
- cur_item = Playlist.from_dict(self._parse_db_row(db_row))
- # existing item found: update it
- return await self.update_item_in_library(cur_item.item_id, item)
- # insert new item
item.timestamp_added = int(utc_timestamp())
item.timestamp_modified = int(utc_timestamp())
new_item = await self.mass.music.database.insert(
"favorite": item.favorite,
"metadata": serialize_to_json(item.metadata),
"provider_mappings": serialize_to_json(item.provider_mappings),
+ "external_ids": serialize_to_json(item.external_ids),
"timestamp_added": int(utc_timestamp()),
"timestamp_modified": int(utc_timestamp()),
},
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.models.enums import EventType, MediaType
from music_assistant.common.models.errors import InvalidDataError
-from music_assistant.common.models.media_items import Radio, Track
+from music_assistant.common.models.media_items import ItemMapping, Radio, Track
from music_assistant.constants import DB_TABLE_RADIOS
-from music_assistant.server.helpers.compare import loose_compare_strings
+from music_assistant.server.helpers.compare import compare_strings, loose_compare_strings
from .base import MediaControllerBase
async def add_item_to_library(self, item: Radio, metadata_lookup: bool = True) -> Radio:
"""Add radio to library and return the new database item."""
+ if isinstance(item, ItemMapping):
+ metadata_lookup = False
+ item = Radio.from_item_mapping(item)
if not isinstance(item, Radio):
- raise InvalidDataError("Not a valid Radio object (ItemMapping can not be added to db)")
+ raise InvalidDataError("Not a valid Radio object")
if not item.provider_mappings:
raise InvalidDataError("Radio is missing provider mapping(s)")
if metadata_lookup:
await self.mass.metadata.get_radio_metadata(item)
- # actually add (or update) the item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
+ # check for existing item first
+ library_item = None
+ if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
+ # existing item match by provider id
+ library_item = await self.update_item_in_library(cur_item.item_id, item) # noqa: SIM114
+ elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
+ # existing item match by external id
+ library_item = await self.update_item_in_library(cur_item.item_id, item)
+ else:
+ # search by name
+ async for db_item in self.iter_library_items(search=item.name):
+ if compare_strings(db_item.name, item.name):
+ # existing item found: update it
+ library_item = await self.update_item_in_library(db_item.item_id, item)
+ break
+ if not library_item:
+ # actually add a new item in the library db
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ library_item = await self._add_library_item(item)
self.mass.signal_event(
EventType.MEDIA_ITEM_ADDED,
library_item.uri,
cur_item = await self.get_library_item(db_id)
metadata = cur_item.metadata.update(getattr(update, "metadata", None), overwrite)
provider_mappings = self._get_provider_mappings(cur_item, update, overwrite)
+ cur_item.external_ids.update(update.external_ids)
match = {"item_id": db_id}
await self.mass.music.database.update(
self.db_table,
"sort_name": update.sort_name or cur_item.sort_name,
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
"timestamp_modified": int(utc_timestamp()),
},
)
async def _add_library_item(self, item: Radio) -> Radio:
"""Add a new item record to the database."""
- cur_item = None
- # safety guard: check for existing item first
- if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
- # existing item found: update it
- return await self.update_item_in_library(cur_item.item_id, item)
- # try name matching
- match = {"name": item.name}
- if db_row := await self.mass.music.database.get_row(self.db_table, match):
- cur_item = Radio.from_dict(self._parse_db_row(db_row))
- # existing item found: update it
- return await self.update_item_in_library(cur_item.item_id, item)
- # insert new item
item.timestamp_added = int(utc_timestamp())
item.timestamp_modified = int(utc_timestamp())
new_item = await self.mass.music.database.insert(
"favorite": item.favorite,
"metadata": serialize_to_json(item.metadata),
"provider_mappings": serialize_to_json(item.provider_mappings),
+ "external_ids": serialize_to_json(item.external_ids),
"timestamp_added": int(utc_timestamp()),
"timestamp_modified": int(utc_timestamp()),
},
item.metadata.images = []
if item.image and isinstance(item.album, Album) and not item.album.image:
item.album.metadata.images = item.metadata.images
- # actually add (or update) the item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
+ # check for existing item first
+ library_item = None
+ if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
+ # existing item match by provider id
+ library_item = await self.update_item_in_library(cur_item.item_id, item) # noqa: SIM114
+ elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
+ # existing item match by external id
+ library_item = await self.update_item_in_library(cur_item.item_id, item)
+ else:
+ # search by name
+ async for db_item in self.iter_library_items(search=item.name):
+ if compare_track(db_item, item):
+ # existing item found: update it
+ library_item = await self.update_item_in_library(db_item.item_id, item)
+ break
+ if not library_item:
+ # actually add a new item in the library db
+ # use the lock to prevent a race condition of the same item being added twice
+ async with self._db_add_lock:
+ library_item = await self._add_library_item(item)
# also fetch same track on all providers (will also get other quality versions)
if metadata_lookup:
await self._match(library_item)
metadata = cur_item.metadata.update(getattr(update, "metadata", None), overwrite)
provider_mappings = self._get_provider_mappings(cur_item, update, overwrite)
track_artists = await self._get_artist_mappings(cur_item, update, overwrite=overwrite)
+ cur_item.external_ids.update(update.external_ids)
await self.mass.music.database.update(
self.db_table,
{"item_id": db_id},
"metadata": serialize_to_json(metadata),
"provider_mappings": serialize_to_json(provider_mappings),
"timestamp_modified": int(utc_timestamp()),
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
},
)
# update/set provider_mappings table
async def _add_library_item(self, item: Track) -> Track:
"""Add a new item record to the database."""
- # check for existing item first
- if item.provider == "library":
- return await self.update_item_in_library(item.item_id, item)
- if cur_item := await self.get_library_item_by_prov_mappings(item.provider_mappings):
- return await self.update_item_in_library(cur_item.item_id, item)
- if item.mbid:
- match = {"mbid": item.mbid}
- if db_row := await self.mass.music.database.get_row(self.db_table, match):
- cur_item = Track.from_dict(self._parse_db_row(db_row))
- return await self.update_item_in_library(cur_item.item_id, item)
- match = {"sort_name": item.sort_name}
- for db_row in await self.mass.music.database.get_rows(self.db_table, match):
- row_track = Track.from_dict(self._parse_db_row(db_row))
- track_albums = await self.albums(row_track.item_id, row_track.provider)
- if compare_track(row_track, item, strict=True, track_albums=track_albums):
- cur_item = row_track
- return await self.update_item_in_library(cur_item.item_id, item)
track_artists = await self._get_artist_mappings(item)
sort_artist = track_artists[0].sort_name
new_item = await self.mass.music.database.insert(
"version": item.version,
"duration": item.duration,
"favorite": item.favorite,
- "mbid": item.mbid,
+ "external_ids": serialize_to_json(item.external_ids),
"metadata": serialize_to_json(item.metadata),
"provider_mappings": serialize_to_json(item.provider_mappings),
"artists": serialize_to_json(track_artists),
import urllib.parse
from base64 import b64encode
from contextlib import suppress
-from random import shuffle
from time import time
from typing import TYPE_CHECKING
from uuid import uuid4
Radio,
Track,
)
-from music_assistant.constants import ROOT_LOGGER_NAME
+from music_assistant.constants import (
+ ROOT_LOGGER_NAME,
+ VARIOUS_ARTISTS_ID_MBID,
+ VARIOUS_ARTISTS_NAME,
+)
+from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.images import create_collage, get_image_thumb
from music_assistant.server.models.core_controller import CoreController
+from music_assistant.server.providers.musicbrainz import MusicbrainzProvider
if TYPE_CHECKING:
from music_assistant.common.models.config_entries import CoreConfig
async def get_artist_metadata(self, artist: Artist) -> None:
"""Get/update rich metadata for an artist."""
if not artist.mbid:
+ # The musicbrainz ID is mandatory for all metadata lookups
artist.mbid = await self.get_artist_mbid(artist)
if not artist.mbid:
return
async def get_artist_mbid(self, artist: Artist) -> str | None:
"""Fetch musicbrainz id by performing search using the artist name, albums and tracks."""
+ if compare_strings(artist.name, VARIOUS_ARTISTS_NAME):
+ return VARIOUS_ARTISTS_ID_MBID
ref_albums = await self.mass.music.artists.albums(artist.item_id, artist.provider)
if len(ref_albums) < 10:
# fetch reference albums from provider(s) attached to the artist
for provider_mapping in artist.provider_mappings:
+ if provider_mapping.provider_instance == artist.provider:
+ continue
ref_albums += await self.mass.music.artists.albums(
provider_mapping.item_id, provider_mapping.provider_instance
)
if len(ref_tracks) < 10:
# fetch reference tracks from provider(s) attached to the artist
for provider_mapping in artist.provider_mappings:
+ if provider_mapping.provider_instance == artist.provider:
+ continue
ref_tracks += await self.mass.music.artists.tracks(
provider_mapping.item_id, provider_mapping.provider_instance
)
-
- # randomize providers to average the load
- providers = self.providers
- shuffle(providers)
-
- # try all providers one by one until we have a match
- for provider in providers:
- if ProviderFeature.GET_ARTIST_MBID not in provider.supported_features:
- continue
- if mbid := await provider.get_musicbrainz_artist_id(
- artist, ref_albums=ref_albums, ref_tracks=ref_tracks
- ):
- LOGGER.debug(
- "Fetched MusicBrainz ID for Artist %s on provider %s",
- artist.name,
- provider.name,
- )
- return mbid
+ # start lookup of musicbrainz id
+ musicbrainz: MusicbrainzProvider = self.mass.get_provider("musicbrainz")
+ assert musicbrainz
+ if mbid := await musicbrainz.get_musicbrainz_artist_id(
+ artist, ref_albums=ref_albums, ref_tracks=ref_tracks
+ ):
+ return mbid
# lookup failed
ref_albums_str = "/".join(x.name for x in ref_albums) or "none"
ref_tracks_str = "/".join(x.name for x in ref_tracks) or "none"
- LOGGER.info(
+ LOGGER.debug(
"Unable to get musicbrainz ID for artist %s\n"
" - using lookup-album(s): %s\n"
" - using lookup-track(s): %s\n",
from music_assistant.common.models.enums import (
ConfigEntryType,
EventType,
+ ExternalID,
MediaType,
ProviderFeature,
ProviderType,
prev_version = 0
if prev_version not in (0, DB_SCHEMA_VERSION):
- self.logger.info(
- "Performing database migration from %s to %s",
- prev_version,
- DB_SCHEMA_VERSION,
- )
+ # db version mismatch - we need to do a migration
# make a backup of db file
db_path_backup = db_path + ".backup"
await asyncio.to_thread(shutil.copyfile, db_path, db_path_backup)
- if prev_version < 22 or prev_version > DB_SCHEMA_VERSION:
- # for now just keep it simple and just recreate the tables
- # if the schema is too old or too new
- # we allow migrations only for up to 2 schema versions behind
- await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_ARTISTS}")
- await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_ALBUMS}")
- await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_TRACKS}")
- await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_PLAYLISTS}")
- await self.database.execute(f"DROP TABLE IF EXISTS {DB_TABLE_RADIOS}")
- # recreate missing tables
- await self.__create_database_tables()
-
- if prev_version in (22, 23):
- # reset albums, artists, tracks, impossible to migrate in a clean way
+ # handle db migration from previous schema to this one
+ if prev_version == DB_SCHEMA_VERSION - 1:
+ self.logger.info(
+ "Performing database migration from %s to %s",
+ prev_version,
+ DB_SCHEMA_VERSION,
+ )
+ self.logger.warning("DATABASE MIGRATION IN PROGRESS - THIS CAN TAKE A WHILE")
+ # migrate external id(s)
for table in (
DB_TABLE_ARTISTS,
DB_TABLE_ALBUMS,
DB_TABLE_TRACKS,
):
- self.logger.warning(
- "Resetting %s library/database - a full rescan will be performed!", table
- )
- await self.database.execute(f"DROP TABLE IF EXISTS {table}")
- # recreate missing tables
- await self.__create_database_tables()
-
- # migrate in_library --> favorite
- for table in (
- DB_TABLE_PLAYLISTS,
- DB_TABLE_RADIOS,
- ):
- # rename in_library --> favorite
+ # create new external_ids column
await self.database.execute(
- f"ALTER TABLE {table} RENAME COLUMN in_library TO favorite;"
+ f"ALTER TABLE {table} "
+ "ADD COLUMN external_ids "
+ "json NOT NULL DEFAULT '[]'"
)
- # clean out all non favorites from library db
- item_ids_to_delete = set()
+ # migrate existing ids into the new external_ids column
async for item in self.database.iter_items(table):
- if not (item["favorite"] or '"url' in item["provider_mappings"]):
- item_ids_to_delete.add(item["item_id"])
- continue
- # migrate provider_mapping column (audio_format)
- prov_mappings = json_loads(item["provider_mappings"])
- needs_update = False
- for mapping in prov_mappings:
- if "content_type" in mapping:
- needs_update = True
- mapping["audio_format"] = {
- "content_type": mapping.pop("content_type"),
- "sample_rate": mapping.pop("sample_rate"),
- "bit_depth": mapping.pop("bit_depth"),
- "channels": mapping.pop("channels", 2),
- "bit_rate": mapping.pop("bit_rate", 320),
- }
- if needs_update:
+ external_ids: set[tuple[str, str]] = set()
+ if mbid := item["mbid"]:
+ external_ids.add((ExternalID.MUSICBRAINZ, mbid))
+ for prov_mapping in json_loads(item["provider_mappings"]):
+ if isrc := prov_mapping.get("isrc"):
+ external_ids.add((ExternalID.ISRC, isrc))
+ if barcode := prov_mapping.get("barcode"):
+ external_ids.add((ExternalID.BARCODE, barcode))
+ if external_ids:
await self.database.update(
table,
{
"item_id": item["item_id"],
},
{
- "provider_mappings": json_dumps(prov_mappings),
+ "external_ids": json_dumps(external_ids),
},
)
- for item_id in item_ids_to_delete:
- await self.database.delete(table, {"item_id": item_id})
-
- if prev_version > 22 and prev_version < 25:
- # extend playlog table with media_type column
- await self.database.execute(
- f"ALTER TABLE {DB_TABLE_PLAYLOG} "
- "ADD COLUMN media_type TEXT NOT NULL DEFAULT 'track'"
+ # drop mbid column
+ await self.database.execute(f"DROP INDEX IF EXISTS {table}_mbid_idx")
+ await self.database.execute(f"ALTER TABLE {table} DROP COLUMN mbid")
+ # db migration succeeded
+ self.logger.info(
+ "Database migration to version %s completed",
+ DB_SCHEMA_VERSION,
)
-
- self.logger.info(
- "Database migration to version %s completed",
- DB_SCHEMA_VERSION,
- )
+ # handle all other schema versions
+ else:
+ # we keep it simple and just recreate the tables
+ # if the schema is too old (or too new)
+ # we do migrations only for up to 1 schema version behind
+ self.logger.warning(
+ "Database schema too old - Resetting library/database - "
+ "a full rescan will be performed!"
+ )
+ for table in (
+ DB_TABLE_TRACKS,
+ DB_TABLE_ALBUMS,
+ DB_TABLE_ARTISTS,
+ DB_TABLE_TRACKS,
+ DB_TABLE_PLAYLISTS,
+ DB_TABLE_RADIOS,
+ DB_TABLE_PROVIDER_MAPPINGS,
+ ):
+ await self.database.execute(f"DROP TABLE IF EXISTS {table}")
+ # recreate missing tables
+ await self.__create_database_tables()
# store current schema version
await self.database.insert_or_replace(
year INTEGER,
version TEXT,
favorite BOOLEAN DEFAULT 0,
- mbid TEXT,
artists json NOT NULL,
metadata json NOT NULL,
provider_mappings json NOT NULL,
+ external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
);"""
item_id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
sort_name TEXT NOT NULL,
- mbid TEXT,
favorite BOOLEAN DEFAULT 0,
metadata json NOT NULL,
provider_mappings json NOT NULL,
+ external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
);"""
version TEXT,
duration INTEGER,
favorite BOOLEAN DEFAULT 0,
- mbid TEXT,
artists json NOT NULL,
metadata json NOT NULL,
provider_mappings json NOT NULL,
+ external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
);"""
favorite BOOLEAN DEFAULT 0,
metadata json,
provider_mappings json,
+ external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
);"""
favorite BOOLEAN DEFAULT 0,
metadata json,
provider_mappings json,
+ external_ids json NOT NULL,
timestamp_added INTEGER NOT NULL,
timestamp_modified INTEGER NOT NULL
);"""
await self.database.execute(
"CREATE INDEX IF NOT EXISTS radios_sort_name_idx on radios(sort_name);"
)
- await self.database.execute("CREATE INDEX IF NOT EXISTS artists_mbid_idx on artists(mbid);")
- await self.database.execute("CREATE INDEX IF NOT EXISTS albums_mbid_idx on albums(mbid);")
- await self.database.execute("CREATE INDEX IF NOT EXISTS tracks_mbid_idx on tracks(mbid);")
import unidecode
from music_assistant.common.helpers.util import create_sort_name
+from music_assistant.common.models.enums import ExternalID
from music_assistant.common.models.media_items import (
Album,
AlbumTrack,
ItemMapping,
MediaItem,
MediaItemMetadata,
- ProviderMapping,
Track,
)
)
-def create_safe_string(input_str: str) -> str:
- """Return clean lowered string for compare actions."""
- input_str = input_str.lower().strip()
- unaccented_string = unidecode.unidecode(input_str)
- return re.sub(r"[^a-zA-Z0-9]", "", unaccented_string)
-
-
-def loose_compare_strings(base: str, alt: str) -> bool:
- """Compare strings and return True even on partial match."""
- # this is used to display 'versions' of the same track/album
- # where we account for other spelling or some additional wording in the title
- word_count = len(base.split(" "))
- if word_count == 1 and len(base) < 10:
- return compare_strings(base, alt, False)
- base_comp = create_safe_string(base)
- alt_comp = create_safe_string(alt)
- if base_comp in alt_comp:
- return True
- if alt_comp in base_comp:
- return True
- return False
-
-
-def compare_strings(str1: str, str2: str, strict: bool = True) -> bool:
- """Compare strings and return True if we have an (almost) perfect match."""
- if not str1 or not str2:
- return False
- # return early if total length mismatch
- if abs(len(str1) - len(str2)) > 4:
- return False
- if not strict:
- # handle '&' vs 'And'
- if " & " in str1 and " and " in str2.lower():
- str2 = str2.lower().replace(" and ", " & ")
- elif " and " in str1.lower() and " & " in str2:
- str2 = str2.replace(" & ", " and ")
- return create_safe_string(str1) == create_safe_string(str2)
-
- return create_sort_name(str1) == create_sort_name(str2)
-
-
-def compare_version(base_version: str, compare_version: str) -> bool:
- """Compare version string."""
- if not base_version and not compare_version:
- return True
- if not base_version and compare_version.lower() in IGNORE_VERSIONS:
- return True
- if not compare_version and base_version.lower() in IGNORE_VERSIONS:
- return True
- if not base_version and compare_version:
- return False
- if base_version and not compare_version:
- return False
- if " " not in base_version:
- return compare_strings(base_version, compare_version)
- # do this the hard way as sometimes the version string is in the wrong order
- base_versions = base_version.lower().split(" ").sort()
- compare_versions = compare_version.lower().split(" ").sort()
- return base_versions == compare_versions
-
-
-def compare_explicit(base: MediaItemMetadata, compare: MediaItemMetadata) -> bool:
- """Compare if explicit is same in metadata."""
- if base.explicit is None or compare.explicit is None:
- # explicitness info is not always present in metadata
- # only strict compare them if both have the info set
- return True
- return base == compare
-
-
def compare_artist(
base_item: Artist | ItemMapping,
compare_item: Artist | ItemMapping,
-) -> bool:
+ allow_name_match: bool = True,
+) -> bool | None:
"""Compare two artist items and return True if they match."""
if base_item is None or compare_item is None:
return False
# return early on exact item_id match
if compare_item_ids(base_item, compare_item):
return True
-
- # prefer match on mbid
- if getattr(base_item, "mbid", None) and getattr(compare_item, "mbid", None):
- return base_item.mbid == compare_item.mbid
-
- # fallback to comparing
- return compare_strings(base_item.name, compare_item.name, False)
-
-
-def compare_artists(
- base_items: list[Artist | ItemMapping],
- compare_items: list[Artist | ItemMapping],
- any_match: bool = True,
-) -> bool:
- """Compare two lists of artist and return True if both lists match (exactly)."""
- matches = 0
- for base_item in base_items:
- for compare_item in compare_items:
- if compare_artist(base_item, compare_item):
- if any_match:
- return True
- matches += 1
- return len(base_items) == matches
-
-
-def compare_item_ids(
- base_item: MediaItem | ItemMapping, compare_item: MediaItem | ItemMapping
-) -> bool:
- """Compare item_id(s) of two media items."""
- if not base_item.provider or not compare_item.provider:
- return False
- if not base_item.item_id or not compare_item.item_id:
+ # return early on (un)matched external id
+ external_id_match = compare_external_ids(base_item.external_ids, compare_item.external_ids)
+ if external_id_match is not None:
+ return external_id_match
+ ## fallback to comparing on attributes
+ name_match = compare_strings(base_item.name, compare_item.name, strict=True)
+ if name_match is False:
return False
- if base_item.provider == compare_item.provider and base_item.item_id == compare_item.item_id:
- return True
-
- base_prov_ids = getattr(base_item, "provider_mappings", None)
- compare_prov_ids = getattr(compare_item, "provider_mappings", None)
-
- if base_prov_ids is not None:
- for prov_l in base_item.provider_mappings:
- if (
- prov_l.provider_domain == compare_item.provider
- and prov_l.item_id == compare_item.item_id
- ):
- return True
-
- if compare_prov_ids is not None:
- for prov_r in compare_item.provider_mappings:
- if prov_r.provider_domain == base_item.provider and prov_r.item_id == base_item.item_id:
- return True
-
- if base_prov_ids is not None and compare_prov_ids is not None:
- for prov_l in base_item.provider_mappings:
- for prov_r in compare_item.provider_mappings:
- if prov_l.provider_domain != prov_r.provider_domain:
- continue
- if prov_l.item_id == prov_r.item_id:
- return True
- return False
-
-
-def compare_albums(
- base_items: list[Album | ItemMapping],
- compare_items: list[Album | ItemMapping],
-):
- """Compare two lists of albums and return True if a match was found."""
- for base_item in base_items:
- for compare_item in compare_items:
- if compare_album(base_item, compare_item):
- return True
- return False
-
-
-def compare_barcode(
- base_mappings: set[ProviderMapping],
- compare_mappings: set[ProviderMapping],
-):
- """Compare barcode within provider mappings and return True if a match was found."""
- for base_mapping in base_mappings:
- if not base_mapping.barcode:
- continue
- for compare_mapping in compare_mappings:
- if not compare_mapping.barcode:
- continue
- # convert EAN-13 to UPC-A by stripping off the leading zero
- base_upc = (
- base_mapping.barcode[1:]
- if base_mapping.barcode.startswith("0")
- else base_mapping.barcode
- )
- compare_upc = (
- compare_mapping.barcode[1:]
- if compare_mapping.barcode.startswith("0")
- else compare_mapping.barcode
- )
- if compare_strings(base_upc, compare_upc):
- return True
- return False
-
-
-def compare_isrc(
- base_mappings: set[ProviderMapping],
- compare_mappings: set[ProviderMapping],
-):
- """Compare isrc within provider mappings and return True if a match was found."""
- for base_mapping in base_mappings:
- if not base_mapping.isrc:
- continue
- for compare_mapping in compare_mappings:
- if not compare_mapping.isrc:
- continue
- if compare_strings(base_mapping.isrc, compare_mapping.isrc):
- return True
- return False
+ return name_match if allow_name_match else None
def compare_album(
base_item: Album | ItemMapping,
compare_item: Album | ItemMapping,
-):
+ allow_name_match: bool = True,
+) -> bool | None:
"""Compare two album items and return True if they match."""
if base_item is None or compare_item is None:
return False
# return early on exact item_id match
if compare_item_ids(base_item, compare_item):
return True
- # prefer match on mbid (not present on ItemMapping)
- if getattr(base_item, "mbid", None) and getattr(compare_item, "mbid", None):
- return compare_strings(base_item.mbid, compare_item.mbid)
- # prefer match on barcode/upc
- # not present on ItemMapping
- if (
- isinstance(base_item, Album)
- and isinstance(compare_item, Album)
- and compare_barcode(base_item.provider_mappings, compare_item.provider_mappings)
- ):
- return True
- # fallback to comparing
- if not compare_strings(base_item.name, compare_item.name, True):
- return False
+ # return early on (un)matched external id
+ external_id_match = compare_external_ids(base_item.external_ids, compare_item.external_ids)
+ if external_id_match is not None:
+ return external_id_match
+ ## fallback to comparing on attributes
+ # compare version
if not compare_version(base_item.version, compare_item.version):
return False
+ # compare name
+ name_match = compare_strings(base_item.name, compare_item.name, strict=True)
+ if name_match is False:
+ return False
+ # compare explicitness
if (
hasattr(base_item, "metadata")
and hasattr(compare_item, "metadata")
and not compare_artists(base_item.artists, compare_item.artists, True)
):
return False
- return base_item.sort_name == compare_item.sort_name
+ return name_match if allow_name_match else None
def compare_track(
compare_item: Track | AlbumTrack,
strict: bool = True,
track_albums: list[Album | ItemMapping] | None = None,
-):
+) -> bool:
"""Compare two track items and return True if they match."""
if base_item is None or compare_item is None:
return False
# return early on exact item_id match
if compare_item_ids(base_item, compare_item):
return True
- if compare_isrc(base_item.provider_mappings, compare_item.provider_mappings):
- return True
- if compare_strings(base_item.mbid, compare_item.mbid):
- return True
- # track name must match
- if not compare_strings(base_item.name, compare_item.name, False):
+ # return early on (un)matched external id
+ external_id_match = compare_external_ids(base_item.external_ids, compare_item.external_ids)
+ if external_id_match is not None:
+ return external_id_match
+ ## fallback to comparing on attributes
+ # compare name
+ if not compare_strings(base_item.name, compare_item.name, strict=True):
return False
# track artist(s) must match
- if not compare_artists(base_item.artists, compare_item.artists):
+ if not compare_artists(base_item.artists, compare_item.artists, any_match=not strict):
return False
# track version must match
if strict and not compare_version(base_item.version, compare_item.version):
# all efforts failed, this is NOT a match
return False
+
+
+def compare_artists(
+ base_items: list[Artist | ItemMapping],
+ compare_items: list[Artist | ItemMapping],
+ any_match: bool = True,
+) -> bool:
+ """Compare two lists of artist and return True if both lists match (exactly)."""
+ matches = 0
+ for base_item in base_items:
+ for compare_item in compare_items:
+ if compare_artist(base_item, compare_item):
+ if any_match:
+ return True
+ matches += 1
+ return len(base_items) == matches
+
+
+def compare_albums(
+ base_items: list[Album | ItemMapping],
+ compare_items: list[Album | ItemMapping],
+ any_match: bool = True,
+) -> bool:
+ """Compare two lists of albums and return True if a match was found."""
+ matches = 0
+ for base_item in base_items:
+ for compare_item in compare_items:
+ if compare_album(base_item, compare_item):
+ if any_match:
+ return True
+ matches += 1
+ return len(base_items) == matches
+
+
+def compare_item_ids(
+ base_item: MediaItem | ItemMapping, compare_item: MediaItem | ItemMapping
+) -> bool:
+ """Compare item_id(s) of two media items."""
+ if not base_item.provider or not compare_item.provider:
+ return False
+ if not base_item.item_id or not compare_item.item_id:
+ return False
+ if base_item.provider == compare_item.provider and base_item.item_id == compare_item.item_id:
+ return True
+
+ base_prov_ids = getattr(base_item, "provider_mappings", None)
+ compare_prov_ids = getattr(compare_item, "provider_mappings", None)
+
+ if base_prov_ids is not None:
+ for prov_l in base_item.provider_mappings:
+ if (
+ prov_l.provider_domain == compare_item.provider
+ and prov_l.item_id == compare_item.item_id
+ ):
+ return True
+
+ if compare_prov_ids is not None:
+ for prov_r in compare_item.provider_mappings:
+ if prov_r.provider_domain == base_item.provider and prov_r.item_id == base_item.item_id:
+ return True
+
+ if base_prov_ids is not None and compare_prov_ids is not None:
+ for prov_l in base_item.provider_mappings:
+ for prov_r in compare_item.provider_mappings:
+ if prov_l.provider_domain != prov_r.provider_domain:
+ continue
+ if prov_l.item_id == prov_r.item_id:
+ return True
+ return False
+
+
+def compare_external_ids(
+ external_ids_base: set[tuple[ExternalID, str]],
+ external_ids_compare: set[tuple[ExternalID, str]],
+) -> bool | None:
+ """Compare external ids and return True if a match was found."""
+ for external_id_base in external_ids_base:
+ for external_id_compare in external_ids_compare:
+ if external_id_compare[0] != external_id_base[0]:
+ continue
+ # handle upc stored as EAN-13 barcode
+ if external_id_base[0] == ExternalID.BARCODE and len(external_id_base[1]) == 12:
+ external_id_base[1] = f"0{external_id_base}"
+ if external_id_compare[0] == ExternalID.BARCODE and len(external_id_compare[1]) == 12:
+ external_id_compare[1] = f"0{external_id_compare}"
+ # external id is exact match. either it is a match or it isn't
+ return external_id_compare[0] == external_id_base[0]
+ # return None to define we did not find the same external id type in both sets
+ return None
+
+
+def create_safe_string(input_str: str) -> str:
+ """Return clean lowered string for compare actions."""
+ input_str = input_str.lower().strip()
+ unaccented_string = unidecode.unidecode(input_str)
+ return re.sub(r"[^a-zA-Z0-9]", "", unaccented_string)
+
+
+def loose_compare_strings(base: str, alt: str) -> bool:
+ """Compare strings and return True even on partial match."""
+ # this is used to display 'versions' of the same track/album
+ # where we account for other spelling or some additional wording in the title
+ word_count = len(base.split(" "))
+ if word_count == 1 and len(base) < 10:
+ return compare_strings(base, alt, False)
+ base_comp = create_safe_string(base)
+ alt_comp = create_safe_string(alt)
+ if base_comp in alt_comp:
+ return True
+ if alt_comp in base_comp:
+ return True
+ return False
+
+
+def compare_strings(str1: str, str2: str, strict: bool = True) -> bool:
+ """Compare strings and return True if we have an (almost) perfect match."""
+ if not str1 or not str2:
+ return False
+ # return early if total length mismatch
+ if abs(len(str1) - len(str2)) > 4:
+ return False
+ if not strict:
+ # handle '&' vs 'And'
+ if " & " in str1 and " and " in str2.lower():
+ str2 = str2.lower().replace(" and ", " & ")
+ elif " and " in str1.lower() and " & " in str2:
+ str2 = str2.replace(" & ", " and ")
+ return create_safe_string(str1) == create_safe_string(str2)
+
+ return create_sort_name(str1) == create_sort_name(str2)
+
+
+def compare_version(base_version: str, compare_version: str) -> bool:
+ """Compare version string."""
+ if not base_version and not compare_version:
+ return True
+ if not base_version and compare_version.lower() in IGNORE_VERSIONS:
+ return True
+ if not compare_version and base_version.lower() in IGNORE_VERSIONS:
+ return True
+ if not base_version and compare_version:
+ return False
+ if base_version and not compare_version:
+ return False
+ if " " not in base_version:
+ return compare_strings(base_version, compare_version)
+ # do this the hard way as sometimes the version string is in the wrong order
+ base_versions = base_version.lower().split(" ").sort()
+ compare_versions = compare_version.lower().split(" ").sort()
+ return base_versions == compare_versions
+
+
+def compare_explicit(base: MediaItemMetadata, compare: MediaItemMetadata) -> bool | None:
+ """Compare if explicit is same in metadata."""
+ if base.explicit is not None and compare.explicit is not None:
+ # explicitness info is not always present in metadata
+ # only strict compare them if both have the info set
+ return base.explicit == compare.explicit
+ return None
@property
def musicbrainz_albumartistids(self) -> tuple[str, ...]:
"""Return musicbrainz_albumartistid tag if present."""
- return split_items(self.tags.get("musicbrainzalbumartistid"), True)
+ if tag := self.tags.get("musicbrainzalbumartistid"):
+ return split_items(tag, True)
+ return split_items(self.tags.get("musicbrainzreleaseartistid"), True)
@property
def musicbrainz_releasegroupid(self) -> str | None:
return self.tags.get("musicbrainzreleasegroupid")
@property
- def musicbrainz_trackid(self) -> str | None:
- """Return musicbrainz_trackid tag if present."""
- if tag := self.tags.get("musicbrainztrackid"):
+ def musicbrainz_releaseid(self) -> str | None:
+ """Return musicbrainz_releaseid tag if present."""
+ return self.tags.get("musicbrainzreleaseid", self.tags.get("musicbrainzalbumid"))
+
+ @property
+ def musicbrainz_recordingid(self) -> str | None:
+ """Return musicbrainz_recordingid tag if present."""
+ if tag := self.tags.get("UFID:http://musicbrainz.org"):
+ return tag
+ if tag := self.tags.get("musicbrainz.org"):
+ return tag
+ if tag := self.tags.get("musicbrainzrecordingid"):
+ return tag
+ if tag := self.tags.get("musicbrainzreleasetrackid"):
+ return tag
+ return self.tags.get("musicbrainztrackid")
+
+ @property
+ def title_sort(self) -> str | None:
+ """Return sort title tag (if exists)."""
+ if tag := self.tags.get("titlesort"):
return tag
- return self.tags.get("musicbrainzreleasetrackid")
+ return None
+
+ @property
+ def album_sort(self) -> str | None:
+ """Return album sort title tag (if exists)."""
+ if tag := self.tags.get("albumsort"):
+ return tag
+ return None
+
+ @property
+ def artist_sort_names(self) -> tuple[str, ...]:
+ """Return artist sort name tag(s) if present."""
+ return split_items(self.tags.get("artistsort"), False)
+
+ @property
+ def album_artist_sort_names(self) -> tuple[str, ...]:
+ """Return artist sort name tag(s) if present."""
+ return split_items(self.tags.get("albumartistsort"), False)
@property
def album_type(self) -> AlbumType:
return AlbumType.AUDIOBOOK
if "podcast" in self.tags.get("genre", "").lower() and len(self.chapters) > 1:
return AlbumType.PODCAST
+ if self.tags.get("compilation", "") == "1":
+ return AlbumType.COMPILATION
tag = (
self.tags.get("musicbrainzalbumtype")
or self.tags.get("albumtype")
return AlbumType.UNKNOWN
@property
- def isrc(self) -> str | None:
- """Return isrc tag."""
- for tag in ("isrc", "tsrc"):
- if tag := self.tags.get("isrc"):
- # sometyimes the field contains multiple values
- # we only need one
- return split_items(tag, True)[0]
- return None
+ def isrc(self) -> tuple[str]:
+ """Return isrc tag(s)."""
+ for tag_name in ("isrc", "tsrc"):
+ if tag := self.tags.get(tag_name):
+ # sometimes the field contains multiple values
+ return split_items(tag, True)
+ return tuple()
@property
def barcode(self) -> str | None:
"""Return barcode (upc/ean) tag(s)."""
- for tag in ("barcode", "upc", "ean"):
- if tag := self.tags.get("isrc"):
- # sometyimes the field contains multiple values
+ for tag_name in ("barcode", "upc", "ean"):
+ if tag := self.tags.get(tag_name):
+ # sometimes the field contains multiple values
# we only need one
- return split_items(tag, True)[0]
+ for item in split_items(tag, True):
+ if len(item) == 12:
+ # convert UPC barcode to EAN-13
+ return f"0{item}"
+ return item
return None
@property
)
return chapters
+ @property
+ def lyrics(self) -> str | None:
+ """Return lyrics tag (if exists)."""
+ for key, value in self.tags.items():
+ if key.startswith("lyrics"):
+ return value
+ return None
+
@classmethod
def parse(cls, raw: dict) -> AudioTags:
"""Parse instance from raw ffmpeg info output."""
tags = {}
for stream in raw["streams"] + [raw["format"]]:
for key, value in stream.get("tags", {}).items():
- key = key.lower().replace(" ", "").replace("_", "") # noqa: PLW2901
- tags[key] = value
+ alt_key = (
+ key.lower().replace(" ", "").replace("_", "").replace("-", "")
+ ) # noqa: PLW2901
+ tags[alt_key] = value
return AudioTags(
raw=raw,
"""Model/base for a Metadata Provider implementation."""
from __future__ import annotations
-from collections.abc import Iterable
from typing import TYPE_CHECKING
from music_assistant.common.models.enums import ProviderFeature
ProviderFeature.ARTIST_METADATA,
ProviderFeature.ALBUM_METADATA,
ProviderFeature.TRACK_METADATA,
- ProviderFeature.GET_ARTIST_MBID,
)
"""Retrieve metadata for a track on this Metadata provider."""
if ProviderFeature.TRACK_METADATA in self.supported_features:
raise NotImplementedError
-
- async def get_musicbrainz_artist_id(
- self, artist: Artist, ref_albums: Iterable[Album], ref_tracks: Iterable[Track]
- ) -> str | None:
- """Discover MusicBrainzArtistId for an artist given some reference albums/tracks."""
- if ProviderFeature.GET_ARTIST_MBID in self.supported_features:
- raise NotImplementedError
AlbumType,
ConfigEntryType,
ContentType,
+ ExternalID,
ImageType,
MediaType,
ProviderFeature,
extra_init_kwargs: dict[str, Any] | None = None,
) -> Track | PlaylistTrack | AlbumTrack:
"""Parse the deezer-python track to a MASS track."""
- isrc = track.isrc if hasattr(track, "isrc") else None
if hasattr(track, "artist"):
artist = ItemMapping(
media_type=MediaType.ARTIST,
track_class = AlbumTrack
else:
track_class = Track
- return track_class(
+ item = track_class(
item_id=str(track.id),
provider=self.domain,
name=track.title,
provider_instance=self.instance_id,
available=self.track_available(track=track, user_country=user_country),
url=track.link,
- isrc=isrc,
)
},
metadata=self.parse_metadata_track(track=track),
**extra_init_kwargs,
)
+ if isrc := getattr(track, "isrc", None):
+ item.external_ids.add((ExternalID.ISRC, isrc))
+ return item
def get_short_title(self, track: deezer.Track):
"""Short names only returned, if available."""
-<svg xmlns="http://www.w3.org/2000/svg" width="512" height="512" viewBox="0 0 48 48" fill="none" shape-rendering="geometricPrecision" text-rendering="geometricPrecision" image-rendering="optimizeQuality" fill-rule="evenodd" clip-rule="evenodd" >\r
+<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 48 48" fill="none" shape-rendering="geometricPrecision" text-rendering="geometricPrecision" image-rendering="optimizeQuality" fill-rule="evenodd" clip-rule="evenodd" >\r
<path fill-rule="evenodd" clip-rule="evenodd" d="M41.0955 7.32313C41.5396 4.74914 42.1912 3.13054 42.913 3.12744H42.9146C44.2606 3.13208 45.3517 8.7454 45.3517 15.6759C45.3517 22.6063 44.259 28.2243 42.9115 28.2243C42.3591 28.2243 41.8494 27.2704 41.4389 25.6719C40.7903 31.5233 39.4443 35.5459 37.8862 35.5459C36.6806 35.5459 35.5986 33.1296 34.8722 29.3188C34.3762 36.5662 33.1279 41.708 31.6689 41.708C30.7533 41.708 29.9185 39.6705 29.3005 36.3529C28.5573 43.2014 26.8405 48 24.8382 48C22.836 48 21.1162 43.2029 20.376 36.3529C19.7625 39.6705 18.9278 41.708 18.0075 41.708C16.5486 41.708 15.3033 36.5662 14.8043 29.3188C14.0779 33.1296 12.999 35.5459 11.7903 35.5459C10.2337 35.5459 8.88621 31.5249 8.23763 25.6719C7.83017 27.2751 7.31741 28.2243 6.76497 28.2243C5.41745 28.2243 4.32478 22.6063 4.32478 15.6759C4.32478 8.7454 5.41745 3.12744 6.76497 3.12744C7.48833 3.12744 8.13538 4.75068 8.58405 7.32313C9.30283 2.88473 10.4703 0 11.7903 0C13.3576 0 14.7158 4.07975 15.3583 10.0038C15.987 5.69216 16.9408 2.94348 18.0091 2.94348C19.5061 2.94348 20.7789 8.34964 21.2505 15.8908C22.1371 12.0243 23.4205 9.59876 24.8413 9.59876C26.2621 9.59876 27.5455 12.0259 28.4306 15.8908C28.9037 8.34964 30.1749 2.94348 31.672 2.94348C32.7387 2.94348 33.691 5.69216 34.3228 10.0038C34.9637 4.07975 36.3219 0 37.8892 0C39.2047 0 40.3767 2.88628 41.0955 7.32313ZM0.837891 14.4417C0.837891 11.3436 1.45748 8.83142 2.22204 8.83142C2.9866 8.83142 3.60619 11.3436 3.60619 14.4417C3.60619 17.5397 2.9866 20.0519 2.22204 20.0519C1.45748 20.0519 0.837891 17.5397 0.837891 14.4417ZM46.0693 14.4417C46.0693 11.3436 46.6888 8.83142 47.4534 8.83142C48.218 8.83142 48.8376 11.3436 48.8376 14.4417C48.8376 17.5397 48.218 20.0519 47.4534 20.0519C46.6888 20.0519 46.0693 17.5397 46.0693 14.4417Z" fill="#A238FF"/>\r
-</svg>
+</svg>\r
ConfigEntryType,
ConfigValueOption,
)
-from music_assistant.common.models.enums import ProviderFeature
+from music_assistant.common.models.enums import ExternalID, ProviderFeature
from music_assistant.common.models.errors import (
InvalidDataError,
MediaNotFoundError,
StreamDetails,
Track,
)
-from music_assistant.constants import VARIOUS_ARTISTS_ID_MBID, VARIOUS_ARTISTS_NAME
+from music_assistant.constants import VARIOUS_ARTISTS_NAME
from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.controllers.music import DB_SCHEMA_VERSION
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.helpers.playlists import parse_m3u, parse_pls
from music_assistant.server.helpers.tags import parse_tags, split_items
from music_assistant.server.models.music_provider import MusicProvider
+from music_assistant.server.providers.musicbrainz import MusicbrainzProvider
from .helpers import get_parentdir
description="Music Assistant prefers information stored in ID3 tags and only uses"
" online sources for additional metadata. This means that the ID3 tags need to be "
"accurate, preferably tagged with MusicBrainz Picard.",
- advanced=True,
+ advanced=False,
required=False,
options=(
ConfigValueOption("Skip track and log warning", "skip"),
bit_depth=tags.bits_per_sample,
bit_rate=tags.bit_rate,
),
- isrc=tags.isrc,
)
},
}
**base_details,
)
+ if isrc_tags := tags.isrc:
+ for isrsc in isrc_tags:
+ track.external_ids.add((ExternalID.ISRC, isrsc))
+
+ if acoustid := tags.get("acoustidid"):
+ track.external_ids.add((ExternalID.ACOUSTID, acoustid))
+
# album
if tags.album:
# work out if we have an album and/or disc folder
# disc_dir is the folder level where the tracks are located
# this may be a separate disc folder (Disc 1, Disc 2 etc) underneath the album folder
# or this is an album folder with the disc attached
- disc_dir = get_parentdir(file_item.path, f"disc {tags.disc or ''}")
+ disc_dir = get_parentdir(file_item.path, f"disc {tags.disc or 1}")
album_dir = get_parentdir(disc_dir or file_item.path, tags.album)
# album artist(s)
+ album_artists = []
if tags.album_artists:
- album_artists = []
for index, album_artist_str in enumerate(tags.album_artists):
# work out if we have an artist folder
artist_dir = get_parentdir(album_dir, album_artist_str, 1)
if not artist.mbid:
with contextlib.suppress(IndexError):
artist.mbid = tags.musicbrainz_albumartistids[index]
+ # album artist sort name
+ with contextlib.suppress(IndexError):
+ artist.sort_name = tags.album_artist_sort_names[index]
album_artists.append(artist)
else:
# album artist tag is missing, determine fallback
fallback_action = self.config.get_value(CONF_MISSING_ALBUM_ARTIST_ACTION)
- if fallback_action == "various_artists":
+ musicbrainz: MusicbrainzProvider = self.mass.get_provider("musicbrainz")
+ assert musicbrainz
+ # lookup track details on musicbrainz first
+ if mb_search_details := await musicbrainz.search(
+ tags.artists[0], tags.album, tags.title, tags.version
+ ):
+ # get full releasegroup details and get the releasegroup artist(s)
+ mb_details = await musicbrainz.get_releasegroup_details(mb_search_details[1].id)
+ for mb_artist in mb_details.artist_credit:
+ artist = await self._parse_artist(
+ mb_artist.artist.name, mb_artist.artist.sort_name
+ )
+ artist.mbid = mb_artist.artist.id
+ album_artists.append(artist)
+ if not tags.musicbrainz_recordingid:
+ tags.tags["musicbrainzrecordingid"] = mb_search_details[2].id
+ if not tags.musicbrainz_releasegroupid:
+ tags.tags["musicbrainzreleasegroupid"] = mb_search_details[1].id
+ # fallback to various artists (if defined by user)
+ elif fallback_action == "various_artists":
self.logger.warning(
"%s is missing ID3 tag [albumartist], using %s as fallback",
file_item.path,
VARIOUS_ARTISTS_NAME,
)
album_artists = [await self._parse_artist(name=VARIOUS_ARTISTS_NAME)]
+ # fallback to track artists (if defined by user)
elif fallback_action == "track_artist":
self.logger.warning(
"%s is missing ID3 tag [albumartist], using track artist(s) as fallback",
await self._parse_artist(name=track_artist_str)
for track_artist_str in tags.artists
]
+ # fallback to just log error and add track without album
else:
# default action is to skip the track
raise InvalidDataError("missing ID3 tag [albumartist]")
if not artist.mbid:
with contextlib.suppress(IndexError):
artist.mbid = tags.musicbrainz_artistids[index]
+ # artist sort name
+ with contextlib.suppress(IndexError):
+ artist.sort_name = tags.artist_sort_names[index]
track.artists.append(artist)
# handle embedded cover image
track.disc_number = tags.disc
track.track_number = tags.track
track.metadata.copyright = tags.get("copyright")
- track.metadata.lyrics = tags.get("lyrics")
+ track.metadata.lyrics = tags.lyrics
explicit_tag = tags.get("itunesadvisory")
if explicit_tag is not None:
track.metadata.explicit = explicit_tag == "1"
- track.mbid = tags.musicbrainz_trackid
+ track.mbid = tags.musicbrainz_recordingid
track.metadata.chapters = tags.chapters
if track.album:
if not track.album.mbid:
self,
name: str | None = None,
artist_path: str | None = None,
+ sort_name: str | None = None,
) -> Artist | None:
"""Lookup metadata in Artist folder."""
assert name or artist_path
if not artist_path:
# check if we have an existing item
- sort_name = create_sort_name(name)
- async for item in self.mass.music.artists.iter_library_items(search=sort_name):
- if not compare_strings(sort_name, item.sort_name):
+ async for item in self.mass.music.artists.iter_library_items(search=name):
+ if not compare_strings(name, item.name):
continue
for prov_mapping in item.provider_mappings:
if prov_mapping.provider_instance == self.instance_id:
item_id=artist_path,
provider=self.instance_id,
name=name,
+ sort_name=sort_name or create_sort_name(name),
provider_mappings={
ProviderMapping(
item_id=artist_path,
url=artist_path,
)
},
- mbid=VARIOUS_ARTISTS_ID_MBID if compare_strings(name, VARIOUS_ARTISTS_NAME) else None,
)
if not await self.exists(artist_path):
disc_path: str | None,
artists: list[Artist],
barcode: str | None = None,
+ sort_name: str | None = None,
) -> Album | None:
"""Lookup metadata in Album folder."""
- assert (name or album_path) and artists
- if not album_path:
- # create fake path
+ assert name or album_path
+ # create fake path if needed
+ if not album_path and artists:
album_path = artists[0].name + os.sep + name
+ elif not album_path:
+ album_path = name
if not name:
name = album_path.split(os.sep)[-1]
item_id=album_path,
provider=self.instance_id,
name=name,
+ sort_name=sort_name or create_sort_name(name),
artists=artists,
provider_mappings={
ProviderMapping(
provider_domain=self.instance_id,
provider_instance=self.instance_id,
url=album_path,
- barcode=barcode,
)
},
)
+ if barcode:
+ album.external_ids.add((ExternalID.BARCODE, barcode))
if not await self.exists(album_path):
# return basic object if there is no dedicated album folder
import re
from collections.abc import Iterable
+from contextlib import suppress
+from dataclasses import dataclass, field
from json import JSONDecodeError
from typing import TYPE_CHECKING, Any
import aiohttp.client_exceptions
from asyncio_throttle import Throttler
+from mashumaro import DataClassDictMixin
-from music_assistant.common.helpers.util import create_sort_name
+from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ProviderFeature
+from music_assistant.common.models.enums import ExternalID, ProviderFeature
+from music_assistant.common.models.errors import InvalidDataError
from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.helpers.compare import compare_strings
from music_assistant.server.models.metadata_provider import MetadataProvider
LUCENE_SPECIAL = r'([+\-&|!(){}\[\]\^"~*?:\\\/])'
-SUPPORTED_FEATURES = (ProviderFeature.GET_ARTIST_MBID,)
+SUPPORTED_FEATURES = tuple()
async def setup(
return tuple() # we do not have any config entries (yet)
+def replace_hyphens(data: dict[str, Any]) -> dict[str, Any]:
+ """Change all hyphens to underscores."""
+ new_values = {}
+ for key, value in data.items():
+ new_key = key.replace("-", "_")
+ if isinstance(value, dict):
+ new_values[new_key] = replace_hyphens(value)
+ elif isinstance(value, list):
+ new_values[new_key] = [replace_hyphens(x) if isinstance(x, dict) else x for x in value]
+ else:
+ new_values[new_key] = value
+ return new_values
+
+
+@dataclass
+class MusicBrainzTag(DataClassDictMixin):
+ """Model for a (basic) Tag object as received from the MusicBrainz API."""
+
+ count: int
+ name: str
+
+
+@dataclass
+class MusicBrainzAlias(DataClassDictMixin):
+ """Model for a (basic) Alias object from MusicBrainz."""
+
+ name: str
+ sort_name: str
+
+ # optional fields
+ locale: str | None = None
+ type: str | None = None
+ primary: bool | None = None
+ begin_date: str | None = None
+ end_date: str | None = None
+
+
+@dataclass
+class MusicBrainzArtist(DataClassDictMixin):
+ """Model for a (basic) Artist object from MusicBrainz."""
+
+ id: str
+ name: str
+ sort_name: str
+
+ # optional fields
+ aliases: list[MusicBrainzAlias] | None = None
+ tags: list[MusicBrainzTag] | None = None
+
+
+@dataclass
+class MusicBrainzArtistCredit(DataClassDictMixin):
+ """Model for a (basic) ArtistCredit object from MusicBrainz."""
+
+ name: str
+ artist: MusicBrainzArtist
+
+
+@dataclass
+class MusicBrainzReleaseGroup(DataClassDictMixin):
+ """Model for a (basic) ReleaseGroup object from MusicBrainz."""
+
+ id: str
+ primary_type_id: str
+ title: str
+ primary_type: str
+
+ # optional fields
+ secondary_types: list[str] | None = None
+ secondary_type_ids: list[str] | None = None
+ artist_credit: list[MusicBrainzArtistCredit] | None = None
+
+
+@dataclass
+class MusicBrainzTrack(DataClassDictMixin):
+ """Model for a (basic) Track object from MusicBrainz."""
+
+ id: str
+ number: str
+ title: str
+ length: int
+
+
+@dataclass
+class MusicBrainzMedia(DataClassDictMixin):
+ """Model for a (basic) Media object from MusicBrainz."""
+
+ position: int
+ format: str
+ track: list[MusicBrainzTrack]
+ track_count: int
+ track_offset: int
+
+
+@dataclass
+class MusicBrainzRelease(DataClassDictMixin):
+ """Model for a (basic) Release object from MusicBrainz."""
+
+ id: str
+ status_id: str
+ count: int
+ title: str
+ status: str
+ artist_credit: list[MusicBrainzArtistCredit]
+ release_group: MusicBrainzReleaseGroup
+ track_count: int
+
+ # optional fields
+ media: list[MusicBrainzMedia] = field(default_factory=list)
+ date: str | None = None
+ country: str | None = None
+ disambiguation: str | None = None # version
+ # TODO (if needed): release-events
+
+
+@dataclass
+class MusicBrainzRecording(DataClassDictMixin):
+ """Model for a (basic) Recording object as received from the MusicBrainz API."""
+
+ id: str
+ title: str
+ length: int | None
+ first_release_date: str | None
+ artist_credit: list[MusicBrainzArtistCredit]
+ # optional fields
+ isrcs: list[str] | None = None
+ tags: list[MusicBrainzTag] | None = None
+ disambiguation: str | None = None # version (e.g. live, karaoke etc.)
+
+
class MusicbrainzProvider(MetadataProvider):
"""The Musicbrainz Metadata provider."""
self, artist: Artist, ref_albums: Iterable[Album], ref_tracks: Iterable[Track]
) -> str | None:
"""Discover MusicBrainzArtistId for an artist given some reference albums/tracks."""
- for ref_album in ref_albums:
- # try matching on album musicbrainz id
- if ref_album.mbid: # noqa: SIM102
- if mbid := await self._search_artist_by_album_mbid(
- artistname=artist.name, album_mbid=ref_album.mbid
- ):
- return mbid
- # try matching on album barcode
- for provider_mapping in ref_album.provider_mappings:
- if not provider_mapping.barcode:
- continue
- if mbid := await self._search_artist_by_album(
- artistname=artist.name,
- album_barcode=provider_mapping.barcode,
- ):
- return mbid
-
- # try again with matching on track isrc
+ if artist.mbid:
+ return artist.mbid
+ # try with (strict) ref track(s), using recording id or isrc
for ref_track in ref_tracks:
- for provider_mapping in ref_track.provider_mappings:
- if not provider_mapping.isrc:
- continue
- if mbid := await self._search_artist_by_track(
- artistname=artist.name,
- track_isrc=provider_mapping.isrc,
- ):
- return mbid
-
+ if mb_artist := await self.get_artist_details_by_track(artist.name, ref_track):
+ return mb_artist.id
+ # try with (strict) ref album(s), using releasegroup id or barcode
+ for ref_album in ref_albums:
+ if mb_artist := await self.get_artist_details_by_album(artist.name, ref_album):
+ return mb_artist.id
# last restort: track matching by name
for ref_track in ref_tracks:
- if mbid := await self._search_artist_by_track(
+ if result := await self.search(
artistname=artist.name,
+ albumname=ref_track.album.name,
trackname=ref_track.name,
+ trackversion=ref_track.version,
):
- return mbid
-
+ return result[0].id
return None
- async def _search_artist_by_album(
- self,
- artistname: str,
- albumname: str | None = None,
- album_barcode: str | None = None,
- ) -> str | None:
- """Retrieve musicbrainz artist id by providing the artist name and albumname or barcode."""
- if not (albumname or album_barcode):
- return None # may not happen, but guard just in case
- for searchartist in (
- artistname,
- re.sub(LUCENE_SPECIAL, r"\\\1", artistname),
- create_sort_name(artistname),
- ):
- if album_barcode:
- # search by album barcode (EAN or UPC)
- query = f"barcode:{album_barcode}"
- elif albumname:
- # search by name
- searchalbum = re.sub(LUCENE_SPECIAL, r"\\\1", albumname)
- query = f'artist:"{searchartist}" AND release:"{searchalbum}"'
- result = await self.get_data("release", query=query)
- if result and "releases" in result:
- for strict in (True, False):
- for item in result["releases"]:
- if not (
- album_barcode
- or (albumname and compare_strings(item["title"], albumname, strict))
- ):
- continue
- for artist in item["artist-credit"]:
- if compare_strings(artist["artist"]["name"], artistname, strict):
- return artist["artist"]["id"] # type: ignore[no-any-return]
- for alias in artist.get("aliases", []):
- if compare_strings(alias["name"], artistname, strict):
- return artist["id"] # type: ignore[no-any-return]
- return None
+ async def search(
+ self, artistname: str, albumname: str, trackname: str, trackversion: str | None = None
+ ) -> tuple[MusicBrainzArtist, MusicBrainzReleaseGroup, MusicBrainzRecording] | None:
+ """
+ Search MusicBrainz details by providing the artist, album and track name.
- async def _search_artist_by_track(
- self,
- artistname: str,
- trackname: str | None = None,
- track_isrc: str | None = None,
- ) -> str | None:
- """Retrieve artist id by providing the artist name and trackname or track isrc."""
- if not (trackname or track_isrc):
- return None # may not happen, but guard just in case
+ NOTE: The MusicBrainz objects returned are simplified objects without the optional data.
+ """
+ trackname, trackversion = parse_title_and_version(trackname, trackversion)
searchartist = re.sub(LUCENE_SPECIAL, r"\\\1", artistname)
- if track_isrc:
- result = await self.get_data(f"isrc/{track_isrc}", inc="artist-credits")
- elif trackname:
- searchtrack = re.sub(LUCENE_SPECIAL, r"\\\1", trackname)
- result = await self.get_data(
- "recording", query=f'"{searchtrack}" AND artist:"{searchartist}"'
- )
- if result and "recordings" in result:
- for strict in (True, False):
+ searchalbum = re.sub(LUCENE_SPECIAL, r"\\\1", albumname)
+ searchtracks: list[str] = []
+ if trackversion:
+ searchtracks.append(f"{trackname} ({trackversion})")
+ searchtracks.append(trackname)
+ # the version is sometimes appended to the title and sometimes stored
+ # in disambiguation, so we try both
+ for strict in (True, False):
+ for searchtrack in searchtracks:
+ searchstr = re.sub(LUCENE_SPECIAL, r"\\\1", searchtrack)
+ result = await self.get_data(
+ "recording",
+ query=f'"{searchstr}" AND artist:"{searchartist}" AND release:"{searchalbum}"',
+ )
+ if not result or "recordings" not in result:
+ continue
for item in result["recordings"]:
- if not (
- track_isrc
- or (trackname and compare_strings(item["title"], trackname, strict))
+ # compare track title
+ if not compare_strings(item["title"], searchtrack, strict):
+ continue
+ # compare track version if needed
+ if (
+ trackversion
+ and trackversion not in searchtrack
+ and not compare_strings(item.get("disambiguation"), trackversion, strict)
):
continue
+ # match (primary) track artist
+ artist_match: MusicBrainzArtist | None = None
for artist in item["artist-credit"]:
if compare_strings(artist["artist"]["name"], artistname, strict):
- return artist["artist"]["id"] # type: ignore[no-any-return]
- for alias in artist["artist"].get("aliases", []):
- if compare_strings(alias["name"], artistname, strict):
- return artist["artist"]["id"] # type: ignore[no-any-return]
+ artist_match = MusicBrainzArtist.from_dict(
+ replace_hyphens(artist["artist"])
+ )
+ else:
+ for alias in artist["artist"].get("aliases", []):
+ if compare_strings(alias["name"], artistname, strict):
+ artist_match = MusicBrainzArtist.from_dict(
+ replace_hyphens(artist["artist"])
+ )
+ if not artist_match:
+ continue
+ # match album/release
+ album_match: MusicBrainzReleaseGroup | None = None
+ for release in item["releases"]:
+ if compare_strings(release["title"], albumname, strict) or compare_strings(
+ release["release-group"]["title"], albumname, strict
+ ):
+ album_match = MusicBrainzReleaseGroup.from_dict(
+ replace_hyphens(release["release-group"])
+ )
+ break
+ else:
+ continue
+ # if we reach this point, we got a match on recording,
+ # artist and release(group)
+ recording = MusicBrainzRecording.from_dict(replace_hyphens(item))
+ return (artist_match, album_match, recording)
+
return None
- async def _search_artist_by_album_mbid(self, artistname: str, album_mbid: str) -> str | None:
- """Retrieve musicbrainz artist id by providing the artist name or album id."""
- result = await self.get_data(f"release-group/{album_mbid}?inc=artist-credits")
- if result and "artist-credit" in result:
- for item in result["artist-credit"]:
- if (artist := item.get("artist")) and compare_strings(artistname, artist["name"]):
- return artist["id"] # type: ignore[no-any-return]
+ async def get_artist_details(self, artist_id: str) -> MusicBrainzArtist:
+ """Get (full) Artist details by providing a MusicBrainz artist id."""
+ endpoint = (
+ f"artist/{artist_id}?inc=aliases+annotation+tags+ratings+genres+url-rels+work-rels"
+ )
+ if result := await self.get_data(endpoint):
+ # TODO: Parse all the optional data like relations and such
+ return MusicBrainzArtist.from_dict(replace_hyphens(result))
+ raise InvalidDataError("Invalid MusicBrainz Artist ID provided")
+
+ async def get_recording_details(
+ self, recording_id: str | None = None, isrsc: str | None = None
+ ) -> MusicBrainzRecording:
+ """Get Recording details by providing a MusicBrainz recording id OR isrc."""
+ assert recording_id or isrsc, "Provider either Recording ID or ISRC"
+ if not recording_id:
+ # lookup recording id first by isrc
+ if (result := await self.get_data(f"isrc/{isrsc}")) and result.get("recordings"):
+ recording_id = result["recordings"][0]["id"]
+ else:
+ raise InvalidDataError("Invalid ISRC provided")
+ if result := await self.get_data(f"recording/{recording_id}?inc=artists+releases"):
+ return MusicBrainzRecording.from_dict(replace_hyphens(result))
+ raise InvalidDataError("Invalid ISRC provided")
+
+ async def get_releasegroup_details(
+ self, releasegroup_id: str | None = None, barcode: str | None = None
+ ) -> MusicBrainzReleaseGroup:
+ """Get ReleaseGroup details by providing a MusicBrainz ReleaseGroup id OR barcode."""
+ assert releasegroup_id or barcode, "Provider either ReleaseGroup ID or barcode"
+ if not releasegroup_id:
+ # lookup releasegroup id first by barcode
+ endpoint = f"release?query=barcode:{barcode}"
+ if (result := await self.get_data(endpoint)) and result.get("releases"):
+ releasegroup_id = result["releases"][0]["release-group"]["id"]
+ else:
+ raise InvalidDataError("Invalid barcode provided")
+ endpoint = f"release-group/{releasegroup_id}?inc=artists+aliases"
+ if result := await self.get_data(endpoint):
+ return MusicBrainzReleaseGroup.from_dict(replace_hyphens(result))
+ raise InvalidDataError("Invalid MusicBrainz ReleaseGroup ID or barcode provided")
+
+ async def get_artist_details_by_album(
+ self, artistname: str, ref_album: Album
+ ) -> MusicBrainzArtist | None:
+ """
+ Get musicbrainz artist details by providing the artist name and a reference album.
+
+ MusicBrainzArtist object that is returned does not contain the optional data.
+ """
+ barcodes = [x[1] for x in ref_album.external_ids if x[0] == ExternalID.BARCODE]
+ if not (ref_album.mbid or barcodes):
+ return None
+ for barcode in barcodes:
+ result = None
+ with suppress(InvalidDataError):
+ result = await self.get_releasegroup_details(ref_album.mbid, barcode)
+ if not (result and result.artist_credit):
+ return None
+ for strict in (True, False):
+ for artist_credit in result.artist_credit:
+ if compare_strings(artist_credit.artist.name, artistname, strict):
+ return artist_credit.artist
+ for alias in artist_credit.artist.aliases or []:
+ if compare_strings(alias.name, artistname, strict):
+ return artist_credit.artist
+ return None
+
+ async def get_artist_details_by_track(
+ self, artistname: str, ref_track: Track
+ ) -> MusicBrainzArtist | None:
+ """
+ Get musicbrainz artist details by providing the artist name and a reference track.
+
+ MusicBrainzArtist object that is returned does not contain the optional data.
+ """
+ isrcs = [x[1] for x in ref_track.external_ids if x[0] == ExternalID.ISRC]
+ if not (ref_track.mbid or isrcs):
+ return None
+ for isrc in isrcs:
+ result = None
+ with suppress(InvalidDataError):
+ result = await self.get_recording_details(ref_track.mbid, isrc)
+ if not (result and result.artist_credit):
+ return None
+ for strict in (True, False):
+ for artist_credit in result.artist_credit:
+ if compare_strings(artist_credit.artist.name, artistname, strict):
+ return artist_credit.artist
+ for alias in artist_credit.artist.aliases or []:
+ if compare_strings(alias.name, artistname, strict):
+ return artist_credit.artist
return None
@use_cache(86400 * 30)
async def get_data(self, endpoint: str, **kwargs: dict[str, Any]) -> Any:
"""Get data from api."""
url = f"http://musicbrainz.org/ws/2/{endpoint}"
- headers = {"User-Agent": "Music Assistant/1.0.0 https://github.com/music-assistant"}
+ headers = {
+ "User-Agent": f"Music Assistant/{self.mass.version} ( https://github.com/music-assistant )" # noqa: E501
+ }
kwargs["fmt"] = "json" # type: ignore[assignment]
async with self.throttler, self.mass.http_session.get(
url, headers=headers, params=kwargs, ssl=False
from music_assistant.common.helpers.util import parse_title_and_version, try_parse_int
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, ExternalID, ProviderFeature
from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
provider_domain=self.domain,
provider_instance=self.instance_id,
available=album_obj["streamable"] and album_obj["displayable"],
- barcode=album_obj["upc"],
audio_format=AudioFormat(
content_type=ContentType.FLAC,
sample_rate=album_obj["maximum_sampling_rate"] * 1000,
)
},
)
+ album.external_ids.add((ExternalID.BARCODE, album_obj["upc"]))
album.artists.append(await self._parse_artist(artist_obj or album_obj["artist"]))
if (
album_obj.get("product_type", "") == "single"
bit_depth=track_obj["maximum_bit_depth"],
),
url=track_obj.get("url", f'https://open.qobuz.com/track/{track_obj["id"]}'),
- isrc=track_obj.get("isrc"),
)
},
**extra_init_kwargs,
)
+ if isrc := track_obj.get("isrc"):
+ track.external_ids.add((ExternalID.ISRC, isrc))
if track_obj.get("performer") and "Various " not in track_obj["performer"]:
artist = await self._parse_artist(track_obj["performer"])
if artist:
track.metadata.performers = {x.strip() for x in track_obj["performers"].split("-")}
if track_obj.get("copyright"):
track.metadata.copyright = track_obj["copyright"]
- if track_obj.get("audio_info"):
- track.metadata.replaygain = track_obj["audio_info"]["replaygain_track_gain"]
if track_obj.get("parental_warning"):
track.metadata.explicit = True
if img := self.__get_image(track_obj):
jump = int(number.split("+")[1])
self.mass.create_task(self.mass.player_queues.skip, player_queue.queue_id, jump)
else:
- self.mass.create_task(self.mass.player_queues.seek, player_queue.queue_id, number)
+ self.mass.create_task(self.mass.player_queues.seek, player_queue.queue_id, int(number))
def _handle_power(self, player_id: str, value: str | int, *args, **kwargs) -> int | None:
"""Handle player `time` command."""
from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
-from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature
+from music_assistant.common.models.enums import ConfigEntryType, ExternalID, ProviderFeature
from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
from music_assistant.common.models.media_items import (
Album,
async def _parse_album(self, album_obj: dict):
"""Parse spotify album object to generic layout."""
name, version = parse_title_and_version(album_obj["name"])
- barcode = None
- if "external_ids" in album_obj and album_obj["external_ids"].get("upc"):
- barcode = album_obj["external_ids"]["upc"]
- if "external_ids" in album_obj and album_obj["external_ids"].get("ean"):
- barcode = album_obj["external_ids"]["ean"]
album = Album(
item_id=album_obj["id"],
provider=self.domain,
provider_instance=self.instance_id,
audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
url=album_obj["external_urls"]["spotify"],
- barcode=barcode,
)
},
)
+ if "external_ids" in album_obj and album_obj["external_ids"].get("upc"):
+ album.external_ids.add((ExternalID.BARCODE, "0" + album_obj["external_ids"]["upc"]))
+ if "external_ids" in album_obj and album_obj["external_ids"].get("ean"):
+ album.external_ids.add((ExternalID.BARCODE, album_obj["external_ids"]["ean"]))
+
for artist_obj in album_obj["artists"]:
album.artists.append(await self._parse_artist(artist_obj))
content_type=ContentType.OGG,
bit_rate=320,
),
- isrc=track_obj.get("external_ids", {}).get("isrc"),
url=track_obj["external_urls"]["spotify"],
available=not track_obj["is_local"] and track_obj["is_playable"],
)
},
**extra_init_kwargs,
)
+ if isrc := track_obj.get("external_ids", {}).get("isrc"):
+ track.external_ids.add((ExternalID.ISRC, isrc))
if artist:
track.artists.append(artist)
from music_assistant.server.models.metadata_provider import MetadataProvider
if TYPE_CHECKING:
- from collections.abc import Iterable
-
from music_assistant.common.models.config_entries import ProviderConfig
from music_assistant.common.models.provider import ProviderManifest
from music_assistant.server import MusicAssistant
ProviderFeature.ARTIST_METADATA,
ProviderFeature.ALBUM_METADATA,
ProviderFeature.TRACK_METADATA,
- ProviderFeature.GET_ARTIST_MBID,
)
IMG_MAPPING = {
async def get_artist_metadata(self, artist: Artist) -> MediaItemMetadata | None:
"""Retrieve metadata for artist on theaudiodb."""
+ if not artist.mbid:
+ # for 100% accuracy we require the musicbrainz id for all lookups
+ return None
if data := await self._get_data("artist-mb.php", i=artist.mbid): # noqa: SIM102
if data.get("artists"):
return self.__parse_artist(data["artists"][0])
async def get_album_metadata(self, album: Album) -> MediaItemMetadata | None:
"""Retrieve metadata for album on theaudiodb."""
- adb_album = None
- if album.mbid:
- result = await self._get_data("album-mb.php", i=album.mbid)
- if result and result.get("album"):
- adb_album = result["album"][0]
- elif album.artists:
- # lookup by name
- artist = album.artists[0]
- result = await self._get_data("searchalbum.php", s=artist.name, a=album.name)
- if result and result.get("album"):
- for item in result["album"]:
- assert isinstance(artist, Artist)
- if artist.mbid:
- if artist.mbid != item["strMusicBrainzArtistID"]:
- continue
- elif not compare_strings(artist.name, item["strArtistStripped"]):
- continue
- if compare_strings(album.name, item["strAlbumStripped"]):
- adb_album = item
- break
- if adb_album:
+ if not album.mbid:
+ # for 100% accuracy we require the musicbrainz id for all lookups
+ return None
+ result = await self._get_data("album-mb.php", i=album.mbid)
+ if result and result.get("album"):
+ adb_album = result["album"][0]
+ # fill in some missing album info if needed
if not album.year:
album.year = int(adb_album.get("intYearReleased", "0"))
- if not album.mbid:
- album.mbid = adb_album["strMusicBrainzID"]
- assert isinstance(album.artists[0], Artist)
if album.artists and not album.artists[0].mbid:
album.artists[0].mbid = adb_album["strMusicBrainzArtistID"]
if album.album_type == AlbumType.UNKNOWN:
result = await self._get_data("track-mb.php", i=track.mbid)
if result and result.get("track"):
return self.__parse_track(result["track"][0])
-
- # lookup by name
+ # if there was no match on mbid, there will certainly be no match by name
+ return None
+ # fallback if no musicbrainzid: lookup by name
for track_artist in track.artists:
assert isinstance(track_artist, Artist)
# make sure to include the version in the track name
- search_name = track.name
- if track.version:
- search_name += f" {track.version}"
- result = await self._get_data("searchtrack.php?", s=track_artist.name, t=search_name)
+ track_name = f"{track.name} {track.version}" if track.version else track.name
+ result = await self._get_data("searchtrack.php?", s=track_artist.name, t=track_name)
if result and result.get("track"):
for item in result["track"]:
- if track_artist.mbid:
- if track_artist.mbid != item["strMusicBrainzArtistID"]:
- continue
- elif not compare_strings(track_artist.name, item["strArtist"]):
+ # some safety checks
+ if track_artist.mbid and track_artist.mbid != item["strMusicBrainzArtistID"]:
continue
- if compare_strings(track.name, item["strTrack"]):
- adb_track = item
- break
- if adb_track:
- if not track.mbid:
- track.mbid = adb_track["strMusicBrainzID"]
- assert isinstance(track.album, Album)
- if track.album and not track.album.mbid:
- track.album.mbid = adb_track["strMusicBrainzAlbumID"]
- if not track_artist.mbid:
- track_artist.mbid = adb_track["strMusicBrainzArtistID"]
-
- return self.__parse_track(adb_track)
- return None
-
- async def get_musicbrainz_artist_id(
- self,
- artist: Artist,
- ref_albums: Iterable[Album],
- ref_tracks: Iterable[Track], # noqa: ARG002
- ) -> str | None:
- """Discover MusicBrainzArtistId for an artist given some reference albums/tracks."""
- mbid = None
- if data := await self._get_data("searchalbum.php", s=artist.name):
- # NOTE: object is 'null' when no records found instead of empty array
- albums = data.get("album") or []
- for item in albums:
- if not compare_strings(item["strArtistStripped"], artist.name):
- continue
- for ref_album in ref_albums:
- if not compare_strings(item["strAlbumStripped"], ref_album.name):
+ if track.album.mbid and track.album.mbid != item["strMusicBrainzAlbumID"]:
continue
- # found match - update album metadata too while we're here
- if ref_album.provider == "library" and not ref_album.mbid:
- ref_album.metadata = self.__parse_album(item)
- await self.mass.music.albums.update_item_in_library(
- ref_album.item_id, ref_album
- )
- mbid = item["strMusicBrainzArtistID"]
-
- return mbid
+ if not compare_strings(track_artist.name, item["strArtist"]):
+ continue
+ if compare_strings(track_name, item["strTrack"]):
+ return self.__parse_track(adb_track)
+ return None
def __parse_artist(self, artist_obj: dict[str, Any]) -> MediaItemMetadata:
"""Parse audiodb artist object to MediaItemMetadata."""
from music_assistant.common.models.enums import (
AlbumType,
ConfigEntryType,
+ ExternalID,
ImageType,
MediaType,
ProviderFeature,
content_type=ContentType.FLAC,
bit_depth=24 if self._is_hi_res(track_obj=track_obj) else 16,
),
- isrc=track_obj.isrc,
url=f"http://www.tidal.com/tracks/{track_id}",
available=track_obj.available,
)
},
**extra_init_kwargs,
)
+ if track_obj.isrc:
+ track.external_ids.add((ExternalID.ISRC, track_obj.isrc))
# Here we use an ItemMapping as Tidal return minimal data when getting an Album from a Track
track.album = self.get_item_mapping(
media_type=MediaType.ALBUM,
assert _tags.musicbrainz_albumartistids == ("abcdefg",)
assert _tags.musicbrainz_artistids == ("abcdefg",)
assert _tags.musicbrainz_releasegroupid == "abcdefg"
- assert _tags.musicbrainz_trackid == "abcdefg"
+ assert _tags.musicbrainz_recordingid == "abcdefg"
# test parsing disc/track number
_tags.tags["disc"] = "1"
assert _tags.disc == 1
assert _tags.musicbrainz_albumartistids == tuple()
assert _tags.musicbrainz_artistids == tuple()
assert _tags.musicbrainz_releasegroupid is None
- assert _tags.musicbrainz_trackid is None
+ assert _tags.musicbrainz_recordingid is None
async def test_parse_metadata_from_invalid_filename():
assert _tags.musicbrainz_albumartistids == tuple()
assert _tags.musicbrainz_artistids == tuple()
assert _tags.musicbrainz_releasegroupid is None
- assert _tags.musicbrainz_trackid is None
+ assert _tags.musicbrainz_recordingid is None