"""Add album to local db and return the database item."""
# resolve any ItemMapping artists
item.artists = [
- await self.mass.music.artists.get_provider_item(artist.item_id, artist.provider)
+ await self.mass.music.artists.get_provider_item(
+ artist.item_id, artist.provider, fallback=artist
+ )
if isinstance(artist, ItemMapping)
else artist
for artist in item.artists
continue
# we must fetch the full album version, search results are simplified objects
prov_album = await self.get_provider_item(
- search_result_item.item_id, search_result_item.provider
+ search_result_item.item_id,
+ search_result_item.provider,
+ fallback=search_result_item,
)
if compare_album(prov_album, db_album):
# 100% match, we can simply update the db with additional provider ids
# 100% album match
# get full artist details so we have all metadata
prov_artist = await self.get_provider_item(
- search_item_artist.item_id, search_item_artist.provider
+ search_item_artist.item_id,
+ search_item_artist.provider,
+ fallback=search_result_item,
)
await self._update_db_item(db_artist.item_id, prov_artist)
return True
prov_artist = await self.get_provider_item(
search_result_item.artists[0].item_id,
search_result_item.artists[0].provider,
+ fallback=search_result_item,
)
await self._update_db_item(db_artist.item_id, prov_artist)
return True
self.mass.signal_event(EventType.MEDIA_ITEM_UPDATED, db_item.uri, db_item)
async def get_provider_item(
- self, item_id: str, provider_instance_id_or_domain: str, force_refresh: bool = False
+ self,
+ item_id: str,
+ provider_instance_id_or_domain: str,
+ force_refresh: bool = False,
+ fallback: ItemMapping | ItemCls = None,
) -> ItemCls:
"""Return item details for the given provider item id."""
cache_key = (
return await self.get_db_item(item_id)
if not force_refresh and (cache := await self.mass.cache.get(cache_key)):
return self.item_cls.from_dict(cache)
- if provider := self.mass.get_provider(provider_instance_id_or_domain): # noqa: SIM102
- item: MediaItemType = None
- try:
- item = await provider.get_item(self.media_type, item_id)
- except MediaNotFoundError:
- # fallback to domain matching
- for provider in self.mass.music.providers:
- if not provider.available:
- continue
- if provider_instance_id_or_domain != provider.domain:
- continue
- with suppress(MediaNotFoundError):
- if item := await provider.get_item(self.media_type, item_id):
- break
- if item:
- await self.mass.cache.set(cache_key, item.to_dict())
- return item
+ if provider := self.mass.get_provider(provider_instance_id_or_domain):
+ with suppress(MediaNotFoundError):
+ if item := await provider.get_item(self.media_type, item_id):
+ await self.mass.cache.set(cache_key, item.to_dict())
+ return item
+ # if we reach this point all possibilities failed and the item could not be found.
+ # There is a possibility that the (streaming) provider changed the id of the item
+ # so we return the previous details (if we have any) marked as unavailable, so
+ # at least we have the possibility to sort out the new id through matching logic.
+ if not fallback:
+ fallback = await self.get_db_item_by_prov_id(item_id, provider_instance_id_or_domain)
+ if fallback:
+ fallback_result = ItemCls(
+ item_id=item_id,
+ provider=provider.instance_id,
+ name=fallback.name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=item_id,
+ provider_domain=provider.domain,
+ provider_instance=provider.instance_id,
+ available=False,
+ )
+ },
+ )
+ if hasattr(fallback, "version") and hasattr(fallback_result, "version"):
+ fallback_result.version = fallback.version
+ return fallback_result
raise MediaNotFoundError(
f"{self.media_type.value}://{item_id} not "
"found on provider {provider_instance_id_or_domain}"
assert item.artists
# resolve any ItemMapping artists
item.artists = [
- await self.mass.music.artists.get_provider_item(artist.item_id, artist.provider)
+ await self.mass.music.artists.get_provider_item(
+ artist.item_id, artist.provider, fallback=artist
+ )
if isinstance(artist, ItemMapping)
else artist
for artist in item.artists
# resolve ItemMapping album
if isinstance(item.album, ItemMapping):
item.album = await self.mass.music.albums.get_provider_item(
- item.album.item_id, item.album.provider
+ item.album.item_id, item.album.provider, fallback=item.album
)
if item.album:
item.album.artists = [
- await self.mass.music.artists.get_provider_item(artist.item_id, artist.provider)
+ await self.mass.music.artists.get_provider_item(
+ artist.item_id, artist.provider, fallback=artist
+ )
if isinstance(artist, ItemMapping)
else artist
for artist in item.album.artists
continue
# we must fetch the full album version, search results are simplified objects
prov_track = await self.get_provider_item(
- search_result_item.item_id, search_result_item.provider
+ search_result_item.item_id,
+ search_result_item.provider,
+ fallback=search_result_item,
)
if compare_track(prov_track, db_track):
# 100% match, we can simply update the db with additional provider ids
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
- plex_album = await self._get_data(prov_album_id, PlexAlbum)
- return await self._parse_album(plex_album) if plex_album else None
+ if plex_album := await self._get_data(prov_album_id, PlexAlbum):
+ return await self._parse_album(plex_album)
+ raise MediaNotFoundError(f"Item {prov_album_id} not found")
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
- plex_artist = await self._get_data(prov_artist_id, PlexArtist)
- return await self._parse_artist(plex_artist) if plex_artist else None
+ if plex_artist := await self._get_data(prov_artist_id, PlexArtist):
+ return await self._parse_artist(plex_artist)
+ raise MediaNotFoundError(f"Item {prov_artist_id} not found")
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
- plex_track = await self._get_data(prov_track_id, PlexTrack)
- return await self._parse_track(plex_track)
+ if plex_track := await self._get_data(prov_track_id, PlexTrack):
+ return await self._parse_track(plex_track)
+ raise MediaNotFoundError(f"Item {prov_track_id} not found")
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
- plex_playlist = await self._get_data(prov_playlist_id, PlexPlaylist)
- return await self._parse_playlist(plex_playlist)
+ if plex_playlist := await self._get_data(prov_playlist_id, PlexPlaylist):
+ return await self._parse_playlist(plex_playlist)
+ raise MediaNotFoundError(f"Item {prov_playlist_id} not found")
async def get_playlist_tracks( # type: ignore[return]
self, prov_playlist_id: str
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
params = {"artist_id": prov_artist_id}
- artist_obj = await self._get_data("artist/get", **params)
- return await self._parse_artist(artist_obj) if artist_obj and artist_obj["id"] else None
+ if (artist_obj := await self._get_data("artist/get", **params)) and artist_obj["id"]:
+ return await self._parse_artist(artist_obj)
+ raise MediaNotFoundError(f"Item {prov_artist_id} not found")
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
params = {"album_id": prov_album_id}
- album_obj = await self._get_data("album/get", **params)
- return await self._parse_album(album_obj) if album_obj and album_obj["id"] else None
+ if (album_obj := await self._get_data("album/get", **params)) and album_obj["id"]:
+ return await self._parse_album(album_obj)
+ raise MediaNotFoundError(f"Item {prov_album_id} not found")
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
params = {"track_id": prov_track_id}
- track_obj = await self._get_data("track/get", **params)
- return await self._parse_track(track_obj) if track_obj and track_obj["id"] else None
+ if (track_obj := await self._get_data("track/get", **params)) and track_obj["id"]:
+ return await self._parse_track(track_obj)
+ raise MediaNotFoundError(f"Item {prov_track_id} not found")
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
params = {"playlist_id": prov_playlist_id}
- playlist_obj = await self._get_data("playlist/get", **params)
- return (
- await self._parse_playlist(playlist_obj)
- if playlist_obj and playlist_obj["id"]
- else None
- )
+ if (playlist_obj := await self._get_data("playlist/get", **params)) and playlist_obj["id"]:
+ return await self._parse_playlist(playlist_obj)
+ raise MediaNotFoundError(f"Item {prov_playlist_id} not found")
async def get_album_tracks(self, prov_album_id) -> list[Track]:
"""Get all album tracks for given album id."""
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
- album_obj = await self._get_data(f"albums/{prov_album_id}")
- return await self._parse_album(album_obj) if album_obj else None
+ if album_obj := await self._get_data(f"albums/{prov_album_id}"):
+ return await self._parse_album(album_obj)
+ raise MediaNotFoundError(f"Item {prov_album_id} not found")
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
- track_obj = await self._get_data(f"tracks/{prov_track_id}")
- return await self._parse_track(track_obj) if track_obj else None
+ if track_obj := await self._get_data(f"tracks/{prov_track_id}"):
+ return await self._parse_track(track_obj)
+ raise MediaNotFoundError(f"Item {prov_track_id} not found")
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
- playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}")
- return await self._parse_playlist(playlist_obj) if playlist_obj else None
+ if playlist_obj := await self._get_data(f"playlists/{prov_playlist_id}"):
+ return await self._parse_playlist(playlist_obj)
+ raise MediaNotFoundError(f"Item {prov_playlist_id} not found")
async def get_album_tracks(self, prov_album_id) -> list[Track]:
"""Get all album tracks for given album id."""
async for radio in self.get_library_radios():
if radio.item_id == prov_radio_id:
return radio
- return None
+ raise MediaNotFoundError(f"Item {prov_radio_id} not found")
async def _parse_radio(
self, details: dict, stream: dict | None = None, folder: str | None = None
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
- album_obj = await get_album(prov_album_id=prov_album_id)
- return (
- await self._parse_album(album_obj=album_obj, album_id=prov_album_id)
- if album_obj
- else None
- )
+ if album_obj := await get_album(prov_album_id=prov_album_id):
+ return await self._parse_album(album_obj=album_obj, album_id=prov_album_id)
+ raise MediaNotFoundError(f"Item {prov_album_id} not found")
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
- artist_obj = await get_artist(prov_artist_id=prov_artist_id)
- return await self._parse_artist(artist_obj=artist_obj) if artist_obj else None
+ if artist_obj := await get_artist(prov_artist_id=prov_artist_id):
+ return await self._parse_artist(artist_obj=artist_obj)
+ raise MediaNotFoundError(f"Item {prov_artist_id} not found")
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
- track_obj = await get_track(prov_track_id=prov_track_id)
- return await self._parse_track(track_obj)
+ if track_obj := await get_track(prov_track_id=prov_track_id):
+ return await self._parse_track(track_obj)
+ raise MediaNotFoundError(f"Item {prov_track_id} not found")
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
- playlist_obj = await get_playlist(
+ if playlist_obj := await get_playlist(
prov_playlist_id=prov_playlist_id,
headers=self._headers,
username=self.config.get_value(CONF_USERNAME),
- )
- return await self._parse_playlist(playlist_obj)
+ ):
+ return await self._parse_playlist(playlist_obj)
+ raise MediaNotFoundError(f"Item {prov_playlist_id} not found")
async def get_playlist_tracks(self, prov_playlist_id) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
def get_provider(
self, provider_instance_or_domain: str, return_unavailable: bool = False
) -> ProviderInstanceType | None:
- """Return provider by instance id (or domain)."""
- prov = self._providers.get(provider_instance_or_domain)
- if prov is not None and (return_unavailable or prov.available):
- return prov
+ """Return provider by instance id or domain."""
+ # lookup by instance_id first
+ if prov := self._providers.get(provider_instance_or_domain):
+ if return_unavailable or prov.available:
+ return prov
+ if prov.is_unique:
+ # no need to lookup other instances because this provider has unique data
+ return None
+ provider_instance_or_domain = prov.domain
+ # fallback to match on domain
for prov in self._providers.values():
if prov.domain != provider_instance_or_domain:
continue