Tidal: Modify recommendations setup (#2136)
authorJozef Kruszynski <60214390+jozefKruszynski@users.noreply.github.com>
Sun, 20 Apr 2025 21:46:26 +0000 (23:46 +0200)
committerGitHub <noreply@github.com>
Sun, 20 Apr 2025 21:46:26 +0000 (23:46 +0200)
fix: Modify recommendations setup

Parse both home and for_you pages, as Tidal have made modifications
to page/module structure

music_assistant/providers/tidal/__init__.py
music_assistant/providers/tidal/tidal_page_parser.py

index 0d9a179aa0db16ee47db06219a42d8dada28d2c8..a679d63b5ad5e3e3e1661511d0c8662f2040a650 100644 (file)
@@ -41,6 +41,7 @@ from music_assistant_models.media_items import (
     ItemMapping,
     MediaItemImage,
     MediaItemType,
+    MediaItemTypeOrItemMapping,
     Playlist,
     ProviderMapping,
     RecommendationFolder,
@@ -1021,102 +1022,24 @@ class TidalProvider(MusicProvider):
 
         results: list[RecommendationFolder] = []
 
-        try:
-            # Get page content
-            home_parser = await self.get_page_content("pages/home")
-
-            # Helper function to determine icon based on content type
-            def get_icon_for_type(media_type: MediaType) -> str:
-                if media_type == MediaType.PLAYLIST:
-                    return "mdi-playlist-music"
-                elif media_type == MediaType.ALBUM:
-                    return "mdi-album"
-                elif media_type == MediaType.TRACK:
-                    return "mdi-file-music"
-                elif media_type == MediaType.ARTIST:
-                    return "mdi-account-music"
-                return "mdi-motion-play"  # Default for mixed content
-
-            # Collection for all "Because you listened to" items
-            because_items = []
-            because_modules = set()
+        # Pages to fetch
+        pages = ["pages/home", "pages/for_you"]
 
-            # Process all modules in a single pass
-            for module_info in home_parser._module_map:
-                try:
-                    module_title = module_info.get("title", "Unknown")
-
-                    # Skip modules without proper titles
-                    if not module_title or module_title == "Unknown":
-                        continue
-
-                    # Check if it's a "because you listened to" module
-                    pre_title = module_info.get("raw_data", {}).get("preTitle")
-                    is_because_module = pre_title and "because you listened to" in pre_title.lower()
-
-                    # Get module items
-                    module_items, content_type = home_parser.get_module_items(module_info)
-
-                    # Skip empty modules
-                    if not module_items:
-                        continue
-
-                    # Create folder description and icon
-                    subtitle = f"Tidal {content_type.name.lower()} collection"
-                    icon = get_icon_for_type(content_type)
-
-                    if is_because_module:
-                        # Add items to collective "Because you listened to" list
-                        because_items.extend(module_items)
-                        because_modules.add(module_title)
-                    else:
-                        # Create regular recommendation folder
-                        item_id = "".join(
-                            c
-                            for c in module_title.lower().replace(" ", "_").replace("-", "_")
-                            if c.isalnum() or c == "_"
-                        )
-
-                        folder = RecommendationFolder(
-                            item_id=item_id,
-                            name=module_title,
-                            provider=self.lookup_key,
-                            items=UniqueList(module_items),
-                            subtitle=subtitle,
-                            translation_key=item_id,
-                            icon=icon,
-                        )
-                        results.append(folder)
+        # Dictionary to track items by module title to combine duplicates
+        combined_modules: dict[str, list[Playlist | Album | Track | Artist]] = {}
+        module_content_types: dict[str, MediaType] = {}
+        module_page_names: dict[str, str] = {}
 
-                except (KeyError, ValueError, TypeError, AttributeError) as err:
-                    self.logger.warning(
-                        "Error processing module %s: %s",
-                        module_info.get("title", "Unknown"),
-                        err,
-                    )
-
-            # Create a single folder for all "Because you listened to" items if we have any
-            if because_items:
-                sources_summary = " - ".join(sorted(because_modules))
-                folder_subtitle = f"Recommendations based on: {sources_summary}"
-
-                because_folder = RecommendationFolder(
-                    item_id="because_you_listened_to",
-                    name=f"Because You Listened To: {sources_summary}",
-                    provider=self.lookup_key,
-                    items=UniqueList(because_items),
-                    subtitle=folder_subtitle,
-                    translation_key="because_you_listened_to",
-                    icon="mdi-headphones-box",
-                )
-                # Add as first item in results
-                results.insert(0, because_folder)
+        try:
+            # Process pages and collect modules
+            await self._process_recommendation_pages(
+                pages, combined_modules, module_content_types, module_page_names
+            )
 
-                self.logger.debug(
-                    "Created 'Because You Listened To' folder with %d items from %d modules",
-                    len(because_items),
-                    len(because_modules),
-                )
+            # Create recommendation folders from combined modules
+            results = self._create_recommendation_folders(
+                combined_modules, module_content_types, module_page_names
+            )
 
             self.logger.debug("Created %d recommendation folders from Tidal", len(results))
 
@@ -1146,6 +1069,130 @@ class TidalProvider(MusicProvider):
 
         return results
 
+    async def _process_recommendation_pages(
+        self,
+        pages: list[str],
+        combined_modules: dict[str, list[Playlist | Album | Track | Artist]],
+        module_content_types: dict[str, MediaType],
+        module_page_names: dict[str, str],
+    ) -> None:
+        """Process recommendation pages and collect modules."""
+        for page_path in pages:
+            # Get page content
+            page_parser = await self.get_page_content(page_path)
+            page_name = page_path.split("/")[-1].replace("_", " ").title()
+
+            # Process all modules in a single pass
+            await self._process_page_modules(
+                page_parser, page_name, combined_modules, module_content_types, module_page_names
+            )
+
+    async def _process_page_modules(
+        self,
+        page_parser: TidalPageParser,
+        page_name: str,
+        combined_modules: dict[str, list[Playlist | Album | Track | Artist]],
+        module_content_types: dict[str, MediaType],
+        module_page_names: dict[str, str],
+    ) -> None:
+        """Process all modules from a single page."""
+        for module_info in page_parser._module_map:
+            try:
+                module_title = module_info.get("title", "Unknown")
+
+                # Skip modules without proper titles
+                if not module_title or module_title == "Unknown":
+                    continue
+
+                # Get module items
+                module_items, content_type = page_parser.get_module_items(module_info)
+
+                # Skip empty modules
+                if not module_items:
+                    continue
+
+                # For all modules, collect items based on title
+                if module_title not in combined_modules:
+                    combined_modules[module_title] = []
+                    module_content_types[module_title] = content_type
+                    module_page_names[module_title] = page_name
+                else:
+                    # If we already have this module title, update the content type
+                    # if this module has more items than we already collected
+                    current_items_count = len(combined_modules[module_title])
+                    if len(module_items) > current_items_count:
+                        module_content_types[module_title] = content_type
+
+                # Add items to the combined collection
+                combined_modules[module_title].extend(module_items)
+
+            except (KeyError, ValueError, TypeError, AttributeError) as err:
+                self.logger.warning(
+                    "Error processing module %s from %s: %s",
+                    module_info.get("title", "Unknown"),
+                    page_name,
+                    err,
+                )
+
+    def _create_recommendation_folders(
+        self,
+        combined_modules: dict[str, list[Playlist | Album | Track | Artist]],
+        module_content_types: dict[str, MediaType],
+        module_page_names: dict[str, str],
+    ) -> list[RecommendationFolder]:
+        """Create recommendation folders from combined modules."""
+        results: list[RecommendationFolder] = []
+
+        # Helper function to determine icon based on content type
+        def get_icon_for_type(media_type: MediaType) -> str:
+            if media_type == MediaType.PLAYLIST:
+                return "mdi-playlist-music"
+            elif media_type == MediaType.ALBUM:
+                return "mdi-album"
+            elif media_type == MediaType.TRACK:
+                return "mdi-file-music"
+            elif media_type == MediaType.ARTIST:
+                return "mdi-account-music"
+            return "mdi-motion-play"  # Default for mixed content
+
+        for module_title, items in combined_modules.items():
+            # Use unique items list to prevent duplicates
+            unique_items = UniqueList(items)
+
+            # Create a sanitized unique ID
+            item_id = "".join(
+                c
+                for c in module_title.lower().replace(" ", "_").replace("-", "_")
+                if c.isalnum() or c == "_"
+            )
+
+            # Get content type and page source
+            content_type = module_content_types.get(module_title, MediaType.PLAYLIST)
+            page_name = module_page_names.get(module_title, "Tidal")
+
+            # Create folder with combined items
+            folder = RecommendationFolder(
+                item_id=item_id,
+                name=module_title,
+                provider=self.lookup_key,
+                items=UniqueList[MediaItemTypeOrItemMapping](unique_items),
+                subtitle=f"From {page_name} • {len(unique_items)} items",
+                translation_key=item_id,
+                icon=get_icon_for_type(content_type),
+            )
+            results.append(folder)
+
+            # Log a message if we combined multiple sources
+            if len(unique_items) < len(items):
+                self.logger.debug(
+                    "Combined %d items into %d unique items for '%s'",
+                    len(items),
+                    len(unique_items),
+                    module_title,
+                )
+
+        return results
+
     def _process_track_results(
         self, track_objects: list[dict[str, Any]], offset: int
     ) -> list[Track]:
index ffd80a4de5c88c2e7c30b4e047ef9422089aac8a..d1dd24e9311657ced9c8e32ea0100ab26701f925 100644 (file)
@@ -70,16 +70,186 @@ class TidalPageParser:
         module_data = module_info.get("raw_data", {})
         module_type = module_data.get("type", "")
 
-        # Extract items based on module type
-        if module_type == "HIGHLIGHT_MODULE":
-            self._process_highlight_module(module_data, result, type_counts)
-        elif module_type == "MIXED_TYPES_LIST" or "pagedList" in module_data:
-            self._process_paged_list(module_data, module_type, result, type_counts)
+        self.logger.debug(
+            "Processing module type: %s, title: %s",
+            module_type,
+            module_data.get("title", "Unknown"),
+        )
+
+        # Process module based on type
+        self._process_module_by_type(module_data, module_type, result, type_counts)
 
         # Determine the primary content type based on counts
         primary_type = self._determine_primary_type(type_counts)
+
+        self._log_module_results(module_data, result, type_counts)
+
         return result, primary_type
 
+    def _process_module_by_type(
+        self,
+        module_data: dict[str, Any],
+        module_type: str,
+        result: list[Playlist | Album | Track | Artist],
+        type_counts: dict[MediaType, int],
+    ) -> None:
+        """Process module content based on module type."""
+        # Extract paged list if present (most modules have this)
+        paged_list = module_data.get("pagedList", {})
+        items = paged_list.get("items", [])
+
+        # Different module types have different content structures
+        if module_type == "PLAYLIST_LIST":
+            self._process_playlist_list(items, result, type_counts)
+        elif module_type == "TRACK_LIST":
+            self._process_track_list(items, result, type_counts)
+        elif module_type == "ALBUM_LIST":
+            self._process_album_list(items, result, type_counts)
+        elif module_type == "ARTIST_LIST":
+            self._process_artist_list(items, result, type_counts)
+        elif module_type == "MIX_LIST":
+            self._process_mix_list(items, result, type_counts)
+        elif module_type == "HIGHLIGHT_MODULE":
+            self._process_highlight_module(module_data, result, type_counts)
+        else:
+            # Generic fallback for other module types
+            self._process_generic_items(items, result, type_counts)
+
+    def _process_playlist_list(
+        self,
+        items: list[dict[str, Any]],
+        result: list[Playlist | Album | Track | Artist],
+        type_counts: dict[MediaType, int],
+    ) -> None:
+        """Process items from a PLAYLIST_LIST module."""
+        for item in items:
+            if isinstance(item, dict):
+                # Check if item appears to be a mix
+                is_mix = "mixId" in item or "mixType" in item
+
+                try:
+                    playlist = self.provider._parse_playlist(item, is_mix=is_mix)
+                    result.append(playlist)
+                    type_counts[MediaType.PLAYLIST] += 1
+                except (KeyError, ValueError, TypeError) as err:
+                    self.logger.warning("Error parsing playlist: %s", err)
+            else:
+                # Skip non-dict items
+                pass
+
+    def _process_track_list(
+        self,
+        items: list[dict[str, Any]],
+        result: list[Playlist | Album | Track | Artist],
+        type_counts: dict[MediaType, int],
+    ) -> None:
+        """Process items from a TRACK_LIST module."""
+        for item in items:
+            if isinstance(item, dict):
+                try:
+                    track = self.provider._parse_track(item)
+                    result.append(track)
+                    type_counts[MediaType.TRACK] += 1
+                except (KeyError, ValueError, TypeError) as err:
+                    self.logger.warning("Error parsing track: %s", err)
+            else:
+                # Skip non-dict items
+                pass
+
+    def _process_album_list(
+        self,
+        items: list[dict[str, Any]],
+        result: list[Playlist | Album | Track | Artist],
+        type_counts: dict[MediaType, int],
+    ) -> None:
+        """Process items from an ALBUM_LIST module."""
+        for item in items:
+            if isinstance(item, dict):
+                try:
+                    album = self.provider._parse_album(item)
+                    result.append(album)
+                    type_counts[MediaType.ALBUM] += 1
+                except (KeyError, ValueError, TypeError) as err:
+                    self.logger.warning("Error parsing album: %s", err)
+            else:
+                # Skip non-dict items
+                pass
+
+    def _process_artist_list(
+        self,
+        items: list[dict[str, Any]],
+        result: list[Playlist | Album | Track | Artist],
+        type_counts: dict[MediaType, int],
+    ) -> None:
+        """Process items from an ARTIST_LIST module."""
+        for item in items:
+            if isinstance(item, dict):
+                try:
+                    artist = self.provider._parse_artist(item)
+                    result.append(artist)
+                    type_counts[MediaType.ARTIST] += 1
+                except (KeyError, ValueError, TypeError) as err:
+                    self.logger.warning("Error parsing artist: %s", err)
+            else:
+                # Skip non-dict items
+                pass
+
+    def _process_mix_list(
+        self,
+        items: list[dict[str, Any]],
+        result: list[Playlist | Album | Track | Artist],
+        type_counts: dict[MediaType, int],
+    ) -> None:
+        """Process items from a MIX_LIST module."""
+        for item in items:
+            if isinstance(item, dict):
+                try:
+                    mix = self.provider._parse_playlist(item, is_mix=True)
+                    result.append(mix)
+                    type_counts[MediaType.PLAYLIST] += 1
+                except (KeyError, ValueError, TypeError) as err:
+                    self.logger.warning("Error parsing mix: %s", err)
+            else:
+                # Skip non-dict items
+                pass
+
+    def _process_generic_items(
+        self,
+        items: list[dict[str, Any]],
+        result: list[Playlist | Album | Track | Artist],
+        type_counts: dict[MediaType, int],
+    ) -> None:
+        """Process items with generic type detection."""
+        for item in items:
+            if isinstance(item, dict):
+                # Try to determine item type from structure
+                try:
+                    parsed_item = self._parse_item(item, type_counts)
+                    if parsed_item:
+                        result.append(parsed_item)
+                except (KeyError, ValueError, TypeError) as err:
+                    self.logger.warning("Error parsing generic item: %s", err)
+            else:
+                # Skip non-dict items
+                pass
+
+    def _log_module_results(
+        self,
+        module_data: dict[str, Any],
+        result: list[Playlist | Album | Track | Artist],
+        type_counts: dict[MediaType, int],
+    ) -> None:
+        """Log detailed module processing results."""
+        self.logger.debug(
+            "Module '%s' processed: %d items (%d playlists, %d albums, %d tracks, %d artists)",
+            module_data.get("title", "Unknown"),
+            len(result),
+            type_counts[MediaType.PLAYLIST],
+            type_counts[MediaType.ALBUM],
+            type_counts[MediaType.TRACK],
+            type_counts[MediaType.ARTIST],
+        )
+
     def _determine_primary_type(self, type_counts: dict[MediaType, int]) -> MediaType:
         """Determine the primary media type based on item counts."""
         primary_type = MediaType.PLAYLIST  # Default
@@ -138,22 +308,33 @@ class TidalPageParser:
         type_counts: dict[MediaType, int],
         item_type: str = "",
     ) -> Playlist | Album | Track | Artist | None:
-        """Parse a single item from Tidal data into a media item."""
+        """Parse a single item from Tidal data into a media item.
+
+        Args:
+            item: Dictionary containing item data
+            type_counts: Dictionary to track counts by media type
+            item_type: Optional item type hint
+
+        Returns:
+            Parsed media item or None if parsing failed
+        """
         # Handle nested item structure
-        if isinstance(item, dict) and "type" in item and "item" in item:
+        if not item_type and isinstance(item, dict) and "type" in item and "item" in item:
             item_type = item["type"]
             item = item["item"]
 
         # If no explicit type, try to infer from structure
         if not item_type:
-            if "mixType" in item or item.get("subTitle"):
+            if "mixId" in item or "mixType" in item:
                 item_type = "MIX"
             elif "uuid" in item:
                 item_type = "PLAYLIST"
-            elif "id" in item and "duration" in item:
+            elif "id" in item and "duration" in item and "album" in item:
                 item_type = "TRACK"
-            elif "id" in item and "artist" in item and "numberOfTracks" in item:
+            elif "id" in item and "numberOfTracks" in item and "artists" in item:
                 item_type = "ALBUM"
+            elif "id" in item and "picture" in item and "name" in item and "album" not in item:
+                item_type = "ARTIST"
 
         # Parse based on detected type
         try:
@@ -179,18 +360,32 @@ class TidalPageParser:
                 media_item = self.provider._parse_artist(item)
                 type_counts[MediaType.ARTIST] += 1
                 return media_item
-            return None
+            else:
+                # Last resort - try to infer from structure for unlabeled items
+                if "uuid" in item:
+                    media_item = self.provider._parse_playlist(item)
+                    type_counts[MediaType.PLAYLIST] += 1
+                    return media_item
+                elif "id" in item and "title" in item and "duration" in item:
+                    media_item = self.provider._parse_track(item)
+                    type_counts[MediaType.TRACK] += 1
+                    return media_item
+                elif "id" in item and "title" in item and "numberOfTracks" in item:
+                    media_item = self.provider._parse_album(item)
+                    type_counts[MediaType.ALBUM] += 1
+                    return media_item
+
+                self.logger.warning("Unknown item type, could not parse: %s", item)
+                return None
+
         except (KeyError, ValueError, TypeError) as err:
-            # Data structure errors
-            self.logger.debug("Error parsing item data structure: %s", err)
+            self.logger.debug("Error parsing %s item: %s", item_type, err)
             return None
         except AttributeError as err:
-            # Missing attribute errors
-            self.logger.debug("Missing attribute in item: %s", err)
+            self.logger.debug("Attribute error parsing %s item: %s", item_type, err)
             return None
         except (json.JSONDecodeError, UnicodeError) as err:
-            # JSON/text encoding issues
-            self.logger.debug("Error decoding item content: %s", err)
+            self.logger.debug("JSON/Unicode error parsing %s item: %s", item_type, err)
             return None
 
     @classmethod