from asyncio import TaskGroup
from collections.abc import AsyncGenerator
from dataclasses import dataclass
+from itertools import islice
from math import ceil
from typing import Any
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve all library artists from Deezer."""
- async for artist in await self.client.get_user_artists():
+ for artist in await self.client.get_user_artists():
yield self.parse_artist(artist=artist)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Deezer."""
- async for album in await self.client.get_user_albums():
+ for album in await self.client.get_user_albums():
yield self.parse_album(album=album)
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from Deezer."""
- async for playlist in await self.user.get_playlists():
+ for playlist in await self.user.get_playlists():
yield self.parse_playlist(playlist=playlist)
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve all library tracks from Deezer."""
- async for track in await self.client.get_user_tracks():
+ for track in await self.client.get_user_tracks():
yield self.parse_track(track=track, user_country=self.gw_client.user_country)
async def get_artist(self, prov_artist_id: str) -> Artist:
return self.parse_album(album=await self.client.get_album(album_id=int(prov_album_id)))
except deezer.exceptions.DeezerErrorResponse as error:
self.logger.warning("Failed getting album: %s", error)
- return Album(itemid=prov_album_id, provider=self.instance_id, name="Not Found")
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
) -> AsyncGenerator[PlaylistTrack, None]:
"""Get all tracks in a playlist."""
playlist = await self.client.get_playlist(playlist_id=prov_playlist_id)
- for count, deezer_track in enumerate(playlist.tracks, start=1):
+ for count, deezer_track in enumerate(await playlist.get_tracks(), start=1):
track = self.parse_track(track=deezer_track, user_country=self.gw_client.user_country)
track.position = count
- track.id = track.id
yield track
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get albums by an artist."""
artist = await self.client.get_artist(artist_id=int(prov_artist_id))
albums = []
- for album in await self.client.get_albums_by_artist(artist=artist):
- albums.append(await self.parse_album(album=album))
+ for album in await artist.get_albums():
+ albums.append(self.parse_album(album=album))
return albums
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get top 25 tracks of an artist."""
artist = await self.client.get_artist(artist_id=int(prov_artist_id))
- top_tracks = await self.client.get_artist_top(artist=artist, limit=25)
+ top_tracks = await artist.get_top()
return [
self.parse_track(track=track, user_country=self.gw_client.user_country)
for track in top_tracks
"""Parse the track metadata."""
try:
return MediaItemMetadata(
- preview=track.preview,
images=[
MediaItemImage(
type=ImageType.THUMB,
],
)
except AttributeError:
- return MediaItemMetadata(
- preview=track.preview,
- )
+ return MediaItemMetadata()
def parse_metadata_album(self, album: deezer.Album) -> MediaItemMetadata:
"""Parse the album metadata."""
metadata=MediaItemMetadata(
images=[MediaItemImage(type=ImageType.THUMB, path=playlist.picture_big)],
),
- is_editable=playlist.creator.id == self.user.id,
+ is_editable=self.get_playlist_creator(playlist).id == self.user.id,
)
+ def get_playlist_creator(self, playlist: deezer.Playlist):
+ """See https://twitter.com/Un10cked/status/1682709413889540097."""
+ try:
+ return playlist.creator
+ except AttributeError:
+ return playlist.user
+
def parse_track(
self,
track: deezer.Track,
extra_init_kwargs: dict[str, Any] | None = None,
) -> Track | PlaylistTrack:
"""Parse the deezer-python track to a MASS track."""
- if "position" in extra_init_kwargs:
+ if extra_init_kwargs is None:
+ extra_init_kwargs = {}
+ track_class = Track
+ elif "position" in extra_init_kwargs:
track_class = PlaylistTrack
elif "disc_number" in extra_init_kwargs and "track_number" in extra_init_kwargs:
track_class = AlbumTrack
provider=self.domain,
name=track.title,
media_type=MediaType.TRACK,
- sort_name=track.title_short,
- position=track.track_position,
+ sort_name=self.get_short_title(track, track_class),
duration=track.duration,
artists=[
ItemMapping(
item_id=str(track.id),
provider_domain=self.domain,
provider_instance=self.instance_id,
- available=self.track_available(track, user_country),
+ available=self.track_available(track, user_country, track_class),
)
},
metadata=self.parse_metadata_track(track=track),
- **extra_init_kwargs or {},
+ **extra_init_kwargs,
)
+ def get_short_title(self, track: deezer.Track, track_type: Track | PlaylistTrack | AlbumTrack):
+ """Short names only available on normal tracks."""
+ if isinstance(track_type, Track):
+ return track.title_short
+ else:
+ return track.title
+
### SEARCH AND PARSE FUNCTIONS ###
async def search_and_parse_tracks(
self, query: str, user_country: str, limit: int = 5
) -> list[Track]:
"""Search for tracks and parse them."""
- deezer_tracks = (await self.client.search(query=query))[:limit]
- return [self.parse_track(track, user_country) for track in deezer_tracks]
+ deezer_tracks = await self.client.search(query=query)
+ return [self.parse_track(track, user_country) for track in islice(deezer_tracks, 0, limit)]
async def search_and_parse_artists(self, query: str, limit: int = 5) -> list[Artist]:
"""Search for artists and parse them."""
- deezer_artist = (await self.client.search_artists(query=query))[:limit]
- return [self.parse_artist(artist=artist) for artist in deezer_artist]
+ deezer_artist = await self.client.search_artists(query=query)
+ return [self.parse_artist(artist=artist) for artist in islice(deezer_artist, 0, limit)]
async def search_and_parse_albums(self, query: str, limit: int = 5) -> list[Album]:
"""Search for album and parse them."""
- deezer_albums = (await self.client.search_albums(query=query))[:limit]
- return [self.parse_album(album=album) async for album in deezer_albums]
+ deezer_albums = await self.client.search_albums(query=query)
+ return [self.parse_album(album=album) for album in islice(deezer_albums, 0, limit)]
async def search_and_parse_playlists(self, query: str, limit: int = 5) -> list[Playlist]:
"""Search for playlists and parse them."""
- deezer_playlists = (await self.client.search_playlists(query=query))[:limit]
- return [self.parse_playlist(playlist=playlist) for playlist in deezer_playlists]
+ deezer_playlists = await self.client.search_playlists(query=query)
+ return [
+ self.parse_playlist(playlist=playlist)
+ for playlist in islice(deezer_playlists, 0, limit)
+ ]
### OTHER PARSING FUNCTIONS ###
def _get_album(self, track: deezer.Track) -> Album | None:
raise NotImplementedError("Unsupported contenttype")
- def track_available(self, track: deezer.Track, user_country: str) -> bool:
+ def track_available(
+ self, track: deezer.Track, user_country: str, track_type: Track | PlaylistTrack | AlbumTrack
+ ) -> bool:
"""Check if a given track is available in the users country."""
- return user_country in track.available_countries
+ if isinstance(track_type, Track):
+ return user_country in track.available_countries
+ else:
+ return True
def _md5(self, data, data_type="ascii"):
md5sum = hashlib.md5()