provider_mapping.item_id, provider_mapping.provider_instance
)
for provider_track in provider_tracks:
+ # In some cases (looking at you YTM) the disc/track number is not obtained from
+ # library_tracks. Ensure to update the disc/track number when interacting with
+ # album tracks
+ db_track = next(
+ (
+ x
+ for x in db_items
+ if x.sort_name == provider_track.sort_name
+ and x.version == provider_track.version
+ ),
+ None,
+ )
+ if (
+ db_track
+ and db_track.track_number == 0
+ and db_track.track_number != provider_track.track_number
+ ):
+ await self._set_album_track(
+ db_id=library_album.item_id,
+ db_track_id=db_track.item_id,
+ track=provider_track,
+ )
if provider_track.item_id in unique_ids:
continue
unique_id = f"{provider_track.disc_number}.{provider_track.track_number}"
)
return ItemMapping.from_item(db_artist)
+ async def _set_album_track(self, db_id: int, db_track_id: int, track: Track) -> None:
+ """Store Album Track info."""
+ # write (or update) record in album_tracks table
+ await self.mass.music.database.insert_or_replace(
+ DB_TABLE_ALBUM_TRACKS,
+ {
+ "album_id": db_id,
+ "track_id": db_track_id,
+ "track_number": track.track_number,
+ "disc_number": track.disc_number,
+ },
+ )
+
async def match_providers(self, db_album: Album) -> None:
"""Try to find match on all (streaming) providers for the provided (database) album.
if not album_obj.get("tracks"):
return []
tracks = []
- for track_obj in album_obj["tracks"]:
+ for track_number, track_obj in enumerate(album_obj["tracks"], 1):
try:
- track = self._parse_track(track_obj=track_obj)
+ track = self._parse_track(track_obj=track_obj, track_number=track_number)
except InvalidDataError:
continue
tracks.append(track)
item_id=str(album_id),
provider_domain=self.domain,
provider_instance=self.instance_id,
- url=f"{YTM_DOMAIN}/playlist?list={album_id}",
+ url=f"{YTM_DOMAIN}/playlist?list={album_obj.get('audioPlaylistId')}",
)
},
+ favorite=album_obj.get("likeStatus", "INDIFFERENT") == "LIKE",
)
if album_obj.get("year") and album_obj["year"].isdigit():
album.year = album_obj["year"]
url=f"{YTM_DOMAIN}/channel/{artist_id}",
)
},
+ favorite=artist_obj.get("likeStatus", "INDIFFERENT") == "LIKE",
)
if "description" in artist_obj:
artist.metadata.description = artist_obj["description"]
)
},
is_editable=is_editable,
+ favorite=playlist_obj.get("likeStatus", "INDIFFERENT") == "LIKE",
)
if "description" in playlist_obj:
playlist.metadata.description = playlist_obj["description"]
playlist.owner = self.name
return playlist
- def _parse_track(self, track_obj: dict) -> Track:
+ def _parse_track(self, track_obj: dict, track_number: int = 0) -> Track:
"""Parse a YT Track response to a Track model object."""
if not track_obj.get("videoId"):
msg = "Track is missing videoId"
),
)
},
- disc_number=0, # not supported on YTM?
- track_number=track_obj.get("trackNumber", 0),
+ favorite=track_obj.get("likeStatus", "INDIFFERENT") == "LIKE",
+ # Disc info is not available in YTM
+ disc_number=0,
+ # Track number is "sometimes" available in the track object, otherwise approach
+ # by counting album tracks when fetching full album details
+ track_number=track_obj.get("trackNumber") or track_number or 0,
)
if track_obj.get("artists"):
def _get_album():
ytm = ytmusicapi.YTMusic(language=language)
+ album = ytm.get_album(browseId=prov_album_id)
+ if "audioPlaylistId" in album:
+ # Track id's from album tracks do not match with actual album tracks. E.g. a track
+ # points to the videoId of the original version, while we want the album version
+ album_playlist = ytm.get_playlist(playlistId=album["audioPlaylistId"], limit=None)
+ # Do some basic checks
+ if len(album_playlist.get("tracks", [])) != len(album.get("tracks", [])):
+ return album
+ # Move the correct track info to the album tracks
+ playlist_tracks_by_title = {t.get("title"): t for t in album_playlist.get("tracks", [])}
+ for album_track in album.get("tracks", []):
+ if playlist_track := playlist_tracks_by_title.get(album_track.get("title")):
+ album_track["videoId"] = playlist_track["videoId"]
+ album_track["isAvailable"] = playlist_track.get("isAvailable", True)
+ album_track["likeStatus"] = playlist_track.get("likeStatus", "INDIFFERENT")
+ return album
return ytm.get_album(browseId=prov_album_id)
return await asyncio.to_thread(_get_album)