From 42d79309d1b7d4bdacdfe9a22153053f200b1c5b Mon Sep 17 00:00:00 2001 From: Jozef Kruszynski <60214390+jozefKruszynski@users.noreply.github.com> Date: Sun, 20 Apr 2025 23:46:26 +0200 Subject: [PATCH] Tidal: Modify recommendations setup (#2136) fix: Modify recommendations setup Parse both home and for_you pages, as Tidal have made modifications to page/module structure --- music_assistant/providers/tidal/__init__.py | 233 +++++++++++------- .../providers/tidal/tidal_page_parser.py | 229 +++++++++++++++-- 2 files changed, 352 insertions(+), 110 deletions(-) diff --git a/music_assistant/providers/tidal/__init__.py b/music_assistant/providers/tidal/__init__.py index 0d9a179a..a679d63b 100644 --- a/music_assistant/providers/tidal/__init__.py +++ b/music_assistant/providers/tidal/__init__.py @@ -41,6 +41,7 @@ from music_assistant_models.media_items import ( ItemMapping, MediaItemImage, MediaItemType, + MediaItemTypeOrItemMapping, Playlist, ProviderMapping, RecommendationFolder, @@ -1021,102 +1022,24 @@ class TidalProvider(MusicProvider): results: list[RecommendationFolder] = [] - try: - # Get page content - home_parser = await self.get_page_content("pages/home") - - # Helper function to determine icon based on content type - def get_icon_for_type(media_type: MediaType) -> str: - if media_type == MediaType.PLAYLIST: - return "mdi-playlist-music" - elif media_type == MediaType.ALBUM: - return "mdi-album" - elif media_type == MediaType.TRACK: - return "mdi-file-music" - elif media_type == MediaType.ARTIST: - return "mdi-account-music" - return "mdi-motion-play" # Default for mixed content - - # Collection for all "Because you listened to" items - because_items = [] - because_modules = set() + # Pages to fetch + pages = ["pages/home", "pages/for_you"] - # Process all modules in a single pass - for module_info in home_parser._module_map: - try: - module_title = module_info.get("title", "Unknown") - - # Skip modules without proper titles - if not module_title or module_title == "Unknown": - continue - - # Check if it's a "because you listened to" module - pre_title = module_info.get("raw_data", {}).get("preTitle") - is_because_module = pre_title and "because you listened to" in pre_title.lower() - - # Get module items - module_items, content_type = home_parser.get_module_items(module_info) - - # Skip empty modules - if not module_items: - continue - - # Create folder description and icon - subtitle = f"Tidal {content_type.name.lower()} collection" - icon = get_icon_for_type(content_type) - - if is_because_module: - # Add items to collective "Because you listened to" list - because_items.extend(module_items) - because_modules.add(module_title) - else: - # Create regular recommendation folder - item_id = "".join( - c - for c in module_title.lower().replace(" ", "_").replace("-", "_") - if c.isalnum() or c == "_" - ) - - folder = RecommendationFolder( - item_id=item_id, - name=module_title, - provider=self.lookup_key, - items=UniqueList(module_items), - subtitle=subtitle, - translation_key=item_id, - icon=icon, - ) - results.append(folder) + # Dictionary to track items by module title to combine duplicates + combined_modules: dict[str, list[Playlist | Album | Track | Artist]] = {} + module_content_types: dict[str, MediaType] = {} + module_page_names: dict[str, str] = {} - except (KeyError, ValueError, TypeError, AttributeError) as err: - self.logger.warning( - "Error processing module %s: %s", - module_info.get("title", "Unknown"), - err, - ) - - # Create a single folder for all "Because you listened to" items if we have any - if because_items: - sources_summary = " - ".join(sorted(because_modules)) - folder_subtitle = f"Recommendations based on: {sources_summary}" - - because_folder = RecommendationFolder( - item_id="because_you_listened_to", - name=f"Because You Listened To: {sources_summary}", - provider=self.lookup_key, - items=UniqueList(because_items), - subtitle=folder_subtitle, - translation_key="because_you_listened_to", - icon="mdi-headphones-box", - ) - # Add as first item in results - results.insert(0, because_folder) + try: + # Process pages and collect modules + await self._process_recommendation_pages( + pages, combined_modules, module_content_types, module_page_names + ) - self.logger.debug( - "Created 'Because You Listened To' folder with %d items from %d modules", - len(because_items), - len(because_modules), - ) + # Create recommendation folders from combined modules + results = self._create_recommendation_folders( + combined_modules, module_content_types, module_page_names + ) self.logger.debug("Created %d recommendation folders from Tidal", len(results)) @@ -1146,6 +1069,130 @@ class TidalProvider(MusicProvider): return results + async def _process_recommendation_pages( + self, + pages: list[str], + combined_modules: dict[str, list[Playlist | Album | Track | Artist]], + module_content_types: dict[str, MediaType], + module_page_names: dict[str, str], + ) -> None: + """Process recommendation pages and collect modules.""" + for page_path in pages: + # Get page content + page_parser = await self.get_page_content(page_path) + page_name = page_path.split("/")[-1].replace("_", " ").title() + + # Process all modules in a single pass + await self._process_page_modules( + page_parser, page_name, combined_modules, module_content_types, module_page_names + ) + + async def _process_page_modules( + self, + page_parser: TidalPageParser, + page_name: str, + combined_modules: dict[str, list[Playlist | Album | Track | Artist]], + module_content_types: dict[str, MediaType], + module_page_names: dict[str, str], + ) -> None: + """Process all modules from a single page.""" + for module_info in page_parser._module_map: + try: + module_title = module_info.get("title", "Unknown") + + # Skip modules without proper titles + if not module_title or module_title == "Unknown": + continue + + # Get module items + module_items, content_type = page_parser.get_module_items(module_info) + + # Skip empty modules + if not module_items: + continue + + # For all modules, collect items based on title + if module_title not in combined_modules: + combined_modules[module_title] = [] + module_content_types[module_title] = content_type + module_page_names[module_title] = page_name + else: + # If we already have this module title, update the content type + # if this module has more items than we already collected + current_items_count = len(combined_modules[module_title]) + if len(module_items) > current_items_count: + module_content_types[module_title] = content_type + + # Add items to the combined collection + combined_modules[module_title].extend(module_items) + + except (KeyError, ValueError, TypeError, AttributeError) as err: + self.logger.warning( + "Error processing module %s from %s: %s", + module_info.get("title", "Unknown"), + page_name, + err, + ) + + def _create_recommendation_folders( + self, + combined_modules: dict[str, list[Playlist | Album | Track | Artist]], + module_content_types: dict[str, MediaType], + module_page_names: dict[str, str], + ) -> list[RecommendationFolder]: + """Create recommendation folders from combined modules.""" + results: list[RecommendationFolder] = [] + + # Helper function to determine icon based on content type + def get_icon_for_type(media_type: MediaType) -> str: + if media_type == MediaType.PLAYLIST: + return "mdi-playlist-music" + elif media_type == MediaType.ALBUM: + return "mdi-album" + elif media_type == MediaType.TRACK: + return "mdi-file-music" + elif media_type == MediaType.ARTIST: + return "mdi-account-music" + return "mdi-motion-play" # Default for mixed content + + for module_title, items in combined_modules.items(): + # Use unique items list to prevent duplicates + unique_items = UniqueList(items) + + # Create a sanitized unique ID + item_id = "".join( + c + for c in module_title.lower().replace(" ", "_").replace("-", "_") + if c.isalnum() or c == "_" + ) + + # Get content type and page source + content_type = module_content_types.get(module_title, MediaType.PLAYLIST) + page_name = module_page_names.get(module_title, "Tidal") + + # Create folder with combined items + folder = RecommendationFolder( + item_id=item_id, + name=module_title, + provider=self.lookup_key, + items=UniqueList[MediaItemTypeOrItemMapping](unique_items), + subtitle=f"From {page_name} • {len(unique_items)} items", + translation_key=item_id, + icon=get_icon_for_type(content_type), + ) + results.append(folder) + + # Log a message if we combined multiple sources + if len(unique_items) < len(items): + self.logger.debug( + "Combined %d items into %d unique items for '%s'", + len(items), + len(unique_items), + module_title, + ) + + return results + def _process_track_results( self, track_objects: list[dict[str, Any]], offset: int ) -> list[Track]: diff --git a/music_assistant/providers/tidal/tidal_page_parser.py b/music_assistant/providers/tidal/tidal_page_parser.py index ffd80a4d..d1dd24e9 100644 --- a/music_assistant/providers/tidal/tidal_page_parser.py +++ b/music_assistant/providers/tidal/tidal_page_parser.py @@ -70,16 +70,186 @@ class TidalPageParser: module_data = module_info.get("raw_data", {}) module_type = module_data.get("type", "") - # Extract items based on module type - if module_type == "HIGHLIGHT_MODULE": - self._process_highlight_module(module_data, result, type_counts) - elif module_type == "MIXED_TYPES_LIST" or "pagedList" in module_data: - self._process_paged_list(module_data, module_type, result, type_counts) + self.logger.debug( + "Processing module type: %s, title: %s", + module_type, + module_data.get("title", "Unknown"), + ) + + # Process module based on type + self._process_module_by_type(module_data, module_type, result, type_counts) # Determine the primary content type based on counts primary_type = self._determine_primary_type(type_counts) + + self._log_module_results(module_data, result, type_counts) + return result, primary_type + def _process_module_by_type( + self, + module_data: dict[str, Any], + module_type: str, + result: list[Playlist | Album | Track | Artist], + type_counts: dict[MediaType, int], + ) -> None: + """Process module content based on module type.""" + # Extract paged list if present (most modules have this) + paged_list = module_data.get("pagedList", {}) + items = paged_list.get("items", []) + + # Different module types have different content structures + if module_type == "PLAYLIST_LIST": + self._process_playlist_list(items, result, type_counts) + elif module_type == "TRACK_LIST": + self._process_track_list(items, result, type_counts) + elif module_type == "ALBUM_LIST": + self._process_album_list(items, result, type_counts) + elif module_type == "ARTIST_LIST": + self._process_artist_list(items, result, type_counts) + elif module_type == "MIX_LIST": + self._process_mix_list(items, result, type_counts) + elif module_type == "HIGHLIGHT_MODULE": + self._process_highlight_module(module_data, result, type_counts) + else: + # Generic fallback for other module types + self._process_generic_items(items, result, type_counts) + + def _process_playlist_list( + self, + items: list[dict[str, Any]], + result: list[Playlist | Album | Track | Artist], + type_counts: dict[MediaType, int], + ) -> None: + """Process items from a PLAYLIST_LIST module.""" + for item in items: + if isinstance(item, dict): + # Check if item appears to be a mix + is_mix = "mixId" in item or "mixType" in item + + try: + playlist = self.provider._parse_playlist(item, is_mix=is_mix) + result.append(playlist) + type_counts[MediaType.PLAYLIST] += 1 + except (KeyError, ValueError, TypeError) as err: + self.logger.warning("Error parsing playlist: %s", err) + else: + # Skip non-dict items + pass + + def _process_track_list( + self, + items: list[dict[str, Any]], + result: list[Playlist | Album | Track | Artist], + type_counts: dict[MediaType, int], + ) -> None: + """Process items from a TRACK_LIST module.""" + for item in items: + if isinstance(item, dict): + try: + track = self.provider._parse_track(item) + result.append(track) + type_counts[MediaType.TRACK] += 1 + except (KeyError, ValueError, TypeError) as err: + self.logger.warning("Error parsing track: %s", err) + else: + # Skip non-dict items + pass + + def _process_album_list( + self, + items: list[dict[str, Any]], + result: list[Playlist | Album | Track | Artist], + type_counts: dict[MediaType, int], + ) -> None: + """Process items from an ALBUM_LIST module.""" + for item in items: + if isinstance(item, dict): + try: + album = self.provider._parse_album(item) + result.append(album) + type_counts[MediaType.ALBUM] += 1 + except (KeyError, ValueError, TypeError) as err: + self.logger.warning("Error parsing album: %s", err) + else: + # Skip non-dict items + pass + + def _process_artist_list( + self, + items: list[dict[str, Any]], + result: list[Playlist | Album | Track | Artist], + type_counts: dict[MediaType, int], + ) -> None: + """Process items from an ARTIST_LIST module.""" + for item in items: + if isinstance(item, dict): + try: + artist = self.provider._parse_artist(item) + result.append(artist) + type_counts[MediaType.ARTIST] += 1 + except (KeyError, ValueError, TypeError) as err: + self.logger.warning("Error parsing artist: %s", err) + else: + # Skip non-dict items + pass + + def _process_mix_list( + self, + items: list[dict[str, Any]], + result: list[Playlist | Album | Track | Artist], + type_counts: dict[MediaType, int], + ) -> None: + """Process items from a MIX_LIST module.""" + for item in items: + if isinstance(item, dict): + try: + mix = self.provider._parse_playlist(item, is_mix=True) + result.append(mix) + type_counts[MediaType.PLAYLIST] += 1 + except (KeyError, ValueError, TypeError) as err: + self.logger.warning("Error parsing mix: %s", err) + else: + # Skip non-dict items + pass + + def _process_generic_items( + self, + items: list[dict[str, Any]], + result: list[Playlist | Album | Track | Artist], + type_counts: dict[MediaType, int], + ) -> None: + """Process items with generic type detection.""" + for item in items: + if isinstance(item, dict): + # Try to determine item type from structure + try: + parsed_item = self._parse_item(item, type_counts) + if parsed_item: + result.append(parsed_item) + except (KeyError, ValueError, TypeError) as err: + self.logger.warning("Error parsing generic item: %s", err) + else: + # Skip non-dict items + pass + + def _log_module_results( + self, + module_data: dict[str, Any], + result: list[Playlist | Album | Track | Artist], + type_counts: dict[MediaType, int], + ) -> None: + """Log detailed module processing results.""" + self.logger.debug( + "Module '%s' processed: %d items (%d playlists, %d albums, %d tracks, %d artists)", + module_data.get("title", "Unknown"), + len(result), + type_counts[MediaType.PLAYLIST], + type_counts[MediaType.ALBUM], + type_counts[MediaType.TRACK], + type_counts[MediaType.ARTIST], + ) + def _determine_primary_type(self, type_counts: dict[MediaType, int]) -> MediaType: """Determine the primary media type based on item counts.""" primary_type = MediaType.PLAYLIST # Default @@ -138,22 +308,33 @@ class TidalPageParser: type_counts: dict[MediaType, int], item_type: str = "", ) -> Playlist | Album | Track | Artist | None: - """Parse a single item from Tidal data into a media item.""" + """Parse a single item from Tidal data into a media item. + + Args: + item: Dictionary containing item data + type_counts: Dictionary to track counts by media type + item_type: Optional item type hint + + Returns: + Parsed media item or None if parsing failed + """ # Handle nested item structure - if isinstance(item, dict) and "type" in item and "item" in item: + if not item_type and isinstance(item, dict) and "type" in item and "item" in item: item_type = item["type"] item = item["item"] # If no explicit type, try to infer from structure if not item_type: - if "mixType" in item or item.get("subTitle"): + if "mixId" in item or "mixType" in item: item_type = "MIX" elif "uuid" in item: item_type = "PLAYLIST" - elif "id" in item and "duration" in item: + elif "id" in item and "duration" in item and "album" in item: item_type = "TRACK" - elif "id" in item and "artist" in item and "numberOfTracks" in item: + elif "id" in item and "numberOfTracks" in item and "artists" in item: item_type = "ALBUM" + elif "id" in item and "picture" in item and "name" in item and "album" not in item: + item_type = "ARTIST" # Parse based on detected type try: @@ -179,18 +360,32 @@ class TidalPageParser: media_item = self.provider._parse_artist(item) type_counts[MediaType.ARTIST] += 1 return media_item - return None + else: + # Last resort - try to infer from structure for unlabeled items + if "uuid" in item: + media_item = self.provider._parse_playlist(item) + type_counts[MediaType.PLAYLIST] += 1 + return media_item + elif "id" in item and "title" in item and "duration" in item: + media_item = self.provider._parse_track(item) + type_counts[MediaType.TRACK] += 1 + return media_item + elif "id" in item and "title" in item and "numberOfTracks" in item: + media_item = self.provider._parse_album(item) + type_counts[MediaType.ALBUM] += 1 + return media_item + + self.logger.warning("Unknown item type, could not parse: %s", item) + return None + except (KeyError, ValueError, TypeError) as err: - # Data structure errors - self.logger.debug("Error parsing item data structure: %s", err) + self.logger.debug("Error parsing %s item: %s", item_type, err) return None except AttributeError as err: - # Missing attribute errors - self.logger.debug("Missing attribute in item: %s", err) + self.logger.debug("Attribute error parsing %s item: %s", item_type, err) return None except (json.JSONDecodeError, UnicodeError) as err: - # JSON/text encoding issues - self.logger.debug("Error decoding item content: %s", err) + self.logger.debug("JSON/Unicode error parsing %s item: %s", item_type, err) return None @classmethod -- 2.34.1