From: Dionysis Fortis Date: Tue, 10 Feb 2026 06:56:01 +0000 (+0200) Subject: feat(deezer): add Flow, Radios and Top Charts as recommendations (#3077) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=321a8bd7f57f59f34351f5c1ae8f53615f956972;p=music-assistant-server.git feat(deezer): add Flow, Radios and Top Charts as recommendations (#3077) * feat(deezer): add Flow, Radios and Top Charts as dynamic playlists Implements recommendations for Deezer provider: - User's personalized Flow as a virtual playlist - All available Deezer Radios (genres, moods, etc.) - Top Charts by country These appear under the Recommendations panel and use virtual playlist objects (in_library=false) for dynamic content. Tested on dev environment and working as expected. * fix: add missing type annotations for mypy * Extract virtual playlist IDs and radio IDs into module-level constants --- diff --git a/music_assistant/providers/deezer/__init__.py b/music_assistant/providers/deezer/__init__.py index 71c870c7..a7a50917 100644 --- a/music_assistant/providers/deezer/__init__.py +++ b/music_assistant/providers/deezer/__init__.py @@ -95,6 +95,30 @@ manage_community,delete_library,listening_history" DEEZER_APP_ID = app_var(6) DEEZER_APP_SECRET = app_var(7) +# Virtual playlist IDs for dynamic Deezer content +FLOW_PLAYLIST_ID = "flow" +RECOMMENDED_TRACKS_PLAYLIST_ID = "recommended_tracks" +TOP_CHARTS_PLAYLIST_ID = "top_charts" +RADIO_PLAYLIST_PREFIX = "radio_" + +# Curated Deezer radio station IDs +CURATED_RADIO_IDS = [ + 37151, # Hits + 38305, # The '80s + 38295, # The '70s + 31061, # Pop + 37765, # Rock classics + 30901, # Metal + 30991, # Hip Hop + 30771, # Indie + 30621, # Electronic + 31031, # Jazz + 30661, # Classical + 36791, # Latin Music + 38225, # Focus + 39041, # Happy Hour +] + async def get_access_token( app_id: str, app_secret: str, code: str, http_session: ClientSession @@ -196,6 +220,23 @@ class DeezerProvider(MusicProvider): ) await self.gw_client.setup() + # Cached wrappers for dynamic Deezer content (ensures consistent data across calls) + @use_cache(3600) # Cache for 1 hour + async def _get_flow_tracks(self) -> list[deezer.Track]: + """Get cached Flow tracks.""" + return list(await self.client.get_user_flow()) + + @use_cache(3600) # Cache for 1 hour + async def _get_recommended_tracks(self) -> list[deezer.Track]: + """Get cached recommended tracks.""" + return list(await self.client.get_user_recommended_tracks()) + + @use_cache(3600) # Cache for 1 hour + async def _get_chart_tracks(self) -> list[deezer.Track]: + """Get cached chart tracks.""" + chart = await self.client.get_chart() + return list(chart.tracks[:100]) if chart.tracks else [] + @use_cache(3600 * 24 * 7) # Cache for 7 days async def search( self, search_query: str, media_types: list[MediaType], limit: int = 5 @@ -288,6 +329,41 @@ class DeezerProvider(MusicProvider): @use_cache(3600 * 24 * 30) # Cache for 30 days async def get_playlist(self, prov_playlist_id: str) -> Playlist: """Get full playlist details by id.""" + # Handle virtual playlists (Flow, Recommended tracks, Top Charts, Radios) + if prov_playlist_id == FLOW_PLAYLIST_ID: + flow_tracks = await self._get_flow_tracks() + flow_cover = None + if flow_tracks and hasattr(flow_tracks[0], "album"): + flow_cover = getattr(flow_tracks[0].album, "cover_medium", None) + return self._create_virtual_playlist(FLOW_PLAYLIST_ID, "Flow", image_url=flow_cover) + if prov_playlist_id == RECOMMENDED_TRACKS_PLAYLIST_ID: + rec_tracks = await self._get_recommended_tracks() + rec_cover = None + if rec_tracks and hasattr(rec_tracks[0], "album"): + rec_cover = getattr(rec_tracks[0].album, "cover_medium", None) + return self._create_virtual_playlist( + RECOMMENDED_TRACKS_PLAYLIST_ID, "Recommended tracks", image_url=rec_cover + ) + if prov_playlist_id == TOP_CHARTS_PLAYLIST_ID: + chart_tracks = await self._get_chart_tracks() + chart_cover = None + if chart_tracks and hasattr(chart_tracks[0], "album"): + chart_cover = getattr(chart_tracks[0].album, "cover_medium", None) + return self._create_virtual_playlist( + TOP_CHARTS_PLAYLIST_ID, "Top Charts", image_url=chart_cover + ) + if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX): + radio_id = int(prov_playlist_id.replace(RADIO_PLAYLIST_PREFIX, "")) + try: + radio = await self.client.get_radio(radio_id) + return self._create_virtual_playlist( + prov_playlist_id, + f"Radio: {radio.title}", + image_url=getattr(radio, "picture_medium", None), + ) + except Exception as err: + self.logger.warning("Failed getting radio %s: %s", radio_id, err) + raise MediaNotFoundError(f"Radio {prov_playlist_id} not found on Deezer") from err try: return self.parse_playlist( playlist=await self.client.get_playlist(playlist_id=int(prov_playlist_id)), @@ -322,25 +398,51 @@ class DeezerProvider(MusicProvider): for deezer_track in await album.get_tracks() ] - @use_cache(3600 * 3) # Cache for 3 hours async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]: """Get playlist tracks.""" - result: list[Track] = [] if page > 0: # paging not supported, we always return the whole list at once return [] - # TODO: access the underlying paging on the deezer api (if possible)) + + # Virtual playlists use their own cached wrappers (not double-cached) + if prov_playlist_id == FLOW_PLAYLIST_ID: + return self._parse_tracks_list(await self._get_flow_tracks()) + + if prov_playlist_id == RECOMMENDED_TRACKS_PLAYLIST_ID: + return self._parse_tracks_list(await self._get_recommended_tracks()) + + if prov_playlist_id == TOP_CHARTS_PLAYLIST_ID: + return self._parse_tracks_list(await self._get_chart_tracks()) + + if prov_playlist_id.startswith(RADIO_PLAYLIST_PREFIX): + radio_id = int(prov_playlist_id.replace(RADIO_PLAYLIST_PREFIX, "")) + try: + radio = await self.client.get_radio(radio_id) + return self._parse_tracks_list(list(await radio.get_tracks())) + except Exception as err: + self.logger.debug("Failed to get radio tracks %s: %s", radio_id, err) + return [] + + # Regular Deezer playlists (cached separately) + return await self._get_regular_playlist_tracks(prov_playlist_id) + + @use_cache(3600 * 3) # Cache for 3 hours + async def _get_regular_playlist_tracks(self, prov_playlist_id: str) -> list[Track]: + """Get tracks for regular Deezer playlists (cached).""" playlist = await self.client.get_playlist(int(prov_playlist_id)) playlist_tracks = await playlist.get_tracks() - for index, deezer_track in enumerate(playlist_tracks, 1): - result.append( - self.parse_track( - track=deezer_track, - user_country=self.gw_client.user_country, - position=index, - ) + return self._parse_tracks_list(list(playlist_tracks)) + + def _parse_tracks_list(self, tracks: list[deezer.Track]) -> list[Track]: + """Parse a list of Deezer tracks to Music Assistant tracks.""" + return [ + self.parse_track( + track=track, + user_country=self.gw_client.user_country, + position=index, ) - return result + for index, track in enumerate(tracks, 1) + ] @use_cache(3600 * 24 * 7) # Cache for 7 days async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: @@ -419,23 +521,107 @@ class DeezerProvider(MusicProvider): raise NotImplementedError return result - @use_cache(3600) # Cache for 1 hour + @use_cache(3600) async def recommendations(self) -> list[RecommendationFolder]: - """Get deezer's recommendations.""" - return [ + """Get Deezer's recommendations including Flow and personalized content.""" + result: list[RecommendationFolder] = [] + + # Made for you - combines Flow, Recommended tracks, and recommended playlists + # Get covers from first track's album for each virtual playlist + flow_cover = None + flow_tracks = await self._get_flow_tracks() + if flow_tracks and hasattr(flow_tracks[0], "album"): + flow_cover = getattr(flow_tracks[0].album, "cover_medium", None) + + recommended_cover = None + recommended_tracks = await self._get_recommended_tracks() + if recommended_tracks and hasattr(recommended_tracks[0], "album"): + recommended_cover = getattr(recommended_tracks[0].album, "cover_medium", None) + + chart_tracks = await self._get_chart_tracks() + chart_cover = None + if chart_tracks and hasattr(chart_tracks[0], "album"): + chart_cover = getattr(chart_tracks[0].album, "cover_medium", None) + + made_for_you_items: list[Playlist] = [ + # Flow - personalized endless radio + self._create_virtual_playlist(FLOW_PLAYLIST_ID, "Flow", image_url=flow_cover), + # Recommended tracks + self._create_virtual_playlist( + RECOMMENDED_TRACKS_PLAYLIST_ID, "Recommended tracks", image_url=recommended_cover + ), + # Top Charts - global top tracks + self._create_virtual_playlist( + TOP_CHARTS_PLAYLIST_ID, "Top Charts", image_url=chart_cover + ), + ] + # Add recommended playlists from Deezer + for playlist in await self.client.get_user_recommended_playlists(): + made_for_you_items.append(self.parse_playlist(playlist=playlist)) + + result.append( RecommendationFolder( - item_id="recommended_tracks", + item_id="made_for_you", provider=self.instance_id, - name="Recommended tracks", - translation_key="recommended_tracks", - items=UniqueList( - [ - self.parse_track(track=track, user_country=self.gw_client.user_country) - for track in await self.client.get_user_recommended_tracks() - ] - ), + name="Made for you", + items=UniqueList(made_for_you_items), ) - ] + ) + + # Recommended albums + recommended_albums = list(await self.client.get_user_recommended_albums()) + if recommended_albums: + result.append( + RecommendationFolder( + item_id="recommended_albums", + provider=self.instance_id, + name="Recommended albums", + items=UniqueList( + [self.parse_album(album=album) for album in recommended_albums] + ), + ) + ) + + # Recommended artists + recommended_artists = list(await self.client.get_user_recommended_artists()) + if recommended_artists: + result.append( + RecommendationFolder( + item_id="recommended_artists", + provider=self.instance_id, + name="Recommended artists", + items=UniqueList( + [self.parse_artist(artist=artist) for artist in recommended_artists] + ), + ) + ) + + # Deezer Radios - curated selection (as virtual playlists in one folder) + radio_playlists: list[Playlist] = [] + for radio_id in CURATED_RADIO_IDS: + try: + radio = await self.client.get_radio(radio_id) + radio_playlists.append( + self._create_virtual_playlist( + item_id=f"{RADIO_PLAYLIST_PREFIX}{radio_id}", + name=f"Radio: {radio.title}", + image_url=getattr(radio, "picture_medium", None), + ) + ) + except Exception as err: + self.logger.debug("Failed to load radio %s: %s", radio_id, err) + + if radio_playlists: + result.append( + RecommendationFolder( + item_id="radios", + provider=self.instance_id, + name="Deezer Radios", + items=UniqueList(radio_playlists), + ) + ) + + return result async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None: """Add track(s) to playlist.""" @@ -685,6 +871,45 @@ class DeezerProvider(MusicProvider): return playlist.creator return playlist.user + def _create_virtual_playlist( + self, + item_id: str, + name: str, + image_url: str | None = None, + ) -> Playlist: + """Create a virtual playlist for Flow, Recommended tracks, or Radios. + + :param item_id: The unique identifier (e.g., "flow", "radio_37151"). + :param name: Display name for the playlist. + :param image_url: Optional image URL. + """ + images: UniqueList[MediaItemImage] = UniqueList() + if image_url: + images.append( + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=self.instance_id, + remotely_accessible=True, + ) + ) + return Playlist( + item_id=item_id, + provider=self.instance_id, + name=name, + media_type=MediaType.PLAYLIST, + provider_mappings={ + ProviderMapping( + item_id=item_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + ) + }, + metadata=MediaItemMetadata(images=images) if images else MediaItemMetadata(), + is_editable=False, + owner="Deezer", + ) + def parse_track(self, track: deezer.Track, user_country: str, position: int = 0) -> Track: """Parse the deezer-python track to a Music Assistant track.""" if hasattr(track, "artist"):