DEEZER_APP_ID = app_var(6)
DEEZER_APP_SECRET = app_var(7)
+# Virtual playlist IDs for dynamic Deezer content
+FLOW_PLAYLIST_ID = "flow"
+RECOMMENDED_TRACKS_PLAYLIST_ID = "recommended_tracks"
+TOP_CHARTS_PLAYLIST_ID = "top_charts"
+RADIO_PLAYLIST_PREFIX = "radio_"
+
+# Curated Deezer radio station IDs
+CURATED_RADIO_IDS = [
+ 37151, # Hits
+ 38305, # The '80s
+ 38295, # The '70s
+ 31061, # Pop
+ 37765, # Rock classics
+ 30901, # Metal
+ 30991, # Hip Hop
+ 30771, # Indie
+ 30621, # Electronic
+ 31031, # Jazz
+ 30661, # Classical
+ 36791, # Latin Music
+ 38225, # Focus
+ 39041, # Happy Hour
+]
+
async def get_access_token(
app_id: str, app_secret: str, code: str, http_session: ClientSession
)
await self.gw_client.setup()
+ # Cached wrappers for dynamic Deezer content (ensures consistent data across calls)
+ @use_cache(3600) # Cache for 1 hour
+ async def _get_flow_tracks(self) -> list[deezer.Track]:
+ """Get cached Flow tracks."""
+ return list(await self.client.get_user_flow())
+
+ @use_cache(3600) # Cache for 1 hour
+ async def _get_recommended_tracks(self) -> list[deezer.Track]:
+ """Get cached recommended tracks."""
+ return list(await self.client.get_user_recommended_tracks())
+
+ @use_cache(3600) # Cache for 1 hour
+ async def _get_chart_tracks(self) -> list[deezer.Track]:
+ """Get cached chart tracks."""
+ chart = await self.client.get_chart()
+ return list(chart.tracks[:100]) if chart.tracks else []
+
@use_cache(3600 * 24 * 7) # Cache for 7 days
async def search(
self, search_query: str, media_types: list[MediaType], limit: int = 5
@use_cache(3600 * 24 * 30) # Cache for 30 days
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get full playlist details by id."""
+ # Handle virtual playlists (Flow, Recommended tracks, Top Charts, Radios)
+ if prov_playlist_id == FLOW_PLAYLIST_ID:
+ flow_tracks = await self._get_flow_tracks()
+ flow_cover = None
+ if flow_tracks and hasattr(flow_tracks[0], "album"):
+ flow_cover = getattr(flow_tracks[0].album, "cover_medium", None)
+ return self._create_virtual_playlist(FLOW_PLAYLIST_ID, "Flow", image_url=flow_cover)
+ if prov_playlist_id == RECOMMENDED_TRACKS_PLAYLIST_ID:
+ rec_tracks = await self._get_recommended_tracks()
+ rec_cover = None
+ if rec_tracks and hasattr(rec_tracks[0], "album"):
+ rec_cover = getattr(rec_tracks[0].album, "cover_medium", None)
+ return self._create_virtual_playlist(
+ RECOMMENDED_TRACKS_PLAYLIST_ID, "Recommended tracks", image_url=rec_cover
+ )
+ if prov_playlist_id == TOP_CHARTS_PLAYLIST_ID:
+ chart_tracks = await self._get_chart_tracks()
+ chart_cover = None
+ if chart_tracks and hasattr(chart_tracks[0], "album"):
+ chart_cover = getattr(chart_tracks[0].album, "cover_medium", None)
+ return self._create_virtual_playlist(
+ TOP_CHARTS_PLAYLIST_ID, "Top Charts", image_url=chart_cover
+ )
+ if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX):
+ radio_id = int(prov_playlist_id.replace(RADIO_PLAYLIST_PREFIX, ""))
+ try:
+ radio = await self.client.get_radio(radio_id)
+ return self._create_virtual_playlist(
+ prov_playlist_id,
+ f"Radio: {radio.title}",
+ image_url=getattr(radio, "picture_medium", None),
+ )
+ except Exception as err:
+ self.logger.warning("Failed getting radio %s: %s", radio_id, err)
+ raise MediaNotFoundError(f"Radio {prov_playlist_id} not found on Deezer") from err
try:
return self.parse_playlist(
playlist=await self.client.get_playlist(playlist_id=int(prov_playlist_id)),
for deezer_track in await album.get_tracks()
]
- @use_cache(3600 * 3) # Cache for 3 hours
async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
- result: list[Track] = []
if page > 0:
# paging not supported, we always return the whole list at once
return []
- # TODO: access the underlying paging on the deezer api (if possible))
+
+ # Virtual playlists use their own cached wrappers (not double-cached)
+ if prov_playlist_id == FLOW_PLAYLIST_ID:
+ return self._parse_tracks_list(await self._get_flow_tracks())
+
+ if prov_playlist_id == RECOMMENDED_TRACKS_PLAYLIST_ID:
+ return self._parse_tracks_list(await self._get_recommended_tracks())
+
+ if prov_playlist_id == TOP_CHARTS_PLAYLIST_ID:
+ return self._parse_tracks_list(await self._get_chart_tracks())
+
+ if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX):
+ radio_id = int(prov_playlist_id.replace(RADIO_PLAYLIST_PREFIX, ""))
+ try:
+ radio = await self.client.get_radio(radio_id)
+ return self._parse_tracks_list(list(await radio.get_tracks()))
+ except Exception as err:
+ self.logger.debug("Failed to get radio tracks %s: %s", radio_id, err)
+ return []
+
+ # Regular Deezer playlists (cached separately)
+ return await self._get_regular_playlist_tracks(prov_playlist_id)
+
+ @use_cache(3600 * 3) # Cache for 3 hours
+ async def _get_regular_playlist_tracks(self, prov_playlist_id: str) -> list[Track]:
+ """Get tracks for regular Deezer playlists (cached)."""
playlist = await self.client.get_playlist(int(prov_playlist_id))
playlist_tracks = await playlist.get_tracks()
- for index, deezer_track in enumerate(playlist_tracks, 1):
- result.append(
- self.parse_track(
- track=deezer_track,
- user_country=self.gw_client.user_country,
- position=index,
- )
+ return self._parse_tracks_list(list(playlist_tracks))
+
+ def _parse_tracks_list(self, tracks: list[deezer.Track]) -> list[Track]:
+ """Parse a list of Deezer tracks to Music Assistant tracks."""
+ return [
+ self.parse_track(
+ track=track,
+ user_country=self.gw_client.user_country,
+ position=index,
)
- return result
+ for index, track in enumerate(tracks, 1)
+ ]
@use_cache(3600 * 24 * 7) # Cache for 7 days
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
raise NotImplementedError
return result
- @use_cache(3600) # Cache for 1 hour
+ @use_cache(3600)
async def recommendations(self) -> list[RecommendationFolder]:
- """Get deezer's recommendations."""
- return [
+ """Get Deezer's recommendations including Flow and personalized content."""
+ result: list[RecommendationFolder] = []
+
+ # Made for you - combines Flow, Recommended tracks, and recommended playlists
+ # Get covers from first track's album for each virtual playlist
+ flow_cover = None
+ flow_tracks = await self._get_flow_tracks()
+ if flow_tracks and hasattr(flow_tracks[0], "album"):
+ flow_cover = getattr(flow_tracks[0].album, "cover_medium", None)
+
+ recommended_cover = None
+ recommended_tracks = await self._get_recommended_tracks()
+ if recommended_tracks and hasattr(recommended_tracks[0], "album"):
+ recommended_cover = getattr(recommended_tracks[0].album, "cover_medium", None)
+
+ chart_tracks = await self._get_chart_tracks()
+ chart_cover = None
+ if chart_tracks and hasattr(chart_tracks[0], "album"):
+ chart_cover = getattr(chart_tracks[0].album, "cover_medium", None)
+
+ made_for_you_items: list[Playlist] = [
+ # Flow - personalized endless radio
+ self._create_virtual_playlist(FLOW_PLAYLIST_ID, "Flow", image_url=flow_cover),
+ # Recommended tracks
+ self._create_virtual_playlist(
+ RECOMMENDED_TRACKS_PLAYLIST_ID, "Recommended tracks", image_url=recommended_cover
+ ),
+ # Top Charts - global top tracks
+ self._create_virtual_playlist(
+ TOP_CHARTS_PLAYLIST_ID, "Top Charts", image_url=chart_cover
+ ),
+ ]
+ # Add recommended playlists from Deezer
+ for playlist in await self.client.get_user_recommended_playlists():
+ made_for_you_items.append(self.parse_playlist(playlist=playlist))
+
+ result.append(
RecommendationFolder(
- item_id="recommended_tracks",
+ item_id="made_for_you",
provider=self.instance_id,
- name="Recommended tracks",
- translation_key="recommended_tracks",
- items=UniqueList(
- [
- self.parse_track(track=track, user_country=self.gw_client.user_country)
- for track in await self.client.get_user_recommended_tracks()
- ]
- ),
+ name="Made for you",
+ items=UniqueList(made_for_you_items),
)
- ]
+ )
+
+ # Recommended albums
+ recommended_albums = list(await self.client.get_user_recommended_albums())
+ if recommended_albums:
+ result.append(
+ RecommendationFolder(
+ item_id="recommended_albums",
+ provider=self.instance_id,
+ name="Recommended albums",
+ items=UniqueList(
+ [self.parse_album(album=album) for album in recommended_albums]
+ ),
+ )
+ )
+
+ # Recommended artists
+ recommended_artists = list(await self.client.get_user_recommended_artists())
+ if recommended_artists:
+ result.append(
+ RecommendationFolder(
+ item_id="recommended_artists",
+ provider=self.instance_id,
+ name="Recommended artists",
+ items=UniqueList(
+ [self.parse_artist(artist=artist) for artist in recommended_artists]
+ ),
+ )
+ )
+
+ # Deezer Radios - curated selection (as virtual playlists in one folder)
+ radio_playlists: list[Playlist] = []
+ for radio_id in CURATED_RADIO_IDS:
+ try:
+ radio = await self.client.get_radio(radio_id)
+ radio_playlists.append(
+ self._create_virtual_playlist(
+ item_id=f"{RADIO_PLAYLIST_PREFIX}{radio_id}",
+ name=f"Radio: {radio.title}",
+ image_url=getattr(radio, "picture_medium", None),
+ )
+ )
+ except Exception as err:
+ self.logger.debug("Failed to load radio %s: %s", radio_id, err)
+
+ if radio_playlists:
+ result.append(
+ RecommendationFolder(
+ item_id="radios",
+ provider=self.instance_id,
+ name="Deezer Radios",
+ items=UniqueList(radio_playlists),
+ )
+ )
+
+ return result
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
"""Add track(s) to playlist."""
return playlist.creator
return playlist.user
+ def _create_virtual_playlist(
+ self,
+ item_id: str,
+ name: str,
+ image_url: str | None = None,
+ ) -> Playlist:
+ """Create a virtual playlist for Flow, Recommended tracks, or Radios.
+
+ :param item_id: The unique identifier (e.g., "flow", "radio_37151").
+ :param name: Display name for the playlist.
+ :param image_url: Optional image URL.
+ """
+ images: UniqueList[MediaItemImage] = UniqueList()
+ if image_url:
+ images.append(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
+ return Playlist(
+ item_id=item_id,
+ provider=self.instance_id,
+ name=name,
+ media_type=MediaType.PLAYLIST,
+ provider_mappings={
+ ProviderMapping(
+ item_id=item_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ metadata=MediaItemMetadata(images=images) if images else MediaItemMetadata(),
+ is_editable=False,
+ owner="Deezer",
+ )
+
def parse_track(self, track: deezer.Track, user_country: str, position: int = 0) -> Track:
"""Parse the deezer-python track to a Music Assistant track."""
if hasattr(track, "artist"):