CONF_AUTH_TOKEN = "token"
CONF_LIBRARY_ID = "library_id"
+FAKE_ARTIST_PREFIX = "_fake://"
+
async def setup(
mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
name,
)
+ async def _get_or_create_artist_by_name(self, artist_name) -> Artist:
+ query = (
+ "SELECT * FROM artists WHERE name = :name AND provider_mappings = :provider_instance"
+ )
+ params = {
+ "name": f"%{artist_name}%",
+ "provider_instance": f"%{self.instance_id}%",
+ }
+ db_artists = await self.mass.music.artists.get_db_items_by_query(query, params)
+ if db_artists:
+ return ItemMapping.from_item(db_artists[0])
+
+ artist_id = FAKE_ARTIST_PREFIX + artist_name
+ artist = Artist(item_id=artist_id, name=artist_name, provider=self.domain)
+ artist.add_provider_mapping(
+ ProviderMapping(
+ item_id=str(artist_id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ )
+ return artist
+
async def _parse(self, plex_media) -> MediaItem | None:
if plex_media.type == "artist":
return await self._parse_artist(plex_media)
"""Parse a Plex Track response to a Track model object."""
track = Track(item_id=plex_track.key, provider=self.instance_id, name=plex_track.title)
- if plex_track.grandparentKey:
+ if plex_track.originalTitle and plex_track.originalTitle != plex_track.grandparentTitle:
+ # The artist of the track if different from the album's artist.
+ # For this kind of artist, we just know the name, so we create a fake artist,
+ # if it does not already exist.
+ track.artists.append(await self._get_or_create_artist_by_name(plex_track.originalTitle))
+ elif plex_track.grandparentKey:
track.artists.append(
self._get_item_mapping(
MediaType.ARTIST, plex_track.grandparentKey, plex_track.grandparentTitle
)
)
+ else:
+ raise InvalidDataError("No artist was found for track")
+
if thumb := plex_track.firstAttr("thumb", "parentThumb", "grandparentThumb"):
track.metadata.images = [MediaItemImage(ImageType.THUMB, thumb, self.instance_id)]
if plex_track.parentKey:
async def get_artist(self, prov_artist_id) -> Artist:
"""Get full artist details by id."""
+ if prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
+ # This artist does not exist in plex, so we can just load it from DB.
+
+ if db_artist := await self.mass.music.artists.get_db_item_by_prov_id(
+ prov_artist_id, self.instance_id
+ ):
+ return db_artist
+ raise MediaNotFoundError(f"Artist not found: {prov_artist_id}")
+
if plex_artist := await self._get_data(prov_artist_id, PlexArtist):
return await self._parse_artist(plex_artist)
raise MediaNotFoundError(f"Item {prov_artist_id} not found")
async def get_artist_albums(self, prov_artist_id) -> list[Album]:
"""Get a list of albums for the given artist."""
- plex_artist = await self._get_data(prov_artist_id, PlexArtist)
- plex_albums = await self._run_async(plex_artist.albums)
- if plex_albums:
- albums = []
- for album_obj in plex_albums:
- albums.append(await self._parse_album(album_obj))
- return albums
+ if not prov_artist_id.startswith(FAKE_ARTIST_PREFIX):
+ plex_artist = await self._get_data(prov_artist_id, PlexArtist)
+ plex_albums = await self._run_async(plex_artist.albums)
+ if plex_albums:
+ albums = []
+ for album_obj in plex_albums:
+ albums.append(await self._parse_album(album_obj))
+ return albums
return []
async def get_stream_details(self, item_id: str) -> StreamDetails | None: