CONF_ACCESS_TOKEN = "access_token"
+CONF_ARL_TOKEN = "arl_token"
CONF_ACTION_AUTH = "auth"
DEEZER_AUTH_URL = "https://connect.deezer.com/oauth/auth.php"
RELAY_URL = "https://deezer.oauth.jonathanbangert.com/"
DEEZER_APP_SECRET = app_var(7)
-async def update_access_token(
+async def get_access_token(
app_id: str, app_secret: str, code: str, http_session: ClientSession
) -> str:
"""Update the access_token."""
values: dict[str, ConfigValueType] | None = None,
) -> tuple[ConfigEntry, ...]:
"""Return Config entries to setup this provider."""
- # If the action is to launch oauth flow
+ # Action is to launch oauth flow
if action == CONF_ACTION_AUTH:
- # We use the AuthenticationHelper to authenticate
+ # Use the AuthenticationHelper to authenticate
async with AuthenticationHelper(mass, values["session_id"]) as auth_helper: # type: ignore
- callback_url = auth_helper.callback_url
url = f"{DEEZER_AUTH_URL}?app_id={DEEZER_APP_ID}&redirect_uri={RELAY_URL}\
-&perms={DEEZER_PERMS}&state={callback_url}"
+&perms={DEEZER_PERMS}&state={auth_helper.callback_url}"
code = (await auth_helper.authenticate(url))["code"]
- values[CONF_ACCESS_TOKEN] = await update_access_token( # type: ignore
+ values[CONF_ACCESS_TOKEN] = await get_access_token( # type: ignore
DEEZER_APP_ID, DEEZER_APP_SECRET, code, mass.http_session
)
action_label="Authenticate with Deezer",
value=values.get(CONF_ACCESS_TOKEN) if values else None,
),
+ ConfigEntry(
+ key=CONF_ARL_TOKEN,
+ type=ConfigEntryType.SECURE_STRING,
+ label="Arl token",
+ required=True,
+ description="See https://www.dumpmedia.com/deezplus/deezer-arl.html",
+ value=values.get(CONF_ARL_TOKEN) if values else None,
+ ),
)
client: deezer.Client
gw_client: GWClient
- creds: DeezerCredentials
+ credentials: DeezerCredentials
user: deezer.User
async def handle_setup(self) -> None:
"""Set up the Deezer provider."""
- self.creds = DeezerCredentials(
+ self.credentials = DeezerCredentials(
app_id=DEEZER_APP_ID,
app_secret=DEEZER_APP_SECRET,
access_token=self.config.get_value(CONF_ACCESS_TOKEN), # type: ignore
)
self.client = deezer.Client(
- app_id=self.creds.app_id,
- app_secret=self.creds.app_secret,
- access_token=self.creds.access_token,
+ app_id=self.credentials.app_id,
+ app_secret=self.credentials.app_secret,
+ access_token=self.credentials.access_token,
)
self.user = await self.client.get_user()
- self.gw_client = GWClient(self.mass.http_session, self.config.get_value(CONF_ACCESS_TOKEN))
+ self.gw_client = GWClient(
+ self.mass.http_session,
+ self.config.get_value(CONF_ACCESS_TOKEN),
+ self.config.get_value(CONF_ARL_TOKEN),
+ )
await self.gw_client.setup()
@property
:param search_query: Search query.
:param media_types: A list of media_types to include. All types if None.
"""
+ # If no media_types are provided, search for all types
if not media_types:
media_types = [
MediaType.ARTIST,
MediaType.PLAYLIST,
]
+ # Create a task for each media_type
tasks = {}
async with TaskGroup() as taskgroup:
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve all library artists from Deezer."""
- for artist in await self.client.get_user_artists():
+ async for artist in await self.client.get_user_artists():
yield self.parse_artist(artist=artist)
async def get_library_albums(self) -> AsyncGenerator[Album, None]:
"""Retrieve all library albums from Deezer."""
- for album in await self.client.get_user_albums():
+ async for album in await self.client.get_user_albums():
yield self.parse_album(album=album)
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
"""Retrieve all library playlists from Deezer."""
- for playlist in await self.user.get_playlists():
+ async for playlist in await self.user.get_playlists():
yield self.parse_playlist(playlist=playlist)
async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
"""Retrieve all library tracks from Deezer."""
- for track in await self.client.get_user_tracks():
+ async for track in await self.client.get_user_tracks():
yield self.parse_track(track=track, user_country=self.gw_client.user_country)
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get full artist details by id."""
- return self.parse_artist(artist=await self.client.get_artist(artist_id=int(prov_artist_id)))
+ try:
+ return self.parse_artist(
+ artist=await self.client.get_artist(artist_id=int(prov_artist_id))
+ )
+ except deezer.exceptions.DeezerErrorResponse as error:
+ self.logger.warning("Failed getting artist: %s", error)
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
- return self.parse_playlist(
- playlist=await self.client.get_playlist(playlist_id=int(prov_playlist_id)),
- )
+ try:
+ return self.parse_playlist(
+ playlist=await self.client.get_playlist(playlist_id=int(prov_playlist_id)),
+ )
+ except deezer.exceptions.DeezerErrorResponse as error:
+ self.logger.warning("Failed getting playlist: %s", error)
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
- return self.parse_track(
- track=await self.client.get_track(track_id=int(prov_track_id)),
- user_country=self.gw_client.user_country,
- )
+ try:
+ return self.parse_track(
+ track=await self.client.get_track(track_id=int(prov_track_id)),
+ user_country=self.gw_client.user_country,
+ )
+ except deezer.exceptions.DeezerErrorResponse as error:
+ self.logger.warning("Failed getting track: %s", error)
async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
- """Get all tracks in a album."""
+ """Get all tracks in an album."""
album = await self.client.get_album(album_id=int(prov_album_id))
- result = []
- for count, deezer_track in enumerate(await album.get_tracks(), start=1):
- result.append(
- self.parse_track(
- track=deezer_track,
- user_country=self.gw_client.user_country,
- extra_init_kwargs={"disc_number": 0, "track_number": count},
- )
+ return [
+ self.parse_track(
+ track=deezer_track,
+ user_country=self.gw_client.user_country,
+ extra_init_kwargs={"disc_number": 0, "track_number": count + 1},
)
- return result
+ for count, deezer_track in enumerate(await album.get_tracks())
+ ]
async def get_playlist_tracks(
self, prov_playlist_id: str
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get albums by an artist."""
artist = await self.client.get_artist(artist_id=int(prov_artist_id))
- albums = []
- for album in await artist.get_albums():
- albums.append(self.parse_album(album=album))
- return albums
+ return [self.parse_album(album=album) async for album in await artist.get_albums()]
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get top 50 tracks of an artist."""
artist = await self.client.get_artist(artist_id=int(prov_artist_id))
- top_tracks = await artist.get_top(limit=50)
return [
self.parse_track(track=track, user_country=self.gw_client.user_country)
- async for track in top_tracks
+ async for track in await artist.get_top(limit=50)
]
async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool:
]
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
- """Add tra ck(s) to playlist."""
+ """Add track(s) to playlist."""
playlist = await self.client.get_playlist(int(prov_playlist_id))
await playlist.add_tracks(tracks=[int(i) for i in prov_track_ids])
async def remove_playlist_tracks(
self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
) -> None:
- """Remove track(s) to playlist."""
+ """Remove track(s) from playlist."""
playlist_track_ids = []
async for track in self.get_playlist_tracks(prov_playlist_id):
if track.position in positions_to_remove:
### PARSING FUNCTIONS ###
def parse_artist(self, artist: deezer.Artist) -> Artist:
- """Parse the deezer-python artist to a MASS artist."""
+ """Parse the deezer-python artist to a Music Assistant artist."""
return Artist(
item_id=str(artist.id),
provider=self.domain,
)
def parse_album(self, album: deezer.Album) -> Album:
- """Parse the deezer-python album to a MASS album."""
+ """Parse the deezer-python album to a Music Assistant album."""
return Album(
album_type=AlbumType(album.type),
item_id=str(album.id),
)
def parse_playlist(self, playlist: deezer.Playlist) -> Playlist:
- """Parse the deezer-python playlist to a MASS playlist."""
+ """Parse the deezer-python playlist to a Music Assistant playlist."""
creator = self.get_playlist_creator(playlist)
return Playlist(
item_id=str(playlist.id),
)
def get_playlist_creator(self, playlist: deezer.Playlist):
- """See https://twitter.com/Un10cked/status/1682709413889540097."""
+ """On playlists, the creator is called creator, elsewhere it's called user."""
if hasattr(playlist, "creator"):
return playlist.creator
return playlist.user
user_country: str,
extra_init_kwargs: dict[str, Any] | None = None,
) -> Track | PlaylistTrack | AlbumTrack:
- """Parse the deezer-python track to a MASS track."""
+ """Parse the deezer-python track to a Music Assistant track."""
if hasattr(track, "artist"):
artist = ItemMapping(
media_type=MediaType.ARTIST,
"""Search for tracks and parse them."""
deezer_tracks = await self.client.search(query=query, limit=limit)
tracks = []
- index = 0
- async for track in deezer_tracks:
+ async for index, track in enumerate(deezer_tracks):
tracks.append(self.parse_track(track, user_country))
- index += 1
- if index >= limit:
+ if index == limit:
return tracks
return tracks
"""Search for artists and parse them."""
deezer_artist = await self.client.search_artists(query=query, limit=limit)
artists = []
- index = 0
- async for artist in deezer_artist:
+ async for index, artist in enumerate(deezer_artist):
artists.append(self.parse_artist(artist))
- index += 1
- if index >= limit:
+ if index == limit:
return artists
return artists
"""Search for album and parse them."""
deezer_albums = await self.client.search_albums(query=query, limit=limit)
albums = []
- index = 0
- async for album in deezer_albums:
+ async for index, album in enumerate(deezer_albums):
albums.append(self.parse_album(album))
- index += 1
- if index >= limit:
+ if index == limit:
return albums
return albums
"""Search for playlists and parse them."""
deezer_playlists = await self.client.search_playlists(query=query, limit=limit)
playlists = []
- index = 0
- async for playlist in deezer_playlists:
+ async for index, playlist in enumerate(deezer_playlists):
playlists.append(self.parse_playlist(playlist))
- index += 1
- if index >= limit:
+ if index == limit:
return playlists
return playlists