from __future__ import annotations
-import asyncio
import contextlib
from collections.abc import Iterable
from random import choice, random
from music_assistant.common.helpers.global_cache import get_global_cache_value
from music_assistant.common.helpers.json import serialize_to_json
-from music_assistant.common.models.enums import EventType, ProviderFeature
+from music_assistant.common.models.enums import ProviderFeature
from music_assistant.common.models.errors import (
InvalidDataError,
MediaNotFoundError,
- MusicAssistantError,
UnsupportedFeaturedException,
)
from music_assistant.common.models.media_items import (
"""Initialize class."""
super().__init__(*args, **kwargs)
self.base_query = f"""
- SELECT
+ SELECT DISTINCT
{self.db_table}.*
FROM {self.db_table}
LEFT JOIN {DB_TABLE_ALBUM_ARTISTS} on {DB_TABLE_ALBUM_ARTISTS}.album_id = {self.db_table}.item_id
LEFT JOIN {DB_TABLE_ARTISTS} on {DB_TABLE_ARTISTS}.item_id = {DB_TABLE_ALBUM_ARTISTS}.artist_id
""" # noqa: E501
- self._db_add_lock = asyncio.Lock()
- # register api handlers
- self.mass.register_api_command("music/albums/library_items", self.library_items)
- self.mass.register_api_command(
- "music/albums/update_item_in_library", self.update_item_in_library
- )
- self.mass.register_api_command(
- "music/albums/remove_item_from_library", self.remove_item_from_library
- )
- self.mass.register_api_command("music/albums/get_album", self.get)
- self.mass.register_api_command("music/albums/album_tracks", self.tracks)
- self.mass.register_api_command("music/albums/album_versions", self.versions)
+ # register (extra) api handlers
+ api_base = self.api_base
+ self.mass.register_api_command(f"music/{api_base}/album_tracks", self.tracks)
+ self.mass.register_api_command(f"music/{api_base}/album_versions", self.versions)
async def get(
self,
if not isinstance(artist, ItemMapping):
album_artists.append(artist)
continue
- try:
+ with contextlib.suppress(MediaNotFoundError):
album_artists.append(
await self.mass.music.artists.get(
artist.item_id,
artist.provider,
lazy=lazy,
- add_to_library=False, # TODO: make this configurable
+ details=artist,
+ add_to_library=False,
)
)
- except MusicAssistantError as err:
- # edge case where playlist track has invalid artistdetails
- self.logger.warning("Unable to fetch artist details %s - %s", artist.uri, str(err))
album.artists = album_artists
if not force_refresh:
return album
== prov_album_track.metadata.cache_checksum
):
continue
- await self.mass.music.tracks.update_item_in_library(
+ await self.mass.music.tracks._update_library_item(
prov_album_track.item_id, prov_track, True
)
break
return album
- async def add_item_to_library(
- self,
- item: Album,
- metadata_lookup: bool = True,
- overwrite_existing: bool = False,
- add_album_tracks: bool = False,
- ) -> Album:
- """Add album to library and return the database item."""
- if not isinstance(item, Album):
- msg = "Not a valid Album object (ItemMapping can not be added to db)"
- raise InvalidDataError(msg)
- if not item.provider_mappings:
- msg = "Album is missing provider mapping(s)"
- raise InvalidDataError(msg)
- # grab additional metadata
- if metadata_lookup:
- await self.mass.metadata.get_album_metadata(item)
- # check for existing item first
- library_item = None
- if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
- # existing item match by provider id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
- # existing item match by external id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- else:
- # search by (exact) name match
- query = f"WHERE {self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
- query_params = {"name": item.name, "sort_name": item.sort_name}
- async for db_item in self.iter_library_items(
- extra_query=query, extra_query_params=query_params
- ):
- if compare_album(db_item, item):
- # existing item found: update it
- library_item = await self.update_item_in_library(
- db_item.item_id, item, overwrite=overwrite_existing
- )
- break
- if not library_item:
- # actually add a new item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
- # also fetch the same album on all providers
- if metadata_lookup:
- await self._match(library_item)
- library_item = await self.get_library_item(library_item.item_id)
- # also add album tracks
- # TODO: make this configurable
- if add_album_tracks and item.provider != "library":
- async with asyncio.TaskGroup() as tg:
- for track in await self._get_provider_album_tracks(item.item_id, item.provider):
- track.album = library_item
- tg.create_task(
- self.mass.music.tracks.add_item_to_library(track, metadata_lookup=False)
- )
- self.mass.signal_event(
- EventType.MEDIA_ITEM_ADDED,
- library_item.uri,
- library_item,
- )
- return library_item
-
- async def update_item_in_library(
- self, item_id: str | int, update: Album, overwrite: bool = False
- ) -> Album:
- """Update existing record in the database."""
- db_id = int(item_id) # ensure integer
- cur_item = await self.get_library_item(db_id)
- metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
- if getattr(update, "album_type", AlbumType.UNKNOWN) != AlbumType.UNKNOWN:
- album_type = update.album_type
- else:
- album_type = cur_item.album_type
- cur_item.external_ids.update(update.external_ids)
- provider_mappings = (
- update.provider_mappings
- if overwrite
- else {*cur_item.provider_mappings, *update.provider_mappings}
- )
- await self.mass.music.database.update(
- self.db_table,
- {"item_id": db_id},
- {
- "name": update.name if overwrite else cur_item.name,
- "sort_name": update.sort_name
- if overwrite
- else cur_item.sort_name or update.sort_name,
- "version": update.version if overwrite else cur_item.version,
- "year": update.year if overwrite else cur_item.year or update.year,
- "album_type": album_type.value,
- "metadata": serialize_to_json(metadata),
- "external_ids": serialize_to_json(
- update.external_ids if overwrite else cur_item.external_ids
- ),
- },
- )
- self.logger.debug("updated %s in database: %s", update.name, db_id)
- # update/set provider_mappings table
- await self._set_provider_mappings(db_id, provider_mappings, overwrite)
- # set album artist(s)
- artists = update.artists if overwrite else cur_item.artists + update.artists
- await self._set_album_artists(db_id, artists, overwrite=overwrite)
- # get full created object
- library_item = await self.get_library_item(db_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED,
- library_item.uri,
- library_item,
- )
- # return the full item we just updated
- return library_item
-
async def remove_item_from_library(self, item_id: str | int) -> None:
"""Delete record from the database."""
db_id = int(item_id) # ensure integer
item_id: str,
provider_instance_id_or_domain: str,
in_library_only: bool = False,
- ) -> list[AlbumTrack]:
+ ) -> UniqueList[AlbumTrack]:
"""Return album tracks for the given provider album id."""
full_album = await self.get(item_id, provider_instance_id_or_domain)
db_items = (
if full_album.provider == "library"
else []
)
+ # return all (unique) items from all providers
+ result: UniqueList[AlbumTrack] = UniqueList(db_items)
if full_album.provider == "library" and in_library_only:
# return in-library items only
return sorted(db_items, key=lambda x: (x.disc_number, x.track_number))
- # return all (unique) items from all providers
- result: list[AlbumTrack] = [*db_items]
unique_ids: set[str] = {f"{x.disc_number or 1}.{x.track_number}" for x in db_items}
for provider_mapping in full_album.provider_mappings:
provider_tracks = await self._get_provider_album_tracks(
self,
item_id: str,
provider_instance_id_or_domain: str,
- ) -> list[Album]:
+ ) -> UniqueList[Album]:
"""Return all versions of an album we can find on all providers."""
album = await self.get(item_id, provider_instance_id_or_domain, add_to_library=False)
search_query = f"{album.artists[0].name} - {album.name}" if album.artists else album.name
- result: list[Album] = []
+ result: UniqueList[Album] = UniqueList()
for provider_id in self.mass.music.get_unique_providers():
provider = self.mass.get_provider(provider_id)
if not provider:
continue
if not provider.library_supported(MediaType.ALBUM):
continue
- result += [
+ result.extend(
prov_item
for prov_item in await self.search(search_query, provider_id)
if loose_compare_strings(album.name, prov_item.name)
and compare_artists(prov_item.artists, album.artists, any_match=True)
# make sure that the 'base' version is NOT included
and not album.provider_mappings.intersection(prov_item.provider_mappings)
- ]
+ )
return result
async def get_library_album_tracks(
return cast(list[AlbumTrack], result)
return result
- async def _add_library_item(self, item: Album) -> Album:
+ async def _add_library_item(self, item: Album) -> int:
"""Add a new record to the database."""
+ if not isinstance(item, Album):
+ msg = "Not a valid Album object (ItemMapping can not be added to db)"
+ raise InvalidDataError(msg)
+ if not item.artists:
+ msg = "Album is missing artist(s)"
+ raise InvalidDataError(msg)
new_item = await self.mass.music.database.insert(
self.db_table,
{
await self._set_provider_mappings(db_id, item.provider_mappings)
# set track artist(s)
await self._set_album_artists(db_id, item.artists)
- self.logger.debug("added %s to database (item id %s)", item.name, db_id)
- # return the full item we just added
- return await self.get_library_item(db_id)
+ self.logger.debug("added %s to database (id: %s)", item.name, db_id)
+ return db_id
+
+ async def _update_library_item(
+ self, item_id: str | int, update: Album, overwrite: bool = False
+ ) -> None:
+ """Update existing record in the database."""
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_library_item(db_id)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
+ if getattr(update, "album_type", AlbumType.UNKNOWN) != AlbumType.UNKNOWN:
+ album_type = update.album_type
+ else:
+ album_type = cur_item.album_type
+ cur_item.external_ids.update(update.external_ids)
+ provider_mappings = (
+ update.provider_mappings
+ if overwrite
+ else {*cur_item.provider_mappings, *update.provider_mappings}
+ )
+ await self.mass.music.database.update(
+ self.db_table,
+ {"item_id": db_id},
+ {
+ "name": update.name if overwrite else cur_item.name,
+ "sort_name": update.sort_name
+ if overwrite
+ else cur_item.sort_name or update.sort_name,
+ "version": update.version if overwrite else cur_item.version,
+ "year": update.year if overwrite else cur_item.year or update.year,
+ "album_type": album_type.value,
+ "metadata": serialize_to_json(metadata),
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
+ },
+ )
+ # update/set provider_mappings table
+ await self._set_provider_mappings(db_id, provider_mappings, overwrite)
+ # set album artist(s)
+ artists = update.artists if overwrite else cur_item.artists + update.artists
+ await self._set_album_artists(db_id, artists, overwrite=overwrite)
+ self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
async def _get_provider_album_tracks(
self, item_id: str, provider_instance_id_or_domain: str
"album_id": db_id,
},
)
- artist_mappings: list[ItemMapping] = []
+ artist_mappings: UniqueList[ItemMapping] = UniqueList()
for artist in artists:
mapping = await self._set_album_artist(db_id, artist=artist, overwrite=overwrite)
artist_mappings.append(mapping)
from typing import TYPE_CHECKING, Any
from music_assistant.common.helpers.json import serialize_to_json
-from music_assistant.common.models.enums import EventType, ProviderFeature
+from music_assistant.common.models.enums import ProviderFeature
from music_assistant.common.models.errors import MediaNotFoundError, UnsupportedFeaturedException
from music_assistant.common.models.media_items import (
Album,
AlbumType,
Artist,
- ItemMapping,
MediaType,
PagedItems,
Track,
+ UniqueList,
)
from music_assistant.constants import (
DB_TABLE_ALBUM_ARTISTS,
"""Initialize class."""
super().__init__(*args, **kwargs)
self._db_add_lock = asyncio.Lock()
- # register api handlers
- self.mass.register_api_command("music/artists/library_items", self.library_items)
- self.mass.register_api_command(
- "music/artists/update_item_in_library", self.update_item_in_library
- )
- self.mass.register_api_command(
- "music/artists/remove_item_from_library", self.remove_item_from_library
- )
- self.mass.register_api_command("music/artists/get_artist", self.get)
- self.mass.register_api_command("music/artists/artist_albums", self.albums)
- self.mass.register_api_command("music/artists/artist_tracks", self.tracks)
-
- async def add_item_to_library(
- self,
- item: Artist | ItemMapping,
- metadata_lookup: bool = True,
- overwrite_existing: bool = False,
- ) -> Artist:
- """Add artist to library and return the database item."""
- if isinstance(item, ItemMapping):
- metadata_lookup = False
- item = Artist.from_item_mapping(item)
- # grab musicbrainz id and additional metadata
- if metadata_lookup:
- await self.mass.metadata.get_artist_metadata(item)
- # check for existing item first
- library_item = None
- if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
- # existing item match by provider id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
- # existing item match by external id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- else:
- # search by (exact) name match
- query = f"WHERE {self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
- query_params = {"name": item.name, "sort_name": item.sort_name}
- async for db_item in self.iter_library_items(
- extra_query=query, extra_query_params=query_params
- ):
- if compare_artist(db_item, item):
- # existing item found: update it
- # NOTE: if we matched an artist by name this could theoretically lead to
- # collisions but the chance is so small it is not worth the additional
- # overhead of grabbing the musicbrainz id upfront
- library_item = await self.update_item_in_library(
- db_item.item_id, item, overwrite=overwrite_existing
- )
- break
- if not library_item:
- # actually add (or update) the item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
- # also fetch same artist on all providers
- if metadata_lookup:
- await self.match_artist(library_item)
- library_item = await self.get_library_item(library_item.item_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_ADDED,
- library_item.uri,
- library_item,
- )
- # return final library_item after all match/metadata actions
- return library_item
-
- async def update_item_in_library(
- self, item_id: str | int, update: Artist, overwrite: bool = False
- ) -> Artist:
- """Update existing record in the database."""
- db_id = int(item_id) # ensure integer
- cur_item = await self.get_library_item(db_id)
- metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
- cur_item.external_ids.update(update.external_ids)
- # enforce various artists name + id
- mbid = cur_item.mbid
- if (not mbid or overwrite) and getattr(update, "mbid", None):
- if compare_strings(update.name, VARIOUS_ARTISTS_NAME):
- update.mbid = VARIOUS_ARTISTS_ID_MBID
- if update.mbid == VARIOUS_ARTISTS_ID_MBID:
- update.name = VARIOUS_ARTISTS_NAME
-
- await self.mass.music.database.update(
- self.db_table,
- {"item_id": db_id},
- {
- "name": update.name if overwrite else cur_item.name,
- "sort_name": update.sort_name
- if overwrite
- else cur_item.sort_name or update.sort_name,
- "external_ids": serialize_to_json(
- update.external_ids if overwrite else cur_item.external_ids
- ),
- "metadata": serialize_to_json(metadata),
- },
- )
- self.logger.debug("updated %s in database: %s", update.name, db_id)
- # update/set provider_mappings table
- provider_mappings = (
- update.provider_mappings
- if overwrite
- else {*cur_item.provider_mappings, *update.provider_mappings}
- )
- await self._set_provider_mappings(db_id, provider_mappings, overwrite)
- # get full created object
- library_item = await self.get_library_item(db_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED,
- library_item.uri,
- library_item,
- )
- # return the full item we just updated
- return library_item
+ # register (extra) api handlers
+ api_base = self.api_base
+ self.mass.register_api_command(f"music/{api_base}/artist_albums", self.albums)
+ self.mass.register_api_command(f"music/{api_base}/artist_tracks", self.tracks)
async def library_items(
self,
item_id: str,
provider_instance_id_or_domain: str,
in_library_only: bool = False,
- ) -> list[Track]:
+ ) -> UniqueList[Track]:
"""Return all/top tracks for an artist."""
full_artist = await self.get(item_id, provider_instance_id_or_domain)
db_items = (
if full_artist.provider == "library"
else []
)
+ result: UniqueList[Track] = UniqueList(db_items)
if full_artist.provider == "library" and in_library_only:
# return in-library items only
- return db_items
+ return result
# return all (unique) items from all providers
- result: list[Track] = [*db_items]
unique_ids: set[str] = set()
for provider_mapping in full_artist.provider_mappings:
provider_tracks = await self.get_provider_artist_toptracks(
if db_item := await self.mass.music.tracks.get_library_item_by_prov_id(
provider_track.item_id, provider_track.provider
):
- if db_item not in db_items:
- result.append(db_item)
- elif not in_library_only and provider_track not in result:
+ result.append(db_item)
+ elif not in_library_only:
result.append(provider_track)
return result
item_id: str,
provider_instance_id_or_domain: str,
in_library_only: bool = False,
- ) -> list[Album]:
+ ) -> UniqueList[Album]:
"""Return (all/most popular) albums for an artist."""
full_artist = await self.get(item_id, provider_instance_id_or_domain)
db_items = (
if full_artist.provider == "library"
else []
)
+ result: UniqueList[Album] = UniqueList(db_items)
if full_artist.provider == "library" and in_library_only:
# return in-library items only
- return db_items
+ return result
# return all (unique) items from all providers
- result: list[Album] = [*db_items]
unique_ids: set[str] = set()
for provider_mapping in full_artist.provider_mappings:
provider_albums = await self.get_provider_artist_albums(
if db_item := await self.mass.music.albums.get_library_item_by_prov_id(
provider_album.item_id, provider_album.provider
):
- if db_item not in db_items:
- result.append(db_item)
- elif not in_library_only and provider_album not in result:
+ result.append(db_item)
+ elif not in_library_only:
result.append(provider_album)
return result
# delete the artist itself from db
await super().remove_item_from_library(db_id)
- async def match_artist(self, db_artist: Artist) -> None:
- """Try to find matching artists on all providers for the provided (database) item_id.
-
- This is used to link objects of different providers together.
- """
- assert db_artist.provider == "library", "Matching only supported for database items!"
- cur_provider_domains = {x.provider_domain for x in db_artist.provider_mappings}
- for provider in self.mass.music.providers:
- if provider.domain in cur_provider_domains:
- continue
- if ProviderFeature.SEARCH not in provider.supported_features:
- continue
- if not provider.library_supported(MediaType.ARTIST):
- continue
- if not provider.is_streaming_provider:
- # matching on unique providers is pointless as they push (all) their content to MA
- continue
- if await self._match(db_artist, provider):
- cur_provider_domains.add(provider.domain)
- else:
- self.logger.debug(
- "Could not find match for Artist %s on provider %s",
- db_artist.name,
- provider.name,
- )
-
async def get_provider_artist_toptracks(
self,
item_id: str,
query = f"WHERE {DB_TABLE_ALBUM_ARTISTS}.artist_id = {item_id}"
return await self.mass.music.albums._get_library_items_by_query(extra_query=query)
- async def _add_library_item(self, item: Artist) -> Artist:
+ async def _add_library_item(self, item: Artist) -> int:
"""Add a new item record to the database."""
# enforce various artists name + id
if compare_strings(item.name, VARIOUS_ARTISTS_NAME):
db_id = new_item["item_id"]
# update/set provider_mappings table
await self._set_provider_mappings(db_id, item.provider_mappings)
- self.logger.debug("added %s to database", item.name)
- # return the full item we just added
- return await self.get_library_item(db_id)
+ self.logger.debug("added %s to database (id: %s)", item.name, db_id)
+ return db_id
+
+ async def _update_library_item(
+ self, item_id: str | int, update: Artist, overwrite: bool = False
+ ) -> None:
+ """Update existing record in the database."""
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_library_item(db_id)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
+ cur_item.external_ids.update(update.external_ids)
+ # enforce various artists name + id
+ mbid = cur_item.mbid
+ if (not mbid or overwrite) and getattr(update, "mbid", None):
+ if compare_strings(update.name, VARIOUS_ARTISTS_NAME):
+ update.mbid = VARIOUS_ARTISTS_ID_MBID
+ if update.mbid == VARIOUS_ARTISTS_ID_MBID:
+ update.name = VARIOUS_ARTISTS_NAME
+
+ await self.mass.music.database.update(
+ self.db_table,
+ {"item_id": db_id},
+ {
+ "name": update.name if overwrite else cur_item.name,
+ "sort_name": update.sort_name
+ if overwrite
+ else cur_item.sort_name or update.sort_name,
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
+ "metadata": serialize_to_json(metadata),
+ },
+ )
+ self.logger.debug("updated %s in database: %s", update.name, db_id)
+ # update/set provider_mappings table
+ provider_mappings = (
+ update.provider_mappings
+ if overwrite
+ else {*cur_item.provider_mappings, *update.provider_mappings}
+ )
+ await self._set_provider_mappings(db_id, provider_mappings, overwrite)
+ self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
async def _get_provider_dynamic_tracks(
self,
msg = "No Music Provider found that supports requesting similar tracks."
raise UnsupportedFeaturedException(msg)
- async def _match(self, db_artist: Artist, provider: MusicProvider) -> bool:
+ async def _match(self, db_artist: Artist) -> None:
+ """Try to find matching artists on all providers for the provided (database) item_id.
+
+ This is used to link objects of different providers together.
+ """
+ assert db_artist.provider == "library", "Matching only supported for database items!"
+ cur_provider_domains = {x.provider_domain for x in db_artist.provider_mappings}
+ for provider in self.mass.music.providers:
+ if provider.domain in cur_provider_domains:
+ continue
+ if ProviderFeature.SEARCH not in provider.supported_features:
+ continue
+ if not provider.library_supported(MediaType.ARTIST):
+ continue
+ if not provider.is_streaming_provider:
+ # matching on unique providers is pointless as they push (all) their content to MA
+ continue
+ if await self._match_provider(db_artist, provider):
+ cur_provider_domains.add(provider.domain)
+ else:
+ self.logger.debug(
+ "Could not find match for Artist %s on provider %s",
+ db_artist.name,
+ provider.name,
+ )
+
+ async def _match_provider(self, db_artist: Artist, provider: MusicProvider) -> bool:
"""Try to find matching artists on given provider for the provided (database) artist."""
self.logger.debug("Trying to match artist %s on provider %s", db_artist.name, provider.name)
# try to get a match with some reference tracks of this artist
if not compare_strings(search_result_item.name, ref_album.name):
continue
# artist must match 100%
- if not compare_artist(
- db_artist, search_result_item.artists[0], allow_name_match=True
- ):
+ if not compare_artist(db_artist, search_result_item.artists[0]):
continue
# 100% match
# get full artist details so we have all metadata
search_result_item.artists[0].provider,
fallback=search_result_item,
)
- await self.update_item_in_library(db_artist.item_id, prov_artist)
+ await self._update_library_item(db_artist.item_id, prov_artist)
return True
return False
from __future__ import annotations
+import asyncio
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import Iterable
from music_assistant.common.helpers.json import json_loads, serialize_to_json
from music_assistant.common.models.enums import EventType, ExternalID, MediaType, ProviderFeature
-from music_assistant.common.models.errors import MediaNotFoundError, ProviderUnavailableError
+from music_assistant.common.models.errors import (
+ InvalidDataError,
+ MediaNotFoundError,
+ ProviderUnavailableError,
+)
from music_assistant.common.models.media_items import (
Album,
ItemMapping,
DB_TABLE_PROVIDER_MAPPINGS,
MASS_LOGGER_NAME,
)
+from music_assistant.server.helpers.compare import compare_media_item
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Mapping
self.mass = mass
self.base_query = f"SELECT * FROM {self.db_table}"
self.logger = logging.getLogger(f"{MASS_LOGGER_NAME}.music.{self.media_type.value}")
+ # register (base) api handlers
+ self.api_base = api_base = f"{self.media_type}s"
+ self.mass.register_api_command(f"music/{api_base}/library_items", self.library_items)
+ self.mass.register_api_command(f"music/{api_base}/get", self.get)
+ self.mass.register_api_command(f"music/{api_base}/get_{self.media_type}", self.get)
+ self.mass.register_api_command(f"music/{api_base}/add", self.add_item_to_library)
+ self.mass.register_api_command(f"music/{api_base}/update", self.update_item_in_library)
+ self.mass.register_api_command(f"music/{api_base}/remove", self.remove_item_from_library)
+ self._db_add_lock = asyncio.Lock()
- @abstractmethod
async def add_item_to_library(
- self, item: ItemCls, metadata_lookup: bool = True, overwrite_existing: bool = False
+ self, item: Track, metadata_lookup: bool = True, overwrite_existing: bool = False
) -> ItemCls:
- """Add item to library and return the database item."""
- raise NotImplementedError
+ """Add item to library and return the new (or updated) database item."""
+ new_item = False
+ # grab additional metadata
+ if metadata_lookup:
+ await self.mass.metadata.get_metadata(item)
+ # check for existing item first
+ library_id = None
+ if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
+ # existing item match by provider id
+ await self._update_library_item(cur_item.item_id, item, overwrite=overwrite_existing)
+ library_id = cur_item.item_id
+ elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
+ # existing item match by external id
+ await self._update_library_item(cur_item.item_id, item, overwrite=overwrite_existing)
+ library_id = cur_item.item_id
+ else:
+ # search by (exact) name match
+ query = f"WHERE {self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
+ query_params = {"name": item.name, "sort_name": item.sort_name}
+ async for db_item in self.iter_library_items(
+ extra_query=query, extra_query_params=query_params
+ ):
+ if compare_media_item(db_item, item, True):
+ # existing item found: update it
+ await self._update_library_item(
+ db_item.item_id, item, overwrite=overwrite_existing
+ )
+ library_id = db_item.item_id
+ break
+ if library_id is None:
+ # actually add a new item in the library db
+ if not item.provider_mappings:
+ msg = "Item is missing provider mapping(s)"
+ raise InvalidDataError(msg)
+ async with self._db_add_lock:
+ library_id = await self._add_library_item(item)
+ new_item = True
+ # also fetch same track on all providers (will also get other quality versions)
+ if metadata_lookup:
+ library_item = await self.get_library_item(library_id)
+ await self._match(library_item)
+ # return final library_item after all match/metadata actions
+ library_item = await self.get_library_item(library_id)
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_ADDED if new_item else EventType.MEDIA_ITEM_UPDATED,
+ library_item.uri,
+ library_item,
+ )
+ return library_item
- @abstractmethod
async def update_item_in_library(
self, item_id: str | int, update: ItemCls, overwrite: bool = False
) -> ItemCls:
"""Update existing library record in the database."""
+ await self._update_library_item(item_id, update, overwrite=overwrite)
+ # return the updated object
+ library_item = await self.get_library_item(item_id)
+ self.mass.signal_event(
+ EventType.MEDIA_ITEM_UPDATED,
+ library_item.uri,
+ library_item,
+ )
+ return library_item
async def remove_item_from_library(self, item_id: str | int) -> None:
"""Delete library record from the database."""
# Fallback to the default implementation
return await self._get_dynamic_tracks(ref_item)
+ @abstractmethod
+ async def _add_library_item(
+ self,
+ item: ItemCls,
+ metadata_lookup: bool = True,
+ overwrite_existing: bool = False,
+ ) -> int:
+ """Add artist to library and return the database id."""
+
+ @abstractmethod
+ async def _update_library_item(
+ self, item_id: str | int, update: ItemCls, overwrite: bool = False
+ ) -> None:
+ """Update existing library record in the database."""
+
+ async def _match(self, db_item: ItemCls) -> None:
+ """
+ Try to find match on all (streaming) providers for the provided (database) item.
+
+ This is used to link objects of different providers/qualities together.
+ """
+
@abstractmethod
async def _get_provider_dynamic_tracks(
self,
db_row_dict["item_id"] = str(db_row_dict["item_id"])
for key in JSON_KEYS:
- if key in db_row_dict and db_row_dict[key] not in (None, ""):
- db_row_dict[key] = json_loads(db_row_dict[key])
+ if key not in db_row_dict:
+ continue
+ if not (raw_value := db_row_dict[key]):
+ continue
+ db_row_dict[key] = json_loads(raw_value)
# copy album image to itemmapping single image
if (album := db_row_dict.get("album")) and (images := album.get("images")):
from __future__ import annotations
-import asyncio
import random
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any, cast
from music_assistant.common.helpers.json import serialize_to_json
from music_assistant.common.helpers.uri import create_uri, parse_uri
-from music_assistant.common.models.enums import EventType, MediaType, ProviderFeature
+from music_assistant.common.models.enums import MediaType, ProviderFeature
from music_assistant.common.models.errors import (
InvalidDataError,
MediaNotFoundError,
ProviderUnavailableError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.media_items import ItemMapping, Playlist, PlaylistTrack, Track
+from music_assistant.common.models.media_items import Playlist, PlaylistTrack, Track
from music_assistant.constants import DB_TABLE_PLAYLISTS
from music_assistant.server.models.music_provider import MusicProvider
def __init__(self, *args, **kwargs) -> None:
"""Initialize class."""
super().__init__(*args, **kwargs)
- self._db_add_lock = asyncio.Lock()
- # register api handlers
- self.mass.register_api_command("music/playlists/library_items", self.library_items)
- self.mass.register_api_command(
- "music/playlists/update_item_in_library", self.update_item_in_library
- )
- self.mass.register_api_command(
- "music/playlists/remove_item_from_library", self.remove_item_from_library
- )
- self.mass.register_api_command("music/playlists/create_playlist", self.create_playlist)
-
- self.mass.register_api_command("music/playlists/get_playlist", self.get)
+ # register (extra) api handlers
+ api_base = self.api_base
+ self.mass.register_api_command(f"music/{api_base}/create_playlist", self.create_playlist)
self.mass.register_api_command("music/playlists/playlist_tracks", self.tracks)
self.mass.register_api_command(
"music/playlists/add_playlist_tracks", self.add_playlist_tracks
"music/playlists/remove_playlist_tracks", self.remove_playlist_tracks
)
- async def add_item_to_library(
- self, item: Playlist, metadata_lookup: bool = True, overwrite_existing: bool = False
- ) -> Playlist:
- """Add playlist to library and return the new database item."""
- if isinstance(item, ItemMapping):
- metadata_lookup = False
- item = Playlist.from_item_mapping(item)
- if not isinstance(item, Playlist):
- msg = "Not a valid Playlist object (ItemMapping can not be added to db)"
- raise InvalidDataError(msg)
- if not item.provider_mappings:
- msg = "Playlist is missing provider mapping(s)"
- raise InvalidDataError(msg)
- # check for existing item first
- library_item = None
- if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
- # existing item match by provider id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
- # existing item match by external id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- if not library_item:
- # actually add a new item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
- # preload playlist tracks listing (do not load them in the db)
- async for _ in self.tracks(item.item_id, item.provider):
- await asyncio.sleep(0) # yield to eventloop
- # metadata lookup we need to do after adding it to the db
- if metadata_lookup:
- await self.mass.metadata.get_playlist_metadata(library_item)
- library_item = await self.update_item_in_library(library_item.item_id, library_item)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_ADDED,
- library_item.uri,
- library_item,
- )
- return library_item
-
- async def update_item_in_library(
- self, item_id: int, update: Playlist, overwrite: bool = False
- ) -> Playlist:
- """Update existing record in the database."""
- db_id = int(item_id) # ensure integer
- cur_item = await self.get_library_item(db_id)
- metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
- cur_item.external_ids.update(update.external_ids)
- await self.mass.music.database.update(
- self.db_table,
- {"item_id": db_id},
- {
- # always prefer name/owner from updated item here
- "name": update.name if overwrite else cur_item.name,
- "sort_name": update.sort_name
- if overwrite
- else cur_item.sort_name or update.sort_name,
- "owner": update.owner or cur_item.owner,
- "is_editable": update.is_editable,
- "metadata": serialize_to_json(metadata),
- "external_ids": serialize_to_json(
- update.external_ids if overwrite else cur_item.external_ids
- ),
- },
- )
- # update/set provider_mappings table
- provider_mappings = (
- update.provider_mappings
- if overwrite
- else {*cur_item.provider_mappings, *update.provider_mappings}
- )
- await self._set_provider_mappings(db_id, provider_mappings, overwrite)
- self.logger.debug("updated %s in database: %s", update.name, db_id)
- # get full created object
- library_item = await self.get_library_item(db_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED,
- library_item.uri,
- library_item,
- )
- # return the full item we just updated
- return library_item
-
async def tracks(
self,
item_id: str,
cache_key = f"{provider.lookup_key}.playlist.{prov_mapping.item_id}.tracks"
await self.mass.cache.delete(cache_key)
- async def _add_library_item(self, item: Playlist) -> Playlist:
+ async def _add_library_item(self, item: Playlist) -> int:
"""Add a new record to the database."""
new_item = await self.mass.music.database.insert(
self.db_table,
db_id = new_item["item_id"]
# update/set provider_mappings table
await self._set_provider_mappings(db_id, item.provider_mappings)
- self.logger.debug("added %s to database", item.name)
- # return the full item we just added
- return await self.get_library_item(db_id)
+ self.logger.debug("added %s to database (id: %s)", item.name, db_id)
+ return db_id
+
+ async def _update_library_item(
+ self, item_id: int, update: Playlist, overwrite: bool = False
+ ) -> None:
+ """Update existing record in the database."""
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_library_item(db_id)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
+ cur_item.external_ids.update(update.external_ids)
+ await self.mass.music.database.update(
+ self.db_table,
+ {"item_id": db_id},
+ {
+ # always prefer name/owner from updated item here
+ "name": update.name if overwrite else cur_item.name,
+ "sort_name": update.sort_name
+ if overwrite
+ else cur_item.sort_name or update.sort_name,
+ "owner": update.owner or cur_item.owner,
+ "is_editable": update.is_editable,
+ "metadata": serialize_to_json(metadata),
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
+ },
+ )
+ # update/set provider_mappings table
+ provider_mappings = (
+ update.provider_mappings
+ if overwrite
+ else {*cur_item.provider_mappings, *update.provider_mappings}
+ )
+ await self._set_provider_mappings(db_id, provider_mappings, overwrite)
+ self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
async def _get_provider_playlist_tracks(
self,
import asyncio
from music_assistant.common.helpers.json import serialize_to_json
-from music_assistant.common.models.enums import EventType, MediaType
-from music_assistant.common.models.errors import InvalidDataError
-from music_assistant.common.models.media_items import ItemMapping, Radio, Track
+from music_assistant.common.models.enums import MediaType
+from music_assistant.common.models.media_items import Radio, Track
from music_assistant.constants import DB_TABLE_RADIOS
-from music_assistant.server.helpers.compare import compare_strings, loose_compare_strings
+from music_assistant.server.helpers.compare import loose_compare_strings
from .base import MediaControllerBase
def __init__(self, *args, **kwargs) -> None:
"""Initialize class."""
super().__init__(*args, **kwargs)
- self._db_add_lock = asyncio.Lock()
- # register api handlers
- self.mass.register_api_command("music/radio/library_items", self.library_items)
- self.mass.register_api_command("music/radio/get_radio", self.get)
- self.mass.register_api_command(
- "music/radio/update_item_in_library", self.update_item_in_library
- )
- self.mass.register_api_command(
- "music/radio/remove_item_from_library", self.remove_item_from_library
- )
- self.mass.register_api_command("music/radio/radio_versions", self.versions)
+ # register (extra) api handlers
+ api_base = self.api_base
+ self.mass.register_api_command(f"music/{api_base}/radio_versions", self.versions)
async def versions(
self,
# return the aggregated result
return all_versions.values()
- async def add_item_to_library(
- self, item: Radio, metadata_lookup: bool = True, overwrite_existing: bool = False
- ) -> Radio:
- """Add radio to library and return the new database item."""
- if isinstance(item, ItemMapping):
- metadata_lookup = False
- item = Radio.from_item_mapping(item)
- if not isinstance(item, Radio):
- msg = "Not a valid Radio object"
- raise InvalidDataError(msg)
- if not item.provider_mappings:
- msg = "Radio is missing provider mapping(s)"
- raise InvalidDataError(msg)
- if metadata_lookup:
- await self.mass.metadata.get_radio_metadata(item)
- # check for existing item first
- library_item = None
- if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
- # existing item match by provider id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
- # existing item match by external id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- else:
- # search by (exact) name match
- query = f"WHERE {self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
- query_params = {"name": item.name, "sort_name": item.sort_name}
- async for db_item in self.iter_library_items(
- extra_query=query, extra_query_params=query_params
- ):
- if compare_strings(db_item.name, item.name, strict=True):
- # existing item found: update it
- library_item = await self.update_item_in_library(db_item.item_id, item)
- break
- if not library_item:
- # actually add a new item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_ADDED,
- library_item.uri,
- library_item,
+ async def _add_library_item(self, item: Radio) -> int:
+ """Add a new item record to the database."""
+ new_item = await self.mass.music.database.insert(
+ self.db_table,
+ {
+ "name": item.name,
+ "sort_name": item.sort_name,
+ "favorite": item.favorite,
+ "metadata": serialize_to_json(item.metadata),
+ "external_ids": serialize_to_json(item.external_ids),
+ },
)
- return library_item
+ db_id = new_item["item_id"]
+ # update/set provider_mappings table
+ await self._set_provider_mappings(db_id, item.provider_mappings)
+ self.logger.debug("added %s to database (id: %s)", item.name, db_id)
+ return db_id
- async def update_item_in_library(
+ async def _update_library_item(
self, item_id: str | int, update: Radio, overwrite: bool = False
- ) -> Radio:
+ ) -> None:
"""Update existing record in the database."""
db_id = int(item_id) # ensure integer
cur_item = await self.get_library_item(db_id)
else {*cur_item.provider_mappings, *update.provider_mappings}
)
await self._set_provider_mappings(db_id, provider_mappings, overwrite)
- self.logger.debug("updated %s in database: %s", update.name, db_id)
- # get full created object
- library_item = await self.get_library_item(db_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED,
- library_item.uri,
- library_item,
- )
- # return the full item we just updated
- return library_item
-
- async def _add_library_item(self, item: Radio) -> Radio:
- """Add a new item record to the database."""
- new_item = await self.mass.music.database.insert(
- self.db_table,
- {
- "name": item.name,
- "sort_name": item.sort_name,
- "favorite": item.favorite,
- "metadata": serialize_to_json(item.metadata),
- "external_ids": serialize_to_json(item.external_ids),
- },
- )
- db_id = new_item["item_id"]
- # update/set provider_mappings table
- await self._set_provider_mappings(db_id, item.provider_mappings)
- self.logger.debug("added %s to database", item.name)
- # return the full item we just added
- return await self.get_library_item(db_id)
+ self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
async def _get_provider_dynamic_tracks(
self,
from __future__ import annotations
-import asyncio
import urllib.parse
from collections.abc import Iterable
from contextlib import suppress
from music_assistant.common.helpers.json import serialize_to_json
-from music_assistant.common.models.enums import AlbumType, EventType, MediaType, ProviderFeature
+from music_assistant.common.models.enums import MediaType, ProviderFeature
from music_assistant.common.models.errors import (
InvalidDataError,
MediaNotFoundError,
MusicAssistantError,
UnsupportedFeaturedException,
)
-from music_assistant.common.models.media_items import Album, Artist, ItemMapping, Track
+from music_assistant.common.models.media_items import Album, Artist, ItemMapping, Track, UniqueList
from music_assistant.constants import (
DB_TABLE_ALBUM_TRACKS,
DB_TABLE_ALBUMS,
"""Initialize class."""
super().__init__(*args, **kwargs)
self.base_query = f"""
- SELECT
+ SELECT DISTINCT
{self.db_table}.*,
CASE WHEN albums.item_id IS NULL THEN NULL ELSE
json_object(
LEFT JOIN {DB_TABLE_TRACK_ARTISTS} on {DB_TABLE_TRACK_ARTISTS}.track_id = {self.db_table}.item_id
LEFT JOIN {DB_TABLE_ARTISTS} on {DB_TABLE_ARTISTS}.item_id = {DB_TABLE_TRACK_ARTISTS}.artist_id
""" # noqa: E501
- self._db_add_lock = asyncio.Lock()
- # register api handlers
- self.mass.register_api_command("music/tracks/library_items", self.library_items)
- self.mass.register_api_command("music/tracks/get_track", self.get)
- self.mass.register_api_command("music/tracks/track_versions", self.versions)
- self.mass.register_api_command("music/tracks/track_albums", self.albums)
- self.mass.register_api_command(
- "music/tracks/update_item_in_library", self.update_item_in_library
- )
- self.mass.register_api_command(
- "music/tracks/remove_item_from_library", self.remove_item_from_library
- )
- self.mass.register_api_command("music/tracks/preview", self.get_preview_url)
+ # register (extra) api handlers
+ api_base = self.api_base
+ self.mass.register_api_command(f"music/{api_base}/track_versions", self.versions)
+ self.mass.register_api_command(f"music/{api_base}/track_albums", self.albums)
+ self.mass.register_api_command(f"music/{api_base}/preview", self.get_preview_url)
async def get(
self,
track.artists = track_artists
return track
- async def add_item_to_library(
- self, item: Track, metadata_lookup: bool = True, overwrite_existing: bool = False
- ) -> Track:
- """Add track to library and return the new database item."""
- if not isinstance(item, Track):
- msg = "Not a valid Track object (ItemMapping can not be added to db)"
- raise InvalidDataError(msg)
- if not item.artists:
- msg = "Track is missing artist(s)"
- raise InvalidDataError(msg)
- if not item.provider_mappings:
- msg = "Track is missing provider mapping(s)"
- raise InvalidDataError(msg)
- # grab additional metadata
- if metadata_lookup:
- await self.mass.metadata.get_track_metadata(item)
- # copy album image from track (only if albumtype != single)
- # this deals with embedded images from filesystem providers
- if (
- isinstance(item.album, Album)
- and not item.album.image
- and item.image
- and item.album.album_type == AlbumType.SINGLE
- ):
- item.album.metadata.images = [item.image]
- # check for existing item first
- library_item = None
- if cur_item := await self.get_library_item_by_prov_id(item.item_id, item.provider):
- # existing item match by provider id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- elif cur_item := await self.get_library_item_by_external_ids(item.external_ids):
- # existing item match by external id
- library_item = await self.update_item_in_library(
- cur_item.item_id, item, overwrite=overwrite_existing
- )
- else:
- # search by (exact) name match
- query = f"WHERE {self.db_table}.name = :name OR {self.db_table}.sort_name = :sort_name"
- query_params = {"name": item.name, "sort_name": item.sort_name}
- async for db_item in self.iter_library_items(
- extra_query=query, extra_query_params=query_params
- ):
- if compare_track(db_item, item):
- # existing item found: update it
- library_item = await self.update_item_in_library(
- db_item.item_id, item, overwrite=overwrite_existing
- )
- break
- if not library_item:
- # actually add a new item in the library db
- # use the lock to prevent a race condition of the same item being added twice
- async with self._db_add_lock:
- library_item = await self._add_library_item(item)
- # also fetch same track on all providers (will also get other quality versions)
- if metadata_lookup:
- await self._match(library_item)
- library_item = await self.get_library_item(library_item.item_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_ADDED,
- library_item.uri,
- library_item,
- )
- # return final library_item after all match/metadata actions
- return library_item
-
- async def update_item_in_library(
- self, item_id: str | int, update: Track, overwrite: bool = False
- ) -> Track:
- """Update Track record in the database, merging data."""
- db_id = int(item_id) # ensure integer
- cur_item = await self.get_library_item(db_id)
- metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
- cur_item.external_ids.update(update.external_ids)
-
- await self.mass.music.database.update(
- self.db_table,
- {"item_id": db_id},
- {
- "name": update.name if overwrite else cur_item.name,
- "sort_name": update.sort_name
- if overwrite
- else cur_item.sort_name or update.sort_name,
- "version": update.version if overwrite else cur_item.version or update.version,
- "duration": update.duration if overwrite else cur_item.duration or update.duration,
- "metadata": serialize_to_json(metadata),
- "external_ids": serialize_to_json(
- update.external_ids if overwrite else cur_item.external_ids
- ),
- },
- )
-
- # update/set provider_mappings table
- provider_mappings = (
- update.provider_mappings
- if overwrite
- else {*cur_item.provider_mappings, *update.provider_mappings}
- )
- await self._set_provider_mappings(db_id, provider_mappings, overwrite)
- # set track artist(s)
- artists = update.artists if overwrite else cur_item.artists + update.artists
- await self._set_track_artists(db_id, artists, overwrite=overwrite)
-
- # update/set track album
- if update.album:
- await self._set_track_album(
- db_id=db_id,
- album=update.album,
- disc_number=getattr(update, "disc_number", None) or 0,
- track_number=getattr(update, "track_number", None) or 1,
- overwrite=overwrite,
- )
-
- # get full/final created object
- library_item = await self.get_library_item(db_id)
- self.mass.signal_event(
- EventType.MEDIA_ITEM_UPDATED,
- library_item.uri,
- library_item,
- )
- self.logger.debug("updated %s in database: %s", update.name, db_id)
- # return the full item we just updated
- return library_item
-
async def versions(
self,
item_id: str,
provider_instance_id_or_domain: str,
- ) -> list[Track]:
+ ) -> UniqueList[Track]:
"""Return all versions of a track we can find on all providers."""
track = await self.get(item_id, provider_instance_id_or_domain, add_to_library=False)
search_query = f"{track.artist_str} - {track.name}"
- result: list[Track] = []
+ result: UniqueList[Track] = UniqueList()
for provider_id in self.mass.music.get_unique_providers():
provider = self.mass.get_provider(provider_id)
if not provider:
continue
if not provider.library_supported(MediaType.TRACK):
continue
- result += [
+ result.extend(
prov_item
for prov_item in await self.search(search_query, provider_id)
if loose_compare_strings(track.name, prov_item.name)
and compare_artists(prov_item.artists, track.artists, any_match=True)
# make sure that the 'base' version is NOT included
and not track.provider_mappings.intersection(prov_item.provider_mappings)
- ]
+ )
return result
async def albums(
item_id: str,
provider_instance_id_or_domain: str,
in_library_only: bool = False,
- ) -> list[Album]:
+ ) -> UniqueList[Album]:
"""Return all albums the track appears on."""
full_track = await self.get(item_id, provider_instance_id_or_domain)
db_items = (
if full_track.provider == "library"
else []
)
+ # return all (unique) items from all providers
+ result: UniqueList[Album] = UniqueList(db_items)
if full_track.provider == "library" and in_library_only:
# return in-library items only
- return db_items
- # return all (unique) items from all providers
- result: list[Album] = [*db_items]
+ return result
# use search to get all items on the provider
search_query = f"{full_track.artist_str} - {full_track.name}"
# TODO: we could use musicbrainz info here to get a list of all releases known
- result: list[Track] = [*db_items]
unique_ids: set[str] = set()
for prov_item in (await self.mass.music.search(search_query, [MediaType.TRACK])).tracks:
if not loose_compare_strings(full_track.name, prov_item.name):
if db_item := await self.mass.music.albums.get_library_item_by_prov_id(
prov_item.album.item_id, prov_item.album.provider
):
- if db_item not in db_items:
- result.append(db_item)
+ result.append(db_item)
elif not in_library_only:
result.append(prov_item.album)
return result
msg = "No Music Provider found that supports requesting similar tracks."
raise UnsupportedFeaturedException(msg)
- async def _add_library_item(self, item: Track) -> Track:
+ async def _add_library_item(self, item: Track) -> int:
"""Add a new item record to the database."""
+ if not isinstance(item, Track):
+ msg = "Not a valid Track object (ItemMapping can not be added to db)"
+ raise InvalidDataError(msg)
+ if not item.artists:
+ msg = "Track is missing artist(s)"
+ raise InvalidDataError(msg)
new_item = await self.mass.music.database.insert(
self.db_table,
{
disc_number=getattr(item, "disc_number", None) or 0,
track_number=getattr(item, "track_number", None) or 0,
)
- self.logger.debug("added %s to database: %s", item.name, db_id)
- # return the full item we just added
- return await self.get_library_item(db_id)
+ self.logger.debug("added %s to database (id: %s)", item.name, db_id)
+ return db_id
+
+ async def _update_library_item(
+ self, item_id: str | int, update: Track, overwrite: bool = False
+ ) -> None:
+ """Update Track record in the database, merging data."""
+ db_id = int(item_id) # ensure integer
+ cur_item = await self.get_library_item(db_id)
+ metadata = update.metadata if overwrite else cur_item.metadata.update(update.metadata)
+ cur_item.external_ids.update(update.external_ids)
+ await self.mass.music.database.update(
+ self.db_table,
+ {"item_id": db_id},
+ {
+ "name": update.name if overwrite else cur_item.name,
+ "sort_name": update.sort_name
+ if overwrite
+ else cur_item.sort_name or update.sort_name,
+ "version": update.version if overwrite else cur_item.version or update.version,
+ "duration": update.duration if overwrite else cur_item.duration or update.duration,
+ "metadata": serialize_to_json(metadata),
+ "external_ids": serialize_to_json(
+ update.external_ids if overwrite else cur_item.external_ids
+ ),
+ },
+ )
+ # update/set provider_mappings table
+ provider_mappings = (
+ update.provider_mappings
+ if overwrite
+ else {*cur_item.provider_mappings, *update.provider_mappings}
+ )
+ await self._set_provider_mappings(db_id, provider_mappings, overwrite)
+ # set track artist(s)
+ artists = update.artists if overwrite else cur_item.artists + update.artists
+ await self._set_track_artists(db_id, artists, overwrite=overwrite)
+ # update/set track album
+ if update.album:
+ await self._set_track_album(
+ db_id=db_id,
+ album=update.album,
+ disc_number=getattr(update, "disc_number", None) or 0,
+ track_number=getattr(update, "track_number", None) or 1,
+ overwrite=overwrite,
+ )
+ self.logger.debug("updated %s in database: (id %s)", update.name, db_id)
async def _set_track_album(
self,
album.item_id, album.provider
):
db_album = existing
- else:
- # not an existing album, we need to fetch before we can add it to the library
+
+ if not db_album or overwrite:
+ # ensure we have an actual album object
if isinstance(album, ItemMapping):
album = await self.mass.music.albums.get_provider_item(
album.item_id, album.provider, fallback=album
album,
metadata_lookup=False,
overwrite_existing=overwrite,
- add_album_tracks=False,
)
if not db_album:
# this should not happen but streaming providers can be awful sometimes
"track_id": db_id,
},
)
- artist_mappings: list[ItemMapping] = []
+ artist_mappings: UniqueList[ItemMapping] = UniqueList()
for artist in artists:
mapping = await self._set_track_artist(db_id, artist=artist, overwrite=overwrite)
artist_mappings.append(mapping)
# NOTE: we do not have any metadata for radio so consider this future proofing ;-)
radio.metadata.last_refresh = int(time())
+ async def get_metadata(self, item: MediaItemType) -> None:
+ """Get/update rich metadata for/on given MediaItem."""
+ if item.media_type == MediaType.ARTIST:
+ await self.get_artist_metadata(item)
+ if item.media_type == MediaType.ALBUM:
+ await self.get_album_metadata(item)
+ if item.media_type == MediaType.TRACK:
+ await self.get_track_metadata(item)
+ if item.media_type == MediaType.PLAYLIST:
+ await self.get_playlist_metadata(item)
+ if item.media_type == MediaType.RADIO:
+ await self.get_radio_metadata(item)
+
async def get_artist_mbid(self, artist: Artist) -> str | None:
"""Fetch musicbrainz id by performing search using the artist name, albums and tracks."""
if compare_strings(artist.name, VARIOUS_ARTISTS_NAME):
import unidecode
from music_assistant.common.helpers.util import create_sort_name
-from music_assistant.common.models.enums import ExternalID
+from music_assistant.common.models.enums import ExternalID, MediaType
from music_assistant.common.models.media_items import (
Album,
Artist,
ItemMapping,
MediaItem,
MediaItemMetadata,
+ MediaItemType,
+ Playlist,
+ Radio,
Track,
)
)
+def compare_media_item(
+ base_item: MediaItemType | ItemMapping,
+ compare_item: MediaItemType | ItemMapping,
+ strict: bool = True,
+) -> bool | None:
+ """Compare two media items and return True if they match."""
+ if base_item.media_type == MediaType.ARTIST and compare_item.media_type == MediaType.ARTIST:
+ return compare_artist(base_item, compare_item, strict)
+ if base_item.media_type == MediaType.ALBUM and compare_item.media_type == MediaType.ALBUM:
+ return compare_album(base_item, compare_item, strict)
+ if base_item.media_type == MediaType.TRACK and compare_item.media_type == MediaType.TRACK:
+ return compare_track(base_item, compare_item, strict)
+ if base_item.media_type == MediaType.PLAYLIST and compare_item.media_type == MediaType.PLAYLIST:
+ return compare_playlist(base_item, compare_item, strict)
+ if base_item.media_type == MediaType.RADIO and compare_item.media_type == MediaType.RADIO:
+ return compare_radio(base_item, compare_item, strict)
+ return compare_item_mapping(base_item, compare_item, strict)
+
+
def compare_artist(
base_item: Artist | ItemMapping,
compare_item: Artist | ItemMapping,
- allow_name_match: bool = True,
+ strict: bool = True,
) -> bool | None:
"""Compare two artist items and return True if they match."""
if base_item is None or compare_item is None:
external_id_match = compare_external_ids(base_item.external_ids, compare_item.external_ids)
if external_id_match is not None:
return external_id_match
- ## fallback to comparing on attributes
- name_match = compare_strings(base_item.name, compare_item.name, strict=True)
- if name_match is False:
- return False
- return name_match if allow_name_match else None
+ # finally comparing on (exact) name match
+ return compare_strings(base_item.name, compare_item.name, strict=strict)
def compare_album(
base_item: Album | ItemMapping,
compare_item: Album | ItemMapping,
- allow_name_match: bool = True,
+ strict: bool = True,
) -> bool | None:
"""Compare two album items and return True if they match."""
if base_item is None or compare_item is None:
external_id_match = compare_external_ids(base_item.external_ids, compare_item.external_ids)
if external_id_match is not None:
return external_id_match
- ## fallback to comparing on attributes
# compare version
if not compare_version(base_item.version, compare_item.version):
return False
# compare name
- name_match = compare_strings(base_item.name, compare_item.name, strict=True)
- if name_match is False:
+ if not compare_strings(base_item.name, compare_item.name, strict=True):
return False
+ if not strict and (isinstance(base_item, ItemMapping) or isinstance(compare_item, ItemMapping)):
+ return True
+ # for strict matching we REQUIRE both items to be a real album object
+ assert isinstance(base_item, Album)
+ assert isinstance(compare_item, Album)
# compare explicitness
- if (
- hasattr(base_item, "metadata")
- and hasattr(compare_item, "metadata")
- and compare_explicit(base_item.metadata, compare_item.metadata) is False
- ):
+ if compare_explicit(base_item.metadata, compare_item.metadata) is False:
return False
# compare album artist
- # Note: Not present on ItemMapping
- if (
- isinstance(base_item, Album)
- and isinstance(compare_item, Album)
- and not compare_artists(base_item.artists, compare_item.artists, True)
- ):
- return False
- return name_match if allow_name_match else None
+ return compare_artists(base_item.artists, compare_item.artists, True)
def compare_track(
- base_item: Track,
- compare_item: Track,
+ base_item: Track | ItemMapping,
+ compare_item: Track | ItemMapping,
strict: bool = True,
track_albums: list[Album | ItemMapping] | None = None,
) -> bool:
"""Compare two track items and return True if they match."""
if base_item is None or compare_item is None:
return False
- assert isinstance(base_item, Track)
- assert isinstance(compare_item, Track)
# return early on exact item_id match
if compare_item_ids(base_item, compare_item):
return True
# track version must match
if strict and not compare_version(base_item.version, compare_item.version):
return False
+ if not strict and (isinstance(base_item, ItemMapping) or isinstance(compare_item, ItemMapping)):
+ return True
+ # for strict matching we REQUIRE both items to be a real track object
+ assert isinstance(base_item, Track)
+ assert isinstance(compare_item, Track)
# check if both tracks are (not) explicit
if base_item.metadata.explicit is None and isinstance(base_item.album, Album):
base_item.metadata.explicit = base_item.album.metadata.explicit
if (
base_item.album
and compare_item.album
- and compare_album(base_item.album, compare_item.album)
+ and compare_album(base_item.album, compare_item.album, False)
and base_item.track_number == compare_item.track_number
):
return True
if (
base_item.album is not None
and compare_item.album is not None
- and compare_album(base_item.album, compare_item.album)
+ and compare_album(base_item.album, compare_item.album, False)
and abs(base_item.duration - compare_item.duration) <= 3
):
return True
and abs(base_item.duration - compare_item.duration) <= 3
):
for track_album in track_albums:
- if compare_album(track_album, compare_item.album):
+ if compare_album(track_album, compare_item.album, False):
return True
# edge case: albumless track
if (
return False
+def compare_playlist(
+ base_item: Playlist | ItemMapping,
+ compare_item: Playlist | ItemMapping,
+ strict: bool = True,
+) -> bool | None:
+ """Compare two Playlist items and return True if they match."""
+ if base_item is None or compare_item is None:
+ return False
+ # return early on exact item_id match
+ if compare_item_ids(base_item, compare_item):
+ return True
+ # return early on (un)matched external id
+ external_id_match = compare_external_ids(base_item.external_ids, compare_item.external_ids)
+ if external_id_match is not None:
+ return external_id_match
+ # compare owner (if not ItemMapping)
+ if isinstance(base_item, Playlist) and isinstance(compare_item, Playlist):
+ if not compare_strings(base_item.owner, compare_item.owner):
+ return False
+ # compare version
+ if not compare_version(base_item.version, compare_item.version):
+ return False
+ # finally comparing on (exact) name match
+ return compare_strings(base_item.name, compare_item.name, strict=strict)
+
+
+def compare_radio(
+ base_item: Radio | ItemMapping,
+ compare_item: Radio | ItemMapping,
+ strict: bool = True,
+) -> bool | None:
+ """Compare two Radio items and return True if they match."""
+ if base_item is None or compare_item is None:
+ return False
+ # return early on exact item_id match
+ if compare_item_ids(base_item, compare_item):
+ return True
+ # return early on (un)matched external id
+ external_id_match = compare_external_ids(base_item.external_ids, compare_item.external_ids)
+ if external_id_match is not None:
+ return external_id_match
+ # compare version
+ if not compare_version(base_item.version, compare_item.version):
+ return False
+ # finally comparing on (exact) name match
+ return compare_strings(base_item.name, compare_item.name, strict=strict)
+
+
+def compare_item_mapping(
+ base_item: ItemMapping,
+ compare_item: ItemMapping,
+ strict: bool = True,
+) -> bool | None:
+ """Compare two ItemMapping items and return True if they match."""
+ if base_item is None or compare_item is None:
+ return False
+ # return early on exact item_id match
+ if compare_item_ids(base_item, compare_item):
+ return True
+ # return early on (un)matched external id
+ external_id_match = compare_external_ids(base_item.external_ids, compare_item.external_ids)
+ if external_id_match is not None:
+ return external_id_match
+ # compare version
+ if not compare_version(base_item.version, compare_item.version):
+ return False
+ # finally comparing on (exact) name match
+ return compare_strings(base_item.name, compare_item.name, strict=strict)
+
+
def compare_artists(
base_items: list[Artist | ItemMapping],
compare_items: list[Artist | ItemMapping],
# note that we skip the metadata lookup purely to speed up the sync
# the additional metadata is then lazy retrieved afterwards
prov_item.favorite = True
- extra_kwargs = (
- {"add_album_tracks": True} if media_type == MediaType.ALBUM else {}
- )
library_item = await controller.add_item_to_library(
- prov_item, metadata_lookup=False, **extra_kwargs
+ prov_item, metadata_lookup=False
)
elif library_item.metadata.cache_checksum != prov_item.metadata.cache_checksum:
# existing dbitem checksum changed
if track.album and not track.album.metadata.images:
# set embedded cover on album if it does not have one yet
track.album.metadata.images = track.metadata.images
+ # copy album image from track (only if the album itself doesn't have an image)
+ # this deals with embedded images from filesystem providers
+ if track.album and not track.album.image and track.image:
+ track.album.metadata.images = [track.image]
# parse other info
track.duration = tags.duration or 0
import re
from contextlib import suppress
from dataclasses import dataclass, field
-from json import JSONDecodeError
from typing import TYPE_CHECKING, Any
-import aiohttp.client_exceptions
-from asyncio_throttle import Throttler
from mashumaro import DataClassDictMixin
from mashumaro.exceptions import MissingField
+from music_assistant.common.helpers.json import json_loads
from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.enums import ExternalID, ProviderFeature
-from music_assistant.common.models.errors import InvalidDataError
+from music_assistant.common.models.errors import (
+ InvalidDataError,
+ MediaNotFoundError,
+ ResourceTemporarilyUnavailable,
+)
from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.helpers.compare import compare_strings
+from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.server.models.metadata_provider import MetadataProvider
if TYPE_CHECKING:
class MusicbrainzProvider(MetadataProvider):
"""The Musicbrainz Metadata provider."""
- throttler: Throttler
+ throttler = ThrottlerManager(rate_limit=1, period=1)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self.cache = self.mass.cache
- self.throttler = Throttler(rate_limit=1, period=1)
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
raise InvalidDataError(msg)
async def get_recording_details(
- self, recording_id: str | None = None, isrsc: str | None = None
+ self, recording_id: str | None = None, isrc: str | None = None
) -> MusicBrainzRecording:
"""Get Recording details by providing a MusicBrainz recording id OR isrc."""
- assert recording_id or isrsc, "Provider either Recording ID or ISRC"
+ assert recording_id or isrc, "Provider either Recording ID or ISRC"
if not recording_id:
# lookup recording id first by isrc
- if (result := await self.get_data(f"isrc/{isrsc}")) and result.get("recordings"):
+ if (result := await self.get_data(f"isrc/{isrc}")) and result.get("recordings"):
recording_id = result["recordings"][0]["id"]
else:
msg = "Invalid ISRC provided"
return None
for isrc in isrcs:
result = None
- with suppress(InvalidDataError):
+ with suppress(InvalidDataError, MediaNotFoundError):
result = await self.get_recording_details(ref_track.mbid, isrc)
if not (result and result.artist_credit):
return None
return None
@use_cache(86400 * 30)
+ @throttle_with_retries
async def get_data(self, endpoint: str, **kwargs: dict[str, Any]) -> Any:
"""Get data from api."""
url = f"http://musicbrainz.org/ws/2/{endpoint}"
headers = {
- "User-Agent": f"Music Assistant/{self.mass.version} ( https://github.com/music-assistant )" # noqa: E501
+ "User-Agent": f"Music Assistant/{self.mass.version} (https://music-assistant.io)"
}
kwargs["fmt"] = "json" # type: ignore[assignment]
async with (
- self.throttler,
- self.mass.http_session.get(
- url, headers=headers, params=kwargs, raise_for_status=True
- ) as response,
+ self.mass.http_session.get(url, headers=headers, params=kwargs) as response,
):
- try:
- result = await response.json()
- except (
- aiohttp.client_exceptions.ContentTypeError,
- JSONDecodeError,
- ) as exc:
- msg = await response.text()
- self.logger.warning("%s - %s", str(exc), msg)
- result = None
- return result
+ # handle rate limiter
+ if response.status == 429:
+ backoff_time = int(response.headers.get("Retry-After", 0))
+ raise ResourceTemporarilyUnavailable("Rate Limiter", backoff_time=backoff_time)
+ # handle temporary server error
+ if response.status in (502, 503):
+ raise ResourceTemporarilyUnavailable(backoff_time=30)
+ # handle 404 not found, convert to MediaNotFoundError
+ if response.status == 404:
+ raise MediaNotFoundError(f"{endpoint} not found")
+ response.raise_for_status()
+ return await response.json(loads=json_loads)