async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve all library tracks from Deezer."""
for track in await self.client.get_user_tracks():
- yield self.parse_track(track=track, user_country=self.gw_client.user_country)
+ yield await self.parse_track(track=track, user_country=self.gw_client.user_country)
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- return self.parse_track(
+ return await self.parse_track(
track=await self.client.get_track(track_id=int(prov_track_id)),
user_country=self.gw_client.user_country,
)
"""Get all tracks in a album."""
album = await self.client.get_album(album_id=int(prov_album_id))
result = []
- for count, deezer_track in enumerate(album.tracks, start=1):
+ for count, deezer_track in enumerate(await album.get_tracks(), start=1):
result.append(
- self.parse_track(
+ await self.parse_track(
track=deezer_track,
user_country=self.gw_client.user_country,
extra_init_kwargs={"disc_number": 0, "track_number": count},
"""Get all tracks in a playlist."""
playlist = await self.client.get_playlist(playlist_id=prov_playlist_id)
for count, deezer_track in enumerate(await playlist.get_tracks(), start=1):
- track = self.parse_track(track=deezer_track, user_country=self.gw_client.user_country)
+ track = await self.parse_track(
+ track=deezer_track,
+ user_country=self.gw_client.user_country,
+ extra_init_kwargs={"position": count},
+ )
track.position = count
yield track
artist = await self.client.get_artist(artist_id=int(prov_artist_id))
top_tracks = await artist.get_top()
return [
- self.parse_track(track=track, user_country=self.gw_client.user_country)
- for track in top_tracks
+ await self.parse_track(track=track, user_country=self.gw_client.user_country)
+ for track in islice(top_tracks, 0, 25)
]
async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool:
name="Recommendations",
label="recommendations",
items=[
- self.parse_track(track=track, user_country=self.gw_client.user_country)
+ await self.parse_track(track=track, user_country=self.gw_client.user_country)
for track in await self.client.get_recommended_tracks()
],
)
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]):
"""Add tra ck(s) to playlist."""
- await self.client.add_playlist_tracks(
- playlist_id=prov_playlist_id, tracks=[int(i) for i in prov_track_ids]
- )
+ playlist = await self.client.get_playlist(prov_playlist_id)
+ await playlist.add_tracks(tracks=[int(i) for i in prov_track_ids])
async def remove_playlist_tracks(
self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
playlist_track_ids.append(track.id)
if len(playlist_track_ids) == len(positions_to_remove):
break
- await self.client.remove_playlist_tracks(
- playlist_id=prov_playlist_id, tracks=list(playlist_track_ids)
- )
+ playlist = await self.client.get_playlist(prov_playlist_id)
+ await playlist.delete_tracks(playlist_track_ids)
async def create_playlist(self, name: str) -> Playlist:
"""Create a new playlist on provider with given name."""
### PARSING METADATA FUNCTIONS ###
- def parse_metadata_track(self, track: deezer.Track) -> MediaItemMetadata:
+ async def parse_metadata_track(self, track: deezer.Track) -> MediaItemMetadata:
"""Parse the track metadata."""
try:
return MediaItemMetadata(
images=[
MediaItemImage(
type=ImageType.THUMB,
- path=track.album.cover_big,
+ path=(await track.get_album()).cover_big,
)
],
)
- except AttributeError:
+ except (deezer.exceptions.DeezerErrorResponse, AttributeError):
return MediaItemMetadata()
def parse_metadata_album(self, album: deezer.Album) -> MediaItemMetadata:
except AttributeError:
return playlist.user
- def parse_track(
+ async def parse_track(
self,
track: deezer.Track,
user_country: str,
extra_init_kwargs: dict[str, Any] | None = None,
- ) -> Track | PlaylistTrack:
+ ) -> Track | PlaylistTrack | AlbumTrack:
"""Parse the deezer-python track to a MASS track."""
+ try:
+ deezer_artist = await track.get_artist()
+ artist = ItemMapping(
+ MediaType.ARTIST,
+ str(deezer_artist.id),
+ self.instance_id,
+ deezer_artist.name,
+ )
+ except deezer.exceptions.DeezerErrorResponse:
+ artist = ItemMapping(
+ MediaType.ARTIST,
+ self.instance_id,
+ )
+ try:
+ deezer_album = await track.get_album()
+ album = ItemMapping(
+ MediaType.ALBUM,
+ str(deezer_album.id),
+ self.instance_id,
+ deezer_album.title,
+ )
+ except deezer.exceptions.DeezerErrorResponse:
+ album = None
if extra_init_kwargs is None:
extra_init_kwargs = {}
track_class = Track
media_type=MediaType.TRACK,
sort_name=self.get_short_title(track, track_class),
duration=track.duration,
- artists=[
- ItemMapping(
- MediaType.ARTIST,
- str(track.artist.id),
- self.instance_id,
- track.artist.name,
- )
- ],
- album=ItemMapping(
- MediaType.ALBUM,
- str(track.album.id),
- self.instance_id,
- track.album.title,
- ),
+ artists=[artist],
+ album=album,
provider_mappings={
ProviderMapping(
item_id=str(track.id),
available=self.track_available(track, user_country, track_class),
)
},
- metadata=self.parse_metadata_track(track=track),
+ metadata=await self.parse_metadata_track(track=track),
**extra_init_kwargs,
)
self, query: str, user_country: str, limit: int = 5
) -> list[Track]:
"""Search for tracks and parse them."""
- deezer_tracks = await self.client.search(query=query)
- return [self.parse_track(track, user_country) for track in islice(deezer_tracks, 0, limit)]
+ deezer_tracks = await self.client.search(query=query, limit=1)
+ return [
+ await self.parse_track(track, user_country) for track in islice(deezer_tracks, 0, limit)
+ ]
async def search_and_parse_artists(self, query: str, limit: int = 5) -> list[Artist]:
"""Search for artists and parse them."""
- deezer_artist = await self.client.search_artists(query=query)
+ deezer_artist = await self.client.search_artists(query=query, limit=1)
return [self.parse_artist(artist=artist) for artist in islice(deezer_artist, 0, limit)]
async def search_and_parse_albums(self, query: str, limit: int = 5) -> list[Album]:
"""Search for album and parse them."""
- deezer_albums = await self.client.search_albums(query=query)
+ deezer_albums = await self.client.search_albums(query=query, limit=1)
return [self.parse_album(album=album) for album in islice(deezer_albums, 0, limit)]
async def search_and_parse_playlists(self, query: str, limit: int = 5) -> list[Playlist]:
"""Search for playlists and parse them."""
- deezer_playlists = await self.client.search_playlists(query=query)
+ deezer_playlists = await self.client.search_playlists(query=query, limit=1)
return [
self.parse_playlist(playlist=playlist)
for playlist in islice(deezer_playlists, 0, limit)