ItemMapping,
MediaItemImage,
MediaItemType,
+ MediaItemTypeOrItemMapping,
Playlist,
ProviderMapping,
RecommendationFolder,
results: list[RecommendationFolder] = []
- try:
- # Get page content
- home_parser = await self.get_page_content("pages/home")
-
- # Helper function to determine icon based on content type
- def get_icon_for_type(media_type: MediaType) -> str:
- if media_type == MediaType.PLAYLIST:
- return "mdi-playlist-music"
- elif media_type == MediaType.ALBUM:
- return "mdi-album"
- elif media_type == MediaType.TRACK:
- return "mdi-file-music"
- elif media_type == MediaType.ARTIST:
- return "mdi-account-music"
- return "mdi-motion-play" # Default for mixed content
-
- # Collection for all "Because you listened to" items
- because_items = []
- because_modules = set()
+ # Pages to fetch
+ pages = ["pages/home", "pages/for_you"]
- # Process all modules in a single pass
- for module_info in home_parser._module_map:
- try:
- module_title = module_info.get("title", "Unknown")
-
- # Skip modules without proper titles
- if not module_title or module_title == "Unknown":
- continue
-
- # Check if it's a "because you listened to" module
- pre_title = module_info.get("raw_data", {}).get("preTitle")
- is_because_module = pre_title and "because you listened to" in pre_title.lower()
-
- # Get module items
- module_items, content_type = home_parser.get_module_items(module_info)
-
- # Skip empty modules
- if not module_items:
- continue
-
- # Create folder description and icon
- subtitle = f"Tidal {content_type.name.lower()} collection"
- icon = get_icon_for_type(content_type)
-
- if is_because_module:
- # Add items to collective "Because you listened to" list
- because_items.extend(module_items)
- because_modules.add(module_title)
- else:
- # Create regular recommendation folder
- item_id = "".join(
- c
- for c in module_title.lower().replace(" ", "_").replace("-", "_")
- if c.isalnum() or c == "_"
- )
-
- folder = RecommendationFolder(
- item_id=item_id,
- name=module_title,
- provider=self.lookup_key,
- items=UniqueList(module_items),
- subtitle=subtitle,
- translation_key=item_id,
- icon=icon,
- )
- results.append(folder)
+ # Dictionary to track items by module title to combine duplicates
+ combined_modules: dict[str, list[Playlist | Album | Track | Artist]] = {}
+ module_content_types: dict[str, MediaType] = {}
+ module_page_names: dict[str, str] = {}
- except (KeyError, ValueError, TypeError, AttributeError) as err:
- self.logger.warning(
- "Error processing module %s: %s",
- module_info.get("title", "Unknown"),
- err,
- )
-
- # Create a single folder for all "Because you listened to" items if we have any
- if because_items:
- sources_summary = " - ".join(sorted(because_modules))
- folder_subtitle = f"Recommendations based on: {sources_summary}"
-
- because_folder = RecommendationFolder(
- item_id="because_you_listened_to",
- name=f"Because You Listened To: {sources_summary}",
- provider=self.lookup_key,
- items=UniqueList(because_items),
- subtitle=folder_subtitle,
- translation_key="because_you_listened_to",
- icon="mdi-headphones-box",
- )
- # Add as first item in results
- results.insert(0, because_folder)
+ try:
+ # Process pages and collect modules
+ await self._process_recommendation_pages(
+ pages, combined_modules, module_content_types, module_page_names
+ )
- self.logger.debug(
- "Created 'Because You Listened To' folder with %d items from %d modules",
- len(because_items),
- len(because_modules),
- )
+ # Create recommendation folders from combined modules
+ results = self._create_recommendation_folders(
+ combined_modules, module_content_types, module_page_names
+ )
self.logger.debug("Created %d recommendation folders from Tidal", len(results))
return results
+ async def _process_recommendation_pages(
+ self,
+ pages: list[str],
+ combined_modules: dict[str, list[Playlist | Album | Track | Artist]],
+ module_content_types: dict[str, MediaType],
+ module_page_names: dict[str, str],
+ ) -> None:
+ """Process recommendation pages and collect modules."""
+ for page_path in pages:
+ # Get page content
+ page_parser = await self.get_page_content(page_path)
+ page_name = page_path.split("/")[-1].replace("_", " ").title()
+
+ # Process all modules in a single pass
+ await self._process_page_modules(
+ page_parser, page_name, combined_modules, module_content_types, module_page_names
+ )
+
+ async def _process_page_modules(
+ self,
+ page_parser: TidalPageParser,
+ page_name: str,
+ combined_modules: dict[str, list[Playlist | Album | Track | Artist]],
+ module_content_types: dict[str, MediaType],
+ module_page_names: dict[str, str],
+ ) -> None:
+ """Process all modules from a single page."""
+ for module_info in page_parser._module_map:
+ try:
+ module_title = module_info.get("title", "Unknown")
+
+ # Skip modules without proper titles
+ if not module_title or module_title == "Unknown":
+ continue
+
+ # Get module items
+ module_items, content_type = page_parser.get_module_items(module_info)
+
+ # Skip empty modules
+ if not module_items:
+ continue
+
+ # For all modules, collect items based on title
+ if module_title not in combined_modules:
+ combined_modules[module_title] = []
+ module_content_types[module_title] = content_type
+ module_page_names[module_title] = page_name
+ else:
+ # If we already have this module title, update the content type
+ # if this module has more items than we already collected
+ current_items_count = len(combined_modules[module_title])
+ if len(module_items) > current_items_count:
+ module_content_types[module_title] = content_type
+
+ # Add items to the combined collection
+ combined_modules[module_title].extend(module_items)
+
+ except (KeyError, ValueError, TypeError, AttributeError) as err:
+ self.logger.warning(
+ "Error processing module %s from %s: %s",
+ module_info.get("title", "Unknown"),
+ page_name,
+ err,
+ )
+
+ def _create_recommendation_folders(
+ self,
+ combined_modules: dict[str, list[Playlist | Album | Track | Artist]],
+ module_content_types: dict[str, MediaType],
+ module_page_names: dict[str, str],
+ ) -> list[RecommendationFolder]:
+ """Create recommendation folders from combined modules."""
+ results: list[RecommendationFolder] = []
+
+ # Helper function to determine icon based on content type
+ def get_icon_for_type(media_type: MediaType) -> str:
+ if media_type == MediaType.PLAYLIST:
+ return "mdi-playlist-music"
+ elif media_type == MediaType.ALBUM:
+ return "mdi-album"
+ elif media_type == MediaType.TRACK:
+ return "mdi-file-music"
+ elif media_type == MediaType.ARTIST:
+ return "mdi-account-music"
+ return "mdi-motion-play" # Default for mixed content
+
+ for module_title, items in combined_modules.items():
+ # Use unique items list to prevent duplicates
+ unique_items = UniqueList(items)
+
+ # Create a sanitized unique ID
+ item_id = "".join(
+ c
+ for c in module_title.lower().replace(" ", "_").replace("-", "_")
+ if c.isalnum() or c == "_"
+ )
+
+ # Get content type and page source
+ content_type = module_content_types.get(module_title, MediaType.PLAYLIST)
+ page_name = module_page_names.get(module_title, "Tidal")
+
+ # Create folder with combined items
+ folder = RecommendationFolder(
+ item_id=item_id,
+ name=module_title,
+ provider=self.lookup_key,
+ items=UniqueList[MediaItemTypeOrItemMapping](unique_items),
+ subtitle=f"From {page_name} • {len(unique_items)} items",
+ translation_key=item_id,
+ icon=get_icon_for_type(content_type),
+ )
+ results.append(folder)
+
+ # Log a message if we combined multiple sources
+ if len(unique_items) < len(items):
+ self.logger.debug(
+ "Combined %d items into %d unique items for '%s'",
+ len(items),
+ len(unique_items),
+ module_title,
+ )
+
+ return results
+
def _process_track_results(
self, track_objects: list[dict[str, Any]], offset: int
) -> list[Track]:
module_data = module_info.get("raw_data", {})
module_type = module_data.get("type", "")
- # Extract items based on module type
- if module_type == "HIGHLIGHT_MODULE":
- self._process_highlight_module(module_data, result, type_counts)
- elif module_type == "MIXED_TYPES_LIST" or "pagedList" in module_data:
- self._process_paged_list(module_data, module_type, result, type_counts)
+ self.logger.debug(
+ "Processing module type: %s, title: %s",
+ module_type,
+ module_data.get("title", "Unknown"),
+ )
+
+ # Process module based on type
+ self._process_module_by_type(module_data, module_type, result, type_counts)
# Determine the primary content type based on counts
primary_type = self._determine_primary_type(type_counts)
+
+ self._log_module_results(module_data, result, type_counts)
+
return result, primary_type
+ def _process_module_by_type(
+ self,
+ module_data: dict[str, Any],
+ module_type: str,
+ result: list[Playlist | Album | Track | Artist],
+ type_counts: dict[MediaType, int],
+ ) -> None:
+ """Process module content based on module type."""
+ # Extract paged list if present (most modules have this)
+ paged_list = module_data.get("pagedList", {})
+ items = paged_list.get("items", [])
+
+ # Different module types have different content structures
+ if module_type == "PLAYLIST_LIST":
+ self._process_playlist_list(items, result, type_counts)
+ elif module_type == "TRACK_LIST":
+ self._process_track_list(items, result, type_counts)
+ elif module_type == "ALBUM_LIST":
+ self._process_album_list(items, result, type_counts)
+ elif module_type == "ARTIST_LIST":
+ self._process_artist_list(items, result, type_counts)
+ elif module_type == "MIX_LIST":
+ self._process_mix_list(items, result, type_counts)
+ elif module_type == "HIGHLIGHT_MODULE":
+ self._process_highlight_module(module_data, result, type_counts)
+ else:
+ # Generic fallback for other module types
+ self._process_generic_items(items, result, type_counts)
+
+ def _process_playlist_list(
+ self,
+ items: list[dict[str, Any]],
+ result: list[Playlist | Album | Track | Artist],
+ type_counts: dict[MediaType, int],
+ ) -> None:
+ """Process items from a PLAYLIST_LIST module."""
+ for item in items:
+ if isinstance(item, dict):
+ # Check if item appears to be a mix
+ is_mix = "mixId" in item or "mixType" in item
+
+ try:
+ playlist = self.provider._parse_playlist(item, is_mix=is_mix)
+ result.append(playlist)
+ type_counts[MediaType.PLAYLIST] += 1
+ except (KeyError, ValueError, TypeError) as err:
+ self.logger.warning("Error parsing playlist: %s", err)
+ else:
+ # Skip non-dict items
+ pass
+
+ def _process_track_list(
+ self,
+ items: list[dict[str, Any]],
+ result: list[Playlist | Album | Track | Artist],
+ type_counts: dict[MediaType, int],
+ ) -> None:
+ """Process items from a TRACK_LIST module."""
+ for item in items:
+ if isinstance(item, dict):
+ try:
+ track = self.provider._parse_track(item)
+ result.append(track)
+ type_counts[MediaType.TRACK] += 1
+ except (KeyError, ValueError, TypeError) as err:
+ self.logger.warning("Error parsing track: %s", err)
+ else:
+ # Skip non-dict items
+ pass
+
+ def _process_album_list(
+ self,
+ items: list[dict[str, Any]],
+ result: list[Playlist | Album | Track | Artist],
+ type_counts: dict[MediaType, int],
+ ) -> None:
+ """Process items from an ALBUM_LIST module."""
+ for item in items:
+ if isinstance(item, dict):
+ try:
+ album = self.provider._parse_album(item)
+ result.append(album)
+ type_counts[MediaType.ALBUM] += 1
+ except (KeyError, ValueError, TypeError) as err:
+ self.logger.warning("Error parsing album: %s", err)
+ else:
+ # Skip non-dict items
+ pass
+
+ def _process_artist_list(
+ self,
+ items: list[dict[str, Any]],
+ result: list[Playlist | Album | Track | Artist],
+ type_counts: dict[MediaType, int],
+ ) -> None:
+ """Process items from an ARTIST_LIST module."""
+ for item in items:
+ if isinstance(item, dict):
+ try:
+ artist = self.provider._parse_artist(item)
+ result.append(artist)
+ type_counts[MediaType.ARTIST] += 1
+ except (KeyError, ValueError, TypeError) as err:
+ self.logger.warning("Error parsing artist: %s", err)
+ else:
+ # Skip non-dict items
+ pass
+
+ def _process_mix_list(
+ self,
+ items: list[dict[str, Any]],
+ result: list[Playlist | Album | Track | Artist],
+ type_counts: dict[MediaType, int],
+ ) -> None:
+ """Process items from a MIX_LIST module."""
+ for item in items:
+ if isinstance(item, dict):
+ try:
+ mix = self.provider._parse_playlist(item, is_mix=True)
+ result.append(mix)
+ type_counts[MediaType.PLAYLIST] += 1
+ except (KeyError, ValueError, TypeError) as err:
+ self.logger.warning("Error parsing mix: %s", err)
+ else:
+ # Skip non-dict items
+ pass
+
+ def _process_generic_items(
+ self,
+ items: list[dict[str, Any]],
+ result: list[Playlist | Album | Track | Artist],
+ type_counts: dict[MediaType, int],
+ ) -> None:
+ """Process items with generic type detection."""
+ for item in items:
+ if isinstance(item, dict):
+ # Try to determine item type from structure
+ try:
+ parsed_item = self._parse_item(item, type_counts)
+ if parsed_item:
+ result.append(parsed_item)
+ except (KeyError, ValueError, TypeError) as err:
+ self.logger.warning("Error parsing generic item: %s", err)
+ else:
+ # Skip non-dict items
+ pass
+
+ def _log_module_results(
+ self,
+ module_data: dict[str, Any],
+ result: list[Playlist | Album | Track | Artist],
+ type_counts: dict[MediaType, int],
+ ) -> None:
+ """Log detailed module processing results."""
+ self.logger.debug(
+ "Module '%s' processed: %d items (%d playlists, %d albums, %d tracks, %d artists)",
+ module_data.get("title", "Unknown"),
+ len(result),
+ type_counts[MediaType.PLAYLIST],
+ type_counts[MediaType.ALBUM],
+ type_counts[MediaType.TRACK],
+ type_counts[MediaType.ARTIST],
+ )
+
def _determine_primary_type(self, type_counts: dict[MediaType, int]) -> MediaType:
"""Determine the primary media type based on item counts."""
primary_type = MediaType.PLAYLIST # Default
type_counts: dict[MediaType, int],
item_type: str = "",
) -> Playlist | Album | Track | Artist | None:
- """Parse a single item from Tidal data into a media item."""
+ """Parse a single item from Tidal data into a media item.
+
+ Args:
+ item: Dictionary containing item data
+ type_counts: Dictionary to track counts by media type
+ item_type: Optional item type hint
+
+ Returns:
+ Parsed media item or None if parsing failed
+ """
# Handle nested item structure
- if isinstance(item, dict) and "type" in item and "item" in item:
+ if not item_type and isinstance(item, dict) and "type" in item and "item" in item:
item_type = item["type"]
item = item["item"]
# If no explicit type, try to infer from structure
if not item_type:
- if "mixType" in item or item.get("subTitle"):
+ if "mixId" in item or "mixType" in item:
item_type = "MIX"
elif "uuid" in item:
item_type = "PLAYLIST"
- elif "id" in item and "duration" in item:
+ elif "id" in item and "duration" in item and "album" in item:
item_type = "TRACK"
- elif "id" in item and "artist" in item and "numberOfTracks" in item:
+ elif "id" in item and "numberOfTracks" in item and "artists" in item:
item_type = "ALBUM"
+ elif "id" in item and "picture" in item and "name" in item and "album" not in item:
+ item_type = "ARTIST"
# Parse based on detected type
try:
media_item = self.provider._parse_artist(item)
type_counts[MediaType.ARTIST] += 1
return media_item
- return None
+ else:
+ # Last resort - try to infer from structure for unlabeled items
+ if "uuid" in item:
+ media_item = self.provider._parse_playlist(item)
+ type_counts[MediaType.PLAYLIST] += 1
+ return media_item
+ elif "id" in item and "title" in item and "duration" in item:
+ media_item = self.provider._parse_track(item)
+ type_counts[MediaType.TRACK] += 1
+ return media_item
+ elif "id" in item and "title" in item and "numberOfTracks" in item:
+ media_item = self.provider._parse_album(item)
+ type_counts[MediaType.ALBUM] += 1
+ return media_item
+
+ self.logger.warning("Unknown item type, could not parse: %s", item)
+ return None
+
except (KeyError, ValueError, TypeError) as err:
- # Data structure errors
- self.logger.debug("Error parsing item data structure: %s", err)
+ self.logger.debug("Error parsing %s item: %s", item_type, err)
return None
except AttributeError as err:
- # Missing attribute errors
- self.logger.debug("Missing attribute in item: %s", err)
+ self.logger.debug("Attribute error parsing %s item: %s", item_type, err)
return None
except (json.JSONDecodeError, UnicodeError) as err:
- # JSON/text encoding issues
- self.logger.debug("Error decoding item content: %s", err)
+ self.logger.debug("JSON/Unicode error parsing %s item: %s", item_type, err)
return None
@classmethod