"""Check equality of two items."""
return self.__hash__() == other.__hash__()
- def __post_init__(self):
- """Call after init."""
+ @classmethod
+ def __pre_deserialize__(cls, d: dict[Any, Any]) -> dict[Any, Any]:
+ """Handle actions before deserialization."""
# migrate from url provider --> builtin
# TODO: remove this after 2.0 is launched
- if self.provider == "url":
- self.provider = "builtin"
- self.remotely_accessible = True
+ if d["provider"] == "url":
+ d["provider"] = "builtin"
+ d["remotely_accessible"] = True
+ return d
@dataclass(frozen=True, kw_only=True)
full_album = await self.get_provider_item(item_id, provider_instance_id_or_domain)
# prefer cache items (if any) for streaming providers only
- cache_key = f"{prov.instance_id}.albumtracks.{item_id}"
+ cache_key = f"{prov.lookup_key}.albumtracks.{item_id}"
if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
return [AlbumTrack.from_dict(x) for x in cache]
# no items in cache - get listing from provider
if not provider.is_streaming_provider:
# matching on unique providers is pointless as they push (all) their content to MA
continue
- if await self._match(db_artist, provider):
- cur_provider_domains.add(provider.domain)
+ for unpack_item_mapping in (False, True):
+ if await self._match(db_artist, provider, unpack_item_mapping):
+ cur_provider_domains.add(provider.domain)
+ break
else:
self.logger.debug(
"Could not find match for Artist %s on provider %s",
if prov is None:
return []
# prefer cache items (if any) - for streaming providers
- cache_key = f"{prov.instance_id}.artist_toptracks.{item_id}"
+ cache_key = f"{prov.lookup_key}.artist_toptracks.{item_id}"
if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
return [Track.from_dict(x) for x in cache]
# no items in cache - get listing from provider
if prov is None:
return []
# prefer cache items (if any)
- cache_key = f"{prov.instance_id}.artist_albums.{item_id}"
+ cache_key = f"{prov.lookup_key}.artist_albums.{item_id}"
if prov.is_streaming_provider and (cache := await self.mass.cache.get(cache_key)):
return [Album.from_dict(x) for x in cache]
# no items in cache - get listing from provider
msg = "No Music Provider found that supports requesting similar tracks."
raise UnsupportedFeaturedException(msg)
- async def _match(self, db_artist: Artist, provider: MusicProvider) -> bool:
+ async def _match(
+ self, db_artist: Artist, provider: MusicProvider, unpack_item_mapping: bool = False
+ ) -> bool:
"""Try to find matching artists on given provider for the provided (database) artist."""
self.logger.debug("Trying to match artist %s on provider %s", db_artist.name, provider.name)
# try to get a match with some reference tracks of this artist
for ref_track in ref_tracks:
# make sure we have a full track
if isinstance(ref_track.album, ItemMapping):
+ if not unpack_item_mapping:
+ continue
try:
- ref_track = await self.mass.music.tracks.get_provider_item( # noqa: PLW2901
+ ref_track = await self.mass.music.tracks.get( # noqa: PLW2901
ref_track.item_id, ref_track.provider
)
except MediaNotFoundError:
from music_assistant.common.helpers.json import json_loads, serialize_to_json
from music_assistant.common.models.enums import EventType, ExternalID, MediaType, ProviderFeature
-from music_assistant.common.models.errors import InvalidDataError, MediaNotFoundError
+from music_assistant.common.models.errors import (
+ InvalidDataError,
+ MediaNotFoundError,
+ ProviderUnavailableError,
+)
from music_assistant.common.models.media_items import (
Album,
Artist,
return []
# prefer cache items (if any)
- cache_key = f"{prov.instance_id}.search.{self.media_type.value}.{search_query}.{limit}"
+ cache_key = f"{prov.lookup_key}.search.{self.media_type.value}.{search_query}.{limit}"
if cache := await self.mass.cache.get(cache_key):
return [media_from_dict(x) for x in cache]
# no items in cache - get listing from provider
fallback: ItemMapping | ItemCls = None,
) -> ItemCls:
"""Return item details for the given provider item id."""
- cache_key = (
- f"provider_item.{self.media_type.value}.{provider_instance_id_or_domain}.{item_id}"
- )
+ if not (provider := self.mass.get_provider(provider_instance_id_or_domain)):
+ raise ProviderUnavailableError(f"{provider_instance_id_or_domain} is not available")
+ cache_key = f"provider_item.{self.media_type.value}.{provider.lookup_key}.{item_id}"
if provider_instance_id_or_domain == "library":
return await self.get_library_item(item_id)
if not force_refresh and (cache := await self.mass.cache.get(cache_key)):
# actually add the tracks to the playlist on the provider
await playlist_prov.add_playlist_tracks(playlist_prov_map.item_id, list(ids_to_add))
# invalidate cache so tracks get refreshed
- cache_key = f"{playlist_prov.instance_id}.playlist.{playlist_prov_map.item_id}.tracks"
+ cache_key = f"{playlist_prov.lookup_key}.playlist.{playlist_prov_map.item_id}.tracks"
await self.mass.cache.delete(cache_key)
async def add_playlist_track(self, db_playlist_id: str | int, track_uri: str) -> None:
continue
await provider.remove_playlist_tracks(prov_mapping.item_id, positions_to_remove)
# invalidate cache so tracks get refreshed
- cache_key = f"{provider.instance_id}.playlist.{prov_mapping.item_id}.tracks"
+ cache_key = f"{provider.lookup_key}.playlist.{prov_mapping.item_id}.tracks"
await self.mass.cache.delete(cache_key)
async def _add_library_item(self, item: Playlist) -> Playlist:
if not provider:
return
# prefer cache items (if any)
- cache_key = f"{provider.instance_id}.playlist.{item_id}.tracks"
+ cache_key = f"{provider.lookup_key}.playlist.{item_id}.tracks"
if cache := await self.mass.cache.get(cache_key, checksum=cache_checksum):
for track_dict in cache:
yield PlaylistTrack.from_dict(track_dict)
assert item.position is not None, "Playlist items require position to be set"
yield item
all_items.append(item)
+ # if this is a complete track object, pre-cache it as
+ # that will save us an (expensive) lookup later
+ if item.image and item.artist_str and item.album and provider.domain != "builtin":
+ cache_key = f"provider_item.track.{provider.lookup_key}.{item_id}"
+ await self.mass.cache.set(cache_key, item.to_dict())
# store (serializable items) in cache
if cache_checksum != "no_cache":
self.mass.create_task(
# create full db item
# note that we skip the metadata lookup purely to speed up the sync
# the additional metadata is then lazy retrieved afterwards
-
prov_item.favorite = True
extra_kwargs = (
{"add_album_tracks": True} if media_type == MediaType.ALBUM else {}
import shortuuid
+from music_assistant.common.helpers.uri import parse_uri
from music_assistant.common.models.config_entries import ConfigEntry
from music_assistant.common.models.enums import (
ConfigEntryType,
ProviderFeature,
StreamType,
)
-from music_assistant.common.models.errors import InvalidDataError, MediaNotFoundError
+from music_assistant.common.models.errors import (
+ InvalidDataError,
+ MediaNotFoundError,
+ ProviderUnavailableError,
+)
from music_assistant.common.models.media_items import (
AlbumTrack,
Artist,
# user created universal playlist
conf_key = f"{CONF_KEY_PLAYLIST_ITEMS}/{prov_playlist_id}"
playlist_items: list[str] = self.mass.config.get(conf_key, [])
- for count, playlist_item_uri in enumerate(playlist_items, 1):
+ for count, uri in enumerate(playlist_items, 1):
try:
- base_item = await self.mass.music.get_item_by_uri(playlist_item_uri)
+ # get the provider item and not the full track from a regular 'get' call
+ # as we only need basic track info here
+ media_type, provider_instance_id_or_domain, item_id = await parse_uri(uri)
+ media_controller = self.mass.music.get_controller(media_type)
+ base_item = await media_controller.get_provider_item(
+ item_id, provider_instance_id_or_domain
+ )
yield PlaylistTrack.from_dict({**base_item.to_dict(), "position": count})
- except (MediaNotFoundError, InvalidDataError) as err:
- self.logger.warning("Skipping item in playlist: %s:%s", playlist_item_uri, str(err))
+ except (MediaNotFoundError, InvalidDataError, ProviderUnavailableError) as err:
+ self.logger.warning("Skipping item in playlist: %s:%s", uri, str(err))
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Add track(s) to playlist."""