return await loop.run_in_executor(None, _get_album)
-async def get_playlist(prov_playlist_id: str) -> Dict[str, str]:
+async def get_playlist(
+ prov_playlist_id: str, headers: Dict[str, str]
+) -> Dict[str, str]:
"""Async wrapper around the ytmusicapi get_playlist function."""
def _get_playlist():
- ytm = ytmusicapi.YTMusic()
+ ytm = ytmusicapi.YTMusic(auth=json.dumps(headers))
return ytm.get_playlist(playlistId=prov_playlist_id)
loop = asyncio.get_running_loop()
"thumbnail"
]["thumbnails"]
track["isAvailable"] = track_obj["playabilityStatus"]["status"] == "OK"
+ return track
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, _get_song)
try:
yield await self._parse_track(track)
except InvalidDataError:
- yield await self.get_track(track["videoId"])
+ track = await self.get_track(track["videoId"])
+ yield track
async def get_album(self, prov_album_id) -> Album:
"""Get full album details by id."""
async def get_track(self, prov_track_id) -> Track:
"""Get full track details by id."""
track_obj = await get_track(prov_track_id=prov_track_id)
- return await self._parse_track(track_obj) if track_obj else None
+ return await self._parse_track(track_obj)
async def get_playlist(self, prov_playlist_id) -> Playlist:
"""Get full playlist details by id."""
- playlist_obj = await get_playlist(prov_playlist_id)
+ playlist_obj = await get_playlist(
+ prov_playlist_id=prov_playlist_id, headers=self._headers
+ )
return await self._parse_playlist(playlist_obj)
async def get_playlist_tracks(self, prov_playlist_id) -> List[Track]:
"""Get all playlist tracks for given playlist id."""
- playlist_obj = await get_playlist(prov_playlist_id)
+ playlist_obj = await get_playlist(
+ prov_playlist_id=prov_playlist_id, headers=self._headers
+ )
if "tracks" in playlist_obj:
tracks = []
for track in playlist_obj["tracks"]:
if track["isAvailable"]:
- tracks.append(await self._parse_track(track))
+ # Playlist tracks sometimes do not have a valid artist id
+ # In that case, call the API for track details based on track id
+ try:
+ track = await self._parse_track(track)
+ if track:
+ tracks.append(track)
+ except InvalidDataError:
+ track = await self.get_track(track["videoId"])
+ if track:
+ tracks.append(track)
return tracks
return []