if not self.config.get_value(CONF_AUTH_TOKEN):
msg = "Invalid login credentials"
raise LoginFailed(msg)
- await self._initialize_headers()
- await self._initialize_context()
+ self._initialize_headers()
+ self._initialize_context()
self._cookies = {"CONSENT": "YES+1"}
# get default language (that is supported by YTM)
mass_locale = self.mass.metadata.locale
for result in results:
try:
if result["resultType"] == "artist" and MediaType.ARTIST in media_types:
- parsed_results.artists.append(await self._parse_artist(result))
+ parsed_results.artists.append(self._parse_artist(result))
elif result["resultType"] == "album" and MediaType.ALBUM in media_types:
- parsed_results.albums.append(await self._parse_album(result))
+ parsed_results.albums.append(self._parse_album(result))
elif result["resultType"] == "playlist" and MediaType.PLAYLIST in media_types:
- parsed_results.playlists.append(await self._parse_playlist(result))
+ parsed_results.playlists.append(self._parse_playlist(result))
elif (
result["resultType"] in ("song", "video")
and MediaType.TRACK in media_types
- and (track := await self._parse_track(result))
+ and (track := self._parse_track(result))
):
parsed_results.tracks.append(track)
except InvalidDataError:
await self._check_oauth_token()
artists_obj = await get_library_artists(headers=self._headers, language=self.language)
for artist in artists_obj:
- yield await self._parse_artist(artist)
+ yield self._parse_artist(artist)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Youtube Music."""
await self._check_oauth_token()
albums_obj = await get_library_albums(headers=self._headers, language=self.language)
for album in albums_obj:
- yield await self._parse_album(album, album["browseId"])
+ yield self._parse_album(album, album["browseId"])
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from the provider."""
await self._check_oauth_token()
playlists_obj = await get_library_playlists(headers=self._headers, language=self.language)
for playlist in playlists_obj:
- yield await self._parse_playlist(playlist)
+ yield self._parse_playlist(playlist)
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve library tracks from Youtube Music."""
# Library tracks sometimes do not have a valid artist id
# In that case, call the API for track details based on track id
try:
- yield await self._parse_track(track)
+ yield self._parse_track(track)
except InvalidDataError:
track = await self.get_track(track["videoId"])
yield track
"""Get full album details by id."""
await self._check_oauth_token()
if album_obj := await get_album(prov_album_id=prov_album_id, language=self.language):
- return await self._parse_album(album_obj=album_obj, album_id=prov_album_id)
+ return self._parse_album(album_obj=album_obj, album_id=prov_album_id)
msg = f"Item {prov_album_id} not found"
raise MediaNotFoundError(msg)
tracks = []
for idx, track_obj in enumerate(album_obj["tracks"], 1):
try:
- track = await self._parse_track(track_obj=track_obj)
+ track = self._parse_track(track_obj=track_obj)
track.disc_number = 0
track.track_number = track_obj.get("trackNumber", idx)
except InvalidDataError:
if artist_obj := await get_artist(
prov_artist_id=prov_artist_id, headers=self._headers, language=self.language
):
- return await self._parse_artist(artist_obj=artist_obj)
+ return self._parse_artist(artist_obj=artist_obj)
msg = f"Item {prov_artist_id} not found"
raise MediaNotFoundError(msg)
headers=self._headers,
language=self.language,
):
- return await self._parse_track(track_obj)
+ return self._parse_track(track_obj)
msg = f"Item {prov_track_id} not found"
raise MediaNotFoundError(msg)
if playlist_obj := await get_playlist(
prov_playlist_id=prov_playlist_id, headers=self._headers, language=self.language
):
- return await self._parse_playlist(playlist_obj)
+ return self._parse_playlist(playlist_obj)
msg = f"Item {prov_playlist_id} not found"
raise MediaNotFoundError(msg)
# Playlist tracks sometimes do not have a valid artist id
# In that case, call the API for track details based on track id
try:
- if track := await self._parse_track(track_obj):
+ if track := self._parse_track(track_obj):
track.position = index + 1
result.append(track)
except InvalidDataError:
album_obj["artists"] = [
{"id": artist_obj["channelId"], "name": artist_obj["name"]}
]
- albums.append(await self._parse_album(album_obj, album_obj["browseId"]))
+ albums.append(self._parse_album(album_obj, album_obj["browseId"]))
return albums
return []
# Playlist tracks sometimes do not have a valid artist id
# In that case, call the API for track details based on track id
try:
- track = await self._parse_track(track)
+ track = self._parse_track(track)
if track:
tracks.append(track)
except InvalidDataError:
self.config.update({CONF_AUTH_TOKEN: token["access_token"]})
self.config.update({CONF_EXPIRY_TIME: time() + token["expires_in"]})
self.config.update({CONF_TOKEN_TYPE: token["token_type"]})
- await self._initialize_headers()
+ self._initialize_headers()
await self._update_ytdlp_oauth_token_cache()
- async def _initialize_headers(self) -> dict[str, str]:
+ def _initialize_headers(self) -> dict[str, str]:
"""Return headers to include in the requests."""
auth = f"{self.config.get_value(CONF_TOKEN_TYPE)} {self.config.get_value(CONF_AUTH_TOKEN)}"
headers = {
}
self._headers = headers
- async def _initialize_context(self) -> dict[str, str]:
+ def _initialize_context(self) -> dict[str, str]:
"""Return a dict to use as a context in requests."""
self._context = {
"context": {
}
}
- async def _parse_album(self, album_obj: dict, album_id: str | None = None) -> Album:
+ def _parse_album(self, album_obj: dict, album_id: str | None = None) -> Album:
"""Parse a YT Album response to an Album model object."""
album_id = album_id or album_obj.get("id") or album_obj.get("browseId")
if "title" in album_obj:
if album_obj.get("year") and album_obj["year"].isdigit():
album.year = album_obj["year"]
if "thumbnails" in album_obj:
- album.metadata.images = await self._parse_thumbnails(album_obj["thumbnails"])
+ album.metadata.images = self._parse_thumbnails(album_obj["thumbnails"])
if "description" in album_obj:
album.metadata.description = unquote(album_obj["description"])
if "isExplicit" in album_obj:
album.album_type = album_type
return album
- async def _parse_artist(self, artist_obj: dict) -> Artist:
+ def _parse_artist(self, artist_obj: dict) -> Artist:
"""Parse a YT Artist response to Artist model object."""
artist_id = None
if "channelId" in artist_obj:
if "description" in artist_obj:
artist.metadata.description = artist_obj["description"]
if artist_obj.get("thumbnails"):
- artist.metadata.images = await self._parse_thumbnails(artist_obj["thumbnails"])
+ artist.metadata.images = self._parse_thumbnails(artist_obj["thumbnails"])
return artist
- async def _parse_playlist(self, playlist_obj: dict) -> Playlist:
+ def _parse_playlist(self, playlist_obj: dict) -> Playlist:
"""Parse a YT Playlist response to a Playlist object."""
playlist_id = playlist_obj["id"]
playlist_name = playlist_obj["title"]
if "description" in playlist_obj:
playlist.metadata.description = playlist_obj["description"]
if playlist_obj.get("thumbnails"):
- playlist.metadata.images = await self._parse_thumbnails(playlist_obj["thumbnails"])
+ playlist.metadata.images = self._parse_thumbnails(playlist_obj["thumbnails"])
is_editable = False
if playlist_obj.get("privacy") and playlist_obj.get("privacy") == "PRIVATE":
is_editable = True
playlist.metadata.cache_checksum = playlist_obj.get("checksum")
return playlist
- async def _parse_track(self, track_obj: dict) -> Track:
+ def _parse_track(self, track_obj: dict) -> Track:
"""Parse a YT Track response to a Track model object."""
if not track_obj.get("videoId"):
msg = "Track is missing videoId"
msg = "Track is missing artists"
raise InvalidDataError(msg)
if track_obj.get("thumbnails"):
- track.metadata.images = await self._parse_thumbnails(track_obj["thumbnails"])
+ track.metadata.images = self._parse_thumbnails(track_obj["thumbnails"])
if (
track_obj.get("album")
and isinstance(track_obj.get("album"), dict)
# Only premium users can stream the HQ stream of this song
return stream_format["format_id"] == "141"
- async def _parse_thumbnails(self, thumbnails_obj: dict) -> list[MediaItemImage]:
+ def _parse_thumbnails(self, thumbnails_obj: dict) -> list[MediaItemImage]:
"""Parse and YTM thumbnails to MediaItemImage."""
result: list[MediaItemImage] = []
processed_images = set()