feat(deezer): add Flow, Radios and Top Charts as recommendations (#3077)
authorDionysis Fortis <sfortis@gmail.com>
Tue, 10 Feb 2026 06:56:01 +0000 (08:56 +0200)
committerGitHub <noreply@github.com>
Tue, 10 Feb 2026 06:56:01 +0000 (07:56 +0100)
* feat(deezer): add Flow, Radios and Top Charts as dynamic playlists

Implements recommendations for Deezer provider:
- User's personalized Flow as a virtual playlist
- All available Deezer Radios (genres, moods, etc.)
- Top Charts by country

These appear under the Recommendations panel and use virtual
playlist objects (in_library=false) for dynamic content.

Tested on dev environment and working as expected.

* fix: add missing type annotations for mypy

* Extract virtual playlist IDs and radio IDs into module-level constants

music_assistant/providers/deezer/__init__.py

index 71c870c7c468ffbdda6985d05e30ff2ea2960501..a7a50917f7af9f7bbb1f50ba7e70bd1957bdf7d6 100644 (file)
@@ -95,6 +95,30 @@ manage_community,delete_library,listening_history"
 DEEZER_APP_ID = app_var(6)
 DEEZER_APP_SECRET = app_var(7)
 
+# Virtual playlist IDs for dynamic Deezer content
+FLOW_PLAYLIST_ID = "flow"
+RECOMMENDED_TRACKS_PLAYLIST_ID = "recommended_tracks"
+TOP_CHARTS_PLAYLIST_ID = "top_charts"
+RADIO_PLAYLIST_PREFIX = "radio_"
+
+# Curated Deezer radio station IDs
+CURATED_RADIO_IDS = [
+    37151,  # Hits
+    38305,  # The '80s
+    38295,  # The '70s
+    31061,  # Pop
+    37765,  # Rock classics
+    30901,  # Metal
+    30991,  # Hip Hop
+    30771,  # Indie
+    30621,  # Electronic
+    31031,  # Jazz
+    30661,  # Classical
+    36791,  # Latin Music
+    38225,  # Focus
+    39041,  # Happy Hour
+]
+
 
 async def get_access_token(
     app_id: str, app_secret: str, code: str, http_session: ClientSession
@@ -196,6 +220,23 @@ class DeezerProvider(MusicProvider):
         )
         await self.gw_client.setup()
 
+    # Cached wrappers for dynamic Deezer content (ensures consistent data across calls)
+    @use_cache(3600)  # Cache for 1 hour
+    async def _get_flow_tracks(self) -> list[deezer.Track]:
+        """Get cached Flow tracks."""
+        return list(await self.client.get_user_flow())
+
+    @use_cache(3600)  # Cache for 1 hour
+    async def _get_recommended_tracks(self) -> list[deezer.Track]:
+        """Get cached recommended tracks."""
+        return list(await self.client.get_user_recommended_tracks())
+
+    @use_cache(3600)  # Cache for 1 hour
+    async def _get_chart_tracks(self) -> list[deezer.Track]:
+        """Get cached chart tracks."""
+        chart = await self.client.get_chart()
+        return list(chart.tracks[:100]) if chart.tracks else []
+
     @use_cache(3600 * 24 * 7)  # Cache for 7 days
     async def search(
         self, search_query: str, media_types: list[MediaType], limit: int = 5
@@ -288,6 +329,41 @@ class DeezerProvider(MusicProvider):
     @use_cache(3600 * 24 * 30)  # Cache for 30 days
     async def get_playlist(self, prov_playlist_id: str) -> Playlist:
         """Get full playlist details by id."""
+        # Handle virtual playlists (Flow, Recommended tracks, Top Charts, Radios)
+        if prov_playlist_id == FLOW_PLAYLIST_ID:
+            flow_tracks = await self._get_flow_tracks()
+            flow_cover = None
+            if flow_tracks and hasattr(flow_tracks[0], "album"):
+                flow_cover = getattr(flow_tracks[0].album, "cover_medium", None)
+            return self._create_virtual_playlist(FLOW_PLAYLIST_ID, "Flow", image_url=flow_cover)
+        if prov_playlist_id == RECOMMENDED_TRACKS_PLAYLIST_ID:
+            rec_tracks = await self._get_recommended_tracks()
+            rec_cover = None
+            if rec_tracks and hasattr(rec_tracks[0], "album"):
+                rec_cover = getattr(rec_tracks[0].album, "cover_medium", None)
+            return self._create_virtual_playlist(
+                RECOMMENDED_TRACKS_PLAYLIST_ID, "Recommended tracks", image_url=rec_cover
+            )
+        if prov_playlist_id == TOP_CHARTS_PLAYLIST_ID:
+            chart_tracks = await self._get_chart_tracks()
+            chart_cover = None
+            if chart_tracks and hasattr(chart_tracks[0], "album"):
+                chart_cover = getattr(chart_tracks[0].album, "cover_medium", None)
+            return self._create_virtual_playlist(
+                TOP_CHARTS_PLAYLIST_ID, "Top Charts", image_url=chart_cover
+            )
+        if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX):
+            radio_id = int(prov_playlist_id.replace(RADIO_PLAYLIST_PREFIX, ""))
+            try:
+                radio = await self.client.get_radio(radio_id)
+                return self._create_virtual_playlist(
+                    prov_playlist_id,
+                    f"Radio: {radio.title}",
+                    image_url=getattr(radio, "picture_medium", None),
+                )
+            except Exception as err:
+                self.logger.warning("Failed getting radio %s: %s", radio_id, err)
+                raise MediaNotFoundError(f"Radio {prov_playlist_id} not found on Deezer") from err
         try:
             return self.parse_playlist(
                 playlist=await self.client.get_playlist(playlist_id=int(prov_playlist_id)),
@@ -322,25 +398,51 @@ class DeezerProvider(MusicProvider):
             for deezer_track in await album.get_tracks()
         ]
 
-    @use_cache(3600 * 3)  # Cache for 3 hours
     async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
         """Get playlist tracks."""
-        result: list[Track] = []
         if page > 0:
             # paging not supported, we always return the whole list at once
             return []
-        # TODO: access the underlying paging on the deezer api (if possible))
+
+        # Virtual playlists use their own cached wrappers (not double-cached)
+        if prov_playlist_id == FLOW_PLAYLIST_ID:
+            return self._parse_tracks_list(await self._get_flow_tracks())
+
+        if prov_playlist_id == RECOMMENDED_TRACKS_PLAYLIST_ID:
+            return self._parse_tracks_list(await self._get_recommended_tracks())
+
+        if prov_playlist_id == TOP_CHARTS_PLAYLIST_ID:
+            return self._parse_tracks_list(await self._get_chart_tracks())
+
+        if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX):
+            radio_id = int(prov_playlist_id.replace(RADIO_PLAYLIST_PREFIX, ""))
+            try:
+                radio = await self.client.get_radio(radio_id)
+                return self._parse_tracks_list(list(await radio.get_tracks()))
+            except Exception as err:
+                self.logger.debug("Failed to get radio tracks %s: %s", radio_id, err)
+                return []
+
+        # Regular Deezer playlists (cached separately)
+        return await self._get_regular_playlist_tracks(prov_playlist_id)
+
+    @use_cache(3600 * 3)  # Cache for 3 hours
+    async def _get_regular_playlist_tracks(self, prov_playlist_id: str) -> list[Track]:
+        """Get tracks for regular Deezer playlists (cached)."""
         playlist = await self.client.get_playlist(int(prov_playlist_id))
         playlist_tracks = await playlist.get_tracks()
-        for index, deezer_track in enumerate(playlist_tracks, 1):
-            result.append(
-                self.parse_track(
-                    track=deezer_track,
-                    user_country=self.gw_client.user_country,
-                    position=index,
-                )
+        return self._parse_tracks_list(list(playlist_tracks))
+
+    def _parse_tracks_list(self, tracks: list[deezer.Track]) -> list[Track]:
+        """Parse a list of Deezer tracks to Music Assistant tracks."""
+        return [
+            self.parse_track(
+                track=track,
+                user_country=self.gw_client.user_country,
+                position=index,
             )
-        return result
+            for index, track in enumerate(tracks, 1)
+        ]
 
     @use_cache(3600 * 24 * 7)  # Cache for 7 days
     async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
@@ -419,23 +521,107 @@ class DeezerProvider(MusicProvider):
             raise NotImplementedError
         return result
 
-    @use_cache(3600)  # Cache for 1 hour
+    @use_cache(3600)
     async def recommendations(self) -> list[RecommendationFolder]:
-        """Get deezer's recommendations."""
-        return [
+        """Get Deezer's recommendations including Flow and personalized content."""
+        result: list[RecommendationFolder] = []
+
+        # Made for you - combines Flow, Recommended tracks, and recommended playlists
+        # Get covers from first track's album for each virtual playlist
+        flow_cover = None
+        flow_tracks = await self._get_flow_tracks()
+        if flow_tracks and hasattr(flow_tracks[0], "album"):
+            flow_cover = getattr(flow_tracks[0].album, "cover_medium", None)
+
+        recommended_cover = None
+        recommended_tracks = await self._get_recommended_tracks()
+        if recommended_tracks and hasattr(recommended_tracks[0], "album"):
+            recommended_cover = getattr(recommended_tracks[0].album, "cover_medium", None)
+
+        chart_tracks = await self._get_chart_tracks()
+        chart_cover = None
+        if chart_tracks and hasattr(chart_tracks[0], "album"):
+            chart_cover = getattr(chart_tracks[0].album, "cover_medium", None)
+
+        made_for_you_items: list[Playlist] = [
+            # Flow - personalized endless radio
+            self._create_virtual_playlist(FLOW_PLAYLIST_ID, "Flow", image_url=flow_cover),
+            # Recommended tracks
+            self._create_virtual_playlist(
+                RECOMMENDED_TRACKS_PLAYLIST_ID, "Recommended tracks", image_url=recommended_cover
+            ),
+            # Top Charts - global top tracks
+            self._create_virtual_playlist(
+                TOP_CHARTS_PLAYLIST_ID, "Top Charts", image_url=chart_cover
+            ),
+        ]
+        # Add recommended playlists from Deezer
+        for playlist in await self.client.get_user_recommended_playlists():
+            made_for_you_items.append(self.parse_playlist(playlist=playlist))
+
+        result.append(
             RecommendationFolder(
-                item_id="recommended_tracks",
+                item_id="made_for_you",
                 provider=self.instance_id,
-                name="Recommended tracks",
-                translation_key="recommended_tracks",
-                items=UniqueList(
-                    [
-                        self.parse_track(track=track, user_country=self.gw_client.user_country)
-                        for track in await self.client.get_user_recommended_tracks()
-                    ]
-                ),
+                name="Made for you",
+                items=UniqueList(made_for_you_items),
             )
-        ]
+        )
+
+        # Recommended albums
+        recommended_albums = list(await self.client.get_user_recommended_albums())
+        if recommended_albums:
+            result.append(
+                RecommendationFolder(
+                    item_id="recommended_albums",
+                    provider=self.instance_id,
+                    name="Recommended albums",
+                    items=UniqueList(
+                        [self.parse_album(album=album) for album in recommended_albums]
+                    ),
+                )
+            )
+
+        # Recommended artists
+        recommended_artists = list(await self.client.get_user_recommended_artists())
+        if recommended_artists:
+            result.append(
+                RecommendationFolder(
+                    item_id="recommended_artists",
+                    provider=self.instance_id,
+                    name="Recommended artists",
+                    items=UniqueList(
+                        [self.parse_artist(artist=artist) for artist in recommended_artists]
+                    ),
+                )
+            )
+
+        # Deezer Radios - curated selection (as virtual playlists in one folder)
+        radio_playlists: list[Playlist] = []
+        for radio_id in CURATED_RADIO_IDS:
+            try:
+                radio = await self.client.get_radio(radio_id)
+                radio_playlists.append(
+                    self._create_virtual_playlist(
+                        item_id=f"{RADIO_PLAYLIST_PREFIX}{radio_id}",
+                        name=f"Radio: {radio.title}",
+                        image_url=getattr(radio, "picture_medium", None),
+                    )
+                )
+            except Exception as err:
+                self.logger.debug("Failed to load radio %s: %s", radio_id, err)
+
+        if radio_playlists:
+            result.append(
+                RecommendationFolder(
+                    item_id="radios",
+                    provider=self.instance_id,
+                    name="Deezer Radios",
+                    items=UniqueList(radio_playlists),
+                )
+            )
+
+        return result
 
     async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
         """Add track(s) to playlist."""
@@ -685,6 +871,45 @@ class DeezerProvider(MusicProvider):
             return playlist.creator
         return playlist.user
 
+    def _create_virtual_playlist(
+        self,
+        item_id: str,
+        name: str,
+        image_url: str | None = None,
+    ) -> Playlist:
+        """Create a virtual playlist for Flow, Recommended tracks, or Radios.
+
+        :param item_id: The unique identifier (e.g., "flow", "radio_37151").
+        :param name: Display name for the playlist.
+        :param image_url: Optional image URL.
+        """
+        images: UniqueList[MediaItemImage] = UniqueList()
+        if image_url:
+            images.append(
+                MediaItemImage(
+                    type=ImageType.THUMB,
+                    path=image_url,
+                    provider=self.instance_id,
+                    remotely_accessible=True,
+                )
+            )
+        return Playlist(
+            item_id=item_id,
+            provider=self.instance_id,
+            name=name,
+            media_type=MediaType.PLAYLIST,
+            provider_mappings={
+                ProviderMapping(
+                    item_id=item_id,
+                    provider_domain=self.domain,
+                    provider_instance=self.instance_id,
+                )
+            },
+            metadata=MediaItemMetadata(images=images) if images else MediaItemMetadata(),
+            is_editable=False,
+            owner="Deezer",
+        )
+
     def parse_track(self, track: deezer.Track, user_country: str, position: int = 0) -> Track:
         """Parse the deezer-python track to a Music Assistant track."""
         if hasattr(track, "artist"):