from enum import StrEnum
from typing import TYPE_CHECKING, Any, TypeVar, cast
-from aiohttp import ClientResponse
+from aiohttp import ClientConnectionError, ClientResponse
from aiohttp.client_exceptions import (
ClientConnectorError,
ClientError,
MediaItemType,
Playlist,
ProviderMapping,
+ RecommendationFolder,
SearchResults,
Track,
UniqueList,
)
from music_assistant_models.streamdetails import StreamDetails
-from music_assistant.constants import CACHE_CATEGORY_DEFAULT
+from music_assistant.constants import (
+ CACHE_CATEGORY_DEFAULT,
+ CACHE_CATEGORY_RECOMMENDATIONS,
+)
from music_assistant.helpers.throttle_retry import (
ThrottlerManager,
throttle_with_retries,
from music_assistant.models.music_provider import MusicProvider
from .auth_manager import ManualAuthenticationHelper, TidalAuthManager
+from .tidal_page_parser import TidalPageParser
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
"""Implementation of a Tidal MusicProvider."""
BASE_URL: str = "https://api.tidal.com/v1"
- OPEN_API_URL: str = "https://openapi.tidal.com/v2/"
+ BASE_URL_V2: str = "https://api.tidal.com/v2"
+ OPEN_API_URL: str = "https://openapi.tidal.com/v2"
throttler = ThrottlerManager(rate_limit=1, period=2)
config_updater=self._update_auth_config,
logger=self.logger,
)
+ self.page_cache_ttl = 12 * 3600
def _update_auth_config(self, auth_info: dict[str, Any]) -> None:
"""Update auth config with new auth info."""
ProviderFeature.SIMILAR_TRACKS,
ProviderFeature.BROWSE,
ProviderFeature.PLAYLIST_TRACKS_EDIT,
+ ProviderFeature.RECOMMENDATIONS,
}
#
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
    """Get playlist details for given playlist id."""
    # Mix identifiers carry a "mix_" prefix to distinguish them from playlists.
    if prov_playlist_id.startswith("mix_"):
        stripped_id = prov_playlist_id[4:]  # drop the "mix_" prefix
        try:
            return await self._get_mix_details(stripped_id)
        except ResourceTemporarilyUnavailable:
            raise
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Mix {prov_playlist_id} not found") from err

    # No prefix: try the regular playlist endpoint first.
    try:
        api_result = await self._get_data(f"playlists/{prov_playlist_id}")
        return self._parse_playlist(self._extract_data(api_result))
    except MediaNotFoundError:
        # An unprefixed ID may still refer to a Tidal mix; retry via the mix API.
        self.logger.debug("Playlist %s not found, trying as Tidal Mix", prov_playlist_id)
        try:
            return await self._get_mix_details(prov_playlist_id)
        except ResourceTemporarilyUnavailable:
            raise
        except (ClientError, KeyError, ValueError) as err:
            # Surface the failure under the originally requested ID
            raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
    except ResourceTemporarilyUnavailable:
        raise
    except (ClientError, KeyError, ValueError) as err:
        raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
async def _get_mix_details(self, prov_mix_id: str) -> Playlist:
    """Get details for a Tidal Mix."""
    try:
        api_result = await self._get_data(
            "pages/mix", params={"mixId": prov_mix_id, "deviceType": "BROWSER"}
        )
        tidal_mix = self._extract_data(api_result)

        # A page payload without a title means the mix does not exist.
        if "title" not in tidal_mix:
            raise MediaNotFoundError(f"Mix {prov_mix_id} not found")

        # Build a minimal playlist-shaped dict from the page payload and
        # hand it to the shared playlist parser in mix mode.
        return self._parse_playlist(
            {
                "id": prov_mix_id,
                "title": tidal_mix.get("title", "Unknown Mix"),
                "images": tidal_mix.get("images", {}),
                "updated": tidal_mix.get("lastUpdated", ""),
            },
            is_mix=True,
        )
    except ResourceTemporarilyUnavailable:
        raise
    except (ClientError, KeyError, ValueError) as err:
        raise MediaNotFoundError(f"Mix {prov_mix_id} not found") from err
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
try:
raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
    """Get playlist tracks for either regular playlists or Tidal mixes."""
    page_size = 200
    offset = page * page_size

    # Mix identifiers carry a "mix_" prefix to distinguish them from playlists.
    if prov_playlist_id.startswith("mix_"):
        stripped_id = prov_playlist_id[4:]  # drop the "mix_" prefix
        try:
            return await self._get_mix_playlist_tracks(stripped_id, page_size, offset)
        except ResourceTemporarilyUnavailable:
            raise
        except (ClientError, KeyError, ValueError) as err:
            raise MediaNotFoundError(f"Mix playlist {prov_playlist_id} not found") from err

    # No prefix: try the regular endpoint, fall back to the mix API if missing.
    try:
        return await self._get_regular_playlist_tracks(prov_playlist_id, page_size, offset)
    except MediaNotFoundError:
        self.logger.debug("Playlist not found, trying as Tidal Mix")
        try:
            return await self._get_mix_playlist_tracks(prov_playlist_id, page_size, offset)
        except ResourceTemporarilyUnavailable:
            raise
        except (ClientError, KeyError, ValueError) as err:
            # Surface the failure under the originally requested ID
            raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
async def _get_regular_playlist_tracks(
    self, prov_playlist_id: str, page_size: int, offset: int
) -> list[Track]:
    """Get tracks from a regular Tidal playlist."""
    pagination = {"limit": page_size, "offset": offset}
    api_result = await self._get_data(
        f"playlists/{prov_playlist_id}/tracks", params=pagination
    )
    payload = self._extract_data(api_result)
    # Positions are assigned relative to the requested offset.
    return self._process_track_results(payload.get("items", []), offset)
async def _get_mix_playlist_tracks(
    self, prov_playlist_id: str, page_size: int, offset: int
) -> list[Track]:
    """Get tracks from a Tidal Mix playlist.

    The pages/mix endpoint returns the full track list in one response,
    so pagination is applied locally over the returned items.

    Raises MediaNotFoundError when the mix does not exist or the page
    payload does not have the expected structure.
    """
    try:
        params = {"mixId": prov_playlist_id, "deviceType": "BROWSER"}
        api_result = await self._get_data("pages/mix", params=params)
        tidal_mix = self._extract_data(api_result)

        # The track list lives in the second page row.
        if "rows" not in tidal_mix or len(tidal_mix["rows"]) < 2:
            raise MediaNotFoundError(f"Invalid mix structure for {prov_playlist_id}")

        # The guard above guarantees at least two rows, so indexing row 1
        # directly is safe (the previous inline conditional was dead code).
        module = tidal_mix["rows"][1]["modules"][0]
        if not module or "pagedList" not in module:
            raise MediaNotFoundError(f"Invalid mix module for {prov_playlist_id}")

        all_tracks = module["pagedList"].get("items", [])

        # Manually paginate the results, clamped to the available items.
        start_idx = min(offset, len(all_tracks))
        end_idx = min(offset + page_size, len(all_tracks))
        paginated_tracks = all_tracks[start_idx:end_idx]

        self.logger.debug(
            "Mix tracks - total: %d, page: %d, returning: %d tracks",
            len(all_tracks),
            offset // page_size,
            len(paginated_tracks),
        )

        return self._process_track_results(paginated_tracks, offset)
    except ResourceTemporarilyUnavailable:
        raise
    except (ClientError, KeyError, ValueError) as err:
        raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
async def recommendations(self) -> list[RecommendationFolder]:
    """Get this provider's recommendations organized into folders.

    Results are built from the Tidal home page modules; "Because you
    listened to" modules are merged into a single leading folder.
    Results are cached for 1 hour. Returns an empty list on errors.
    """
    # Check cache first
    cache_key = f"tidal_recommendations_{self.lookup_key}"
    cached_recommendations: list[RecommendationFolder] = await self.mass.cache.get(
        cache_key, category=CACHE_CATEGORY_RECOMMENDATIONS, base_key=self.lookup_key
    )
    if cached_recommendations:
        self.logger.debug("Returning cached recommendations (TTL: 1 hour)")
        return cached_recommendations

    results: list[RecommendationFolder] = []

    # Icon per dominant content type; the .get() default covers mixed content.
    icon_map = {
        MediaType.PLAYLIST: "mdi-playlist-music",
        MediaType.ALBUM: "mdi-album",
        MediaType.TRACK: "mdi-file-music",
        MediaType.ARTIST: "mdi-account-music",
    }

    try:
        # Get page content (cached page parser)
        home_parser = await self.get_page_content("pages/home")

        # Collection for all "Because you listened to" items
        because_items = []
        because_modules = set()

        # Process all modules in a single pass
        for module_info in home_parser._module_map:
            try:
                module_title = module_info.get("title", "Unknown")

                # Skip modules without proper titles
                if not module_title or module_title == "Unknown":
                    continue

                # Check if it's a "because you listened to" module
                pre_title = module_info.get("raw_data", {}).get("preTitle")
                is_because_module = pre_title and "because you listened to" in pre_title.lower()

                # Get module items (lazy parse of the module's raw payload)
                module_items, content_type = home_parser.get_module_items(module_info)

                # Skip empty modules
                if not module_items:
                    continue

                # Create folder description and icon
                subtitle = f"Tidal {content_type.name.lower()} collection"
                icon = icon_map.get(content_type, "mdi-motion-play")

                if is_because_module:
                    # Add items to collective "Because you listened to" list
                    because_items.extend(module_items)
                    because_modules.add(module_title)
                else:
                    # Stable slug-style item_id derived from the module title
                    item_id = "".join(
                        c
                        for c in module_title.lower().replace(" ", "_").replace("-", "_")
                        if c.isalnum() or c == "_"
                    )
                    results.append(
                        RecommendationFolder(
                            item_id=item_id,
                            name=f"{self.lookup_key} - {module_title}",
                            provider=self.lookup_key,
                            items=UniqueList(module_items),
                            subtitle=subtitle,
                            icon=icon,
                        )
                    )

            except (KeyError, ValueError, TypeError, AttributeError) as err:
                # One malformed module must not break the whole page
                self.logger.warning(
                    "Error processing module %s: %s",
                    module_info.get("title", "Unknown"),
                    err,
                )

        # Create a single folder for all "Because you listened to" items if we have any
        if because_items:
            sources_summary = " - ".join(sorted(because_modules))
            folder_subtitle = f"Recommendations based on: {sources_summary}"

            because_folder = RecommendationFolder(
                item_id="because_you_listened_to",
                name=f"{self.lookup_key} - Because You Listened To: {sources_summary}",
                provider=self.lookup_key,
                items=UniqueList(because_items),
                subtitle=folder_subtitle,
                icon="mdi-headphones-box",
            )
            # The aggregated folder always leads the list
            results.insert(0, because_folder)

            self.logger.debug(
                "Created 'Because You Listened To' folder with %d items from %d modules",
                len(because_items),
                len(because_modules),
            )

        self.logger.debug("Created %d recommendation folders from Tidal", len(results))

        # Cache the results for 1 hour (3600 seconds)
        await self.mass.cache.set(
            cache_key,
            results,
            category=CACHE_CATEGORY_RECOMMENDATIONS,
            base_key=self.lookup_key,
            expiration=3600,
        )

    except (ClientError, ResourceTemporarilyUnavailable) as err:
        # Network-related errors. ClientError already covers the aiohttp
        # subclasses (ClientConnectionError, ClientConnectorError,
        # ClientResponseError, ClientPayloadError) — the previous separate
        # except block for them was unreachable and has been removed.
        self.logger.warning("Network error fetching Tidal recommendations: %s", err)
    except (KeyError, ValueError, TypeError, json.JSONDecodeError) as err:
        # Data parsing errors
        self.logger.warning("Error parsing Tidal recommendations data: %s", err)

    return results
def _process_track_results(
    self, track_objects: list[dict[str, Any]], offset: int
) -> list[Track]:
    """Process track objects into Track objects with positions.

    Positions continue from *offset*; unparsable entries are logged and
    skipped but still consume a position slot.
    """
    tracks: list[Track] = []
    position = offset
    for track_obj in track_objects:
        position += 1
        try:
            parsed = self._parse_track(track_obj)
        except (KeyError, TypeError) as err:
            self.logger.warning("Error parsing track: %s", err)
            continue
        parsed.position = position
        tracks.append(parsed)
    return tracks
async def get_stream_details(
self, item_id: str, media_type: MediaType = MediaType.TRACK
) -> StreamDetails:
manifest_type = stream_data.get("manifestMimeType", "")
is_mpd = "dash+xml" in manifest_type
- if is_mpd:
+ if is_mpd and "manifest" in stream_data:
url = f"data:application/dash+xml;base64,{stream_data['manifest']}"
else:
# For non-MPD streams, use the direct URL
- url = stream_data.get("urls", [None])[0]
- if not url:
+ urls = stream_data.get("urls", [])
+ if not urls:
raise MediaNotFoundError(f"No stream URL for track {item_id}")
+ url = urls[0]
# Determine audio format info
bit_depth = stream_data.get("bitDepth", 16)
sample_rate = stream_data.get("sampleRate", 44100)
audio_quality: str | None = stream_data.get("audioQuality")
- if audio_quality in ("HI_RES_LOSSLESS", "LOSSLESS"):
+ if audio_quality in ("HIRES_LOSSLESS", "HI_RES_LOSSLESS", "LOSSLESS"):
content_type = ContentType.FLAC
elif codec := stream_data.get("codec"):
content_type = ContentType.try_parse(codec)
# Get tracks by ISRC using direct API
api_result = await self._get_data(
- "tracks",
+ "/tracks",
params={
"filter[isrc]": isrc,
},
# LIBRARY MANAGEMENT
#
async def get_page_content(self, page_path: str = "pages/home") -> TidalPageParser:
    """Get a lazy page parser for a Tidal page."""
    # Prefer cached parser state when it is still present in the cache.
    cached_parser = await TidalPageParser.from_cache(self, page_path)
    if cached_parser:
        self.logger.debug(
            "Using cached page content for '%s' (age: %.1f minutes)",
            page_path,
            cached_parser.content_stats.get("cache_age_minutes", 0),
        )
        return cached_parser

    # Not in cache or expired: fetch and index fresh page content.
    try:
        self.logger.debug("Fetching fresh page content for '%s'", page_path)
        api_result = await self._get_data(
            page_path,
            base_url="https://listen.tidal.com/v1",
            params={
                "locale": "en_US",
                "deviceType": "BROWSER",
                "countryCode": self.auth.country_code or "US",
            },
        )

        # Extract the payload and build the lazy parser index.
        page_data = self._extract_data(api_result) or {}
        parser = TidalPageParser(self)
        parser.parse_page_structure(page_data, page_path)
        self.logger.debug("Page '%s' indexed with: %s", page_path, parser.content_stats)

        # Persist the parser's internal index so from_cache() can rebuild it.
        await self.mass.cache.set(
            f"tidal_page_{page_path}",
            {
                "module_map": parser._module_map,
                "content_map": parser._content_map,
                "parsed_at": parser._parsed_at,
            },
            category=CACHE_CATEGORY_RECOMMENDATIONS,
            base_key=self.lookup_key,
            expiration=self.page_cache_ttl,
        )

        return parser
    except ResourceTemporarilyUnavailable:
        # Network-related errors - propagate
        raise
    except (ClientError, ClientConnectorError, ClientPayloadError) as err:
        # Network-related errors
        self.logger.error("Network error fetching Tidal page: %s", err)
        return TidalPageParser(self)  # Return empty parser
    except (KeyError, ValueError, TypeError, json.JSONDecodeError) as err:
        # Data parsing errors
        self.logger.error("Error parsing Tidal page data: %s", err)
        return TidalPageParser(self)  # Return empty parser
async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
"""Retrieve all library artists from Tidal."""
user_id = self.auth.user_id
async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
    """Retrieve all library playlists from the provider."""
    playlist_path = f"users/{self.auth.user_id}/playlistsAndFavoritePlaylists"
    async for playlist_item in self._paginate_api(playlist_path, nested_key="playlist"):
        # Entries without a uuid cannot be parsed into playlists; skip them.
        if playlist_item and playlist_item.get("uuid"):
            yield self._parse_playlist(playlist_item)
def _parse_album(self, album_obj: dict[str, Any]) -> Album:
"""Parse tidal album object to generic layout."""
- name = album_obj["title"]
- version = album_obj["version"] or ""
- album_id = str(album_obj["id"])
+ name = album_obj.get("title", "Unknown Album")
+ version = album_obj.get("version", "") or ""
+ album_id = str(album_obj.get("id", ""))
+
album = Album(
item_id=album_id,
provider=self.lookup_key,
content_type=ContentType.FLAC,
),
url=f"https://tidal.com/album/{album_id}",
- available=album_obj["streamReady"],
+ available=album_obj.get("streamReady", True), # Default to available
)
},
)
- various_artist_album: bool = False
- for artist_obj in album_obj["artists"]:
- if artist_obj["name"] == "Various Artists":
- various_artist_album = True
- album.artists.append(self._parse_artist(artist_obj))
- if album_obj["type"] == "COMPILATION" or various_artist_album:
+ # Safely handle artists array
+ various_artist_album: bool = False
+ for artist_obj in album_obj.get("artists", []):
+ try:
+ if artist_obj.get("name") == "Various Artists":
+ various_artist_album = True
+ album.artists.append(self._parse_artist(artist_obj))
+ except (KeyError, TypeError) as err:
+ self.logger.warning("Error parsing artist in album %s: %s", name, err)
+
+ # Safely determine album type
+ album_type = album_obj.get("type", "ALBUM")
+ if album_type == "COMPILATION" or various_artist_album:
album.album_type = AlbumType.COMPILATION
- elif album_obj["type"] == "ALBUM":
+ elif album_type == "ALBUM":
album.album_type = AlbumType.ALBUM
- elif album_obj["type"] == "EP":
+ elif album_type == "EP":
album.album_type = AlbumType.EP
- elif album_obj["type"] == "SINGLE":
+ elif album_type == "SINGLE":
album.album_type = AlbumType.SINGLE
- album.year = int(album_obj["releaseDate"].split("-")[0])
- # metadata
- if album_obj["upc"]:
- album.external_ids.add((ExternalID.BARCODE, album_obj["upc"]))
- album.metadata.copyright = album_obj["copyright"]
- album.metadata.explicit = album_obj["explicit"]
- album.metadata.popularity = album_obj["popularity"]
- if album_obj["cover"]:
- picture_id = album_obj["cover"].replace("-", "/")
+ # Safely parse year
+ release_date = album_obj.get("releaseDate", "")
+ if release_date:
+ try:
+ album.year = int(release_date.split("-")[0])
+ except (ValueError, IndexError):
+ self.logger.debug("Invalid release date format: %s", release_date)
+
+ # Safely set metadata
+ upc = album_obj.get("upc")
+ if upc:
+ album.external_ids.add((ExternalID.BARCODE, upc))
+
+ album.metadata.copyright = album_obj.get("copyright", "")
+ album.metadata.explicit = album_obj.get("explicit", False)
+ album.metadata.popularity = album_obj.get("popularity", 0)
+
+ # Safely handle cover image
+ cover = album_obj.get("cover")
+ if cover:
+ picture_id = cover.replace("-", "/")
image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg"
album.metadata.images = UniqueList(
[
track_obj: dict[str, Any],
) -> Track:
"""Parse tidal track object to generic layout."""
- version = track_obj["version"] or ""
- track_id = str(track_obj["id"])
- hi_res_lossless = "HI_RES_LOSSLESS" in track_obj["mediaMetadata"]["tags"]
+ version = track_obj.get("version", "") or ""
+ track_id = str(track_obj.get("id", 0))
+ media_metadata = track_obj.get("mediaMetadata", {})
+ tags = media_metadata.get("tags", [])
+ hi_res_lossless = any(tag in tags for tag in ["HIRES_LOSSLESS", "HI_RES_LOSSLESS"])
track = Track(
item_id=track_id,
provider=self.lookup_key,
- name=track_obj["title"],
+ name=track_obj.get("title", "Unknown"),
version=version,
- duration=track_obj["duration"],
+ duration=track_obj.get("duration", 0),
provider_mappings={
ProviderMapping(
item_id=str(track_id),
available=track_obj["streamReady"],
)
},
- disc_number=track_obj["volumeNumber"] or 0,
- track_number=track_obj["trackNumber"] or 0,
+ disc_number=track_obj.get("volumeNumber", 0) or 0,
+ track_number=track_obj.get("trackNumber", 0) or 0,
)
- if track_obj["isrc"]:
+ if "isrc" in track_obj:
track.external_ids.add((ExternalID.ISRC, track_obj["isrc"]))
track.artists = UniqueList()
for track_artist in track_obj["artists"]:
# metadata
track.metadata.explicit = track_obj["explicit"]
track.metadata.popularity = track_obj["popularity"]
- track.metadata.copyright = track_obj["copyright"]
+ if "copyright" in track_obj:
+ track.metadata.copyright = track_obj["copyright"]
if track_obj["album"]:
# Here we use an ItemMapping as Tidal returns
# minimal data when getting an Album from a Track
)
return track
- def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
+ def _parse_playlist(self, playlist_obj: dict[str, Any], is_mix: bool = False) -> Playlist:
"""Parse tidal playlist object to generic layout."""
- playlist_id = str(playlist_obj["uuid"])
- creator_id = None
- if playlist_obj["creator"]:
- creator_id = playlist_obj["creator"]["id"]
- is_editable = bool(creator_id and str(creator_id) == str(self.auth.user_id))
-
- owner_name = "Tidal"
- if is_editable:
- if self.auth.user.profile_name:
- owner_name = self.auth.user.profile_name
- elif self.auth.user.user_name:
- owner_name = self.auth.user.user_name
- else:
- owner_name = str(self.auth.user_id or "Unknown")
+ # Get ID based on playlist type
+ raw_id = str(playlist_obj.get("id" if is_mix else "uuid", ""))
+
+ # Add prefix for mixes to distinguish them
+ playlist_id = f"mix_{raw_id}" if is_mix else raw_id
+
+ # Owner logic differs between types
+ if is_mix:
+ owner_name = "Created by Tidal"
+ is_editable = False
+ else:
+ creator_id = None
+ creator = playlist_obj.get("creator", {})
+ if creator:
+ creator_id = creator.get("id")
+ is_editable = bool(creator_id and str(creator_id) == str(self.auth.user_id))
+
+ owner_name = "Tidal"
+ if is_editable:
+ if self.auth.user.profile_name:
+ owner_name = self.auth.user.profile_name
+ elif self.auth.user.user_name:
+ owner_name = self.auth.user.user_name
+ elif self.auth.user_id:
+ owner_name = str(self.auth.user_id)
+
+ # URL path differs by type - use raw_id for URLs
+ url_path = "mix" if is_mix else "playlist"
+
playlist = Playlist(
item_id=playlist_id,
provider=self.instance_id if is_editable else self.lookup_key,
- name=playlist_obj["title"],
+ name=playlist_obj.get("title", "Unknown"),
owner=owner_name,
provider_mappings={
ProviderMapping(
- item_id=playlist_id,
+ item_id=playlist_id, # Use raw ID for provider mapping
provider_domain=self.domain,
provider_instance=self.instance_id,
- url=f"{BROWSE_URL}/playlist/{playlist_id}",
+ url=f"{BROWSE_URL}/{url_path}/{raw_id}",
)
},
is_editable=is_editable,
)
- # metadata
- playlist.cache_checksum = str(playlist_obj["lastUpdated"])
- playlist.metadata.popularity = playlist_obj["popularity"]
- if picture := (playlist_obj["squareImage"] or playlist_obj["image"]):
+
+ # Metadata - different fields based on type
+ if is_mix:
+ playlist.cache_checksum = str(playlist_obj.get("updated", ""))
+ else:
+ playlist.cache_checksum = str(playlist_obj.get("lastUpdated", ""))
+ if "popularity" in playlist_obj:
+ playlist.metadata.popularity = playlist_obj.get("popularity", 0)
+
+ # Handle images differently based on type
+ if is_mix:
+ if pictures := playlist_obj.get("images", {}).get("MEDIUM"):
+ image_url = pictures.get("url", "")
+ if image_url:
+ playlist.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_url,
+ provider=self.lookup_key,
+ remotely_accessible=True,
+ )
+ ]
+ )
+ elif picture := (playlist_obj.get("squareImage") or playlist_obj.get("image")):
picture_id = picture.replace("-", "/")
image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg"
playlist.metadata.images = UniqueList(
--- /dev/null
+"""Parser for Tidal page structures with lazy loading."""
+
+from __future__ import annotations
+
+import json
+import time
+from typing import TYPE_CHECKING, Any
+
+from music_assistant_models.enums import MediaType
+
+from music_assistant.constants import CACHE_CATEGORY_RECOMMENDATIONS
+
+if TYPE_CHECKING:
+ from music_assistant_models.media_items import Album, Artist, Playlist, Track
+
+ from music_assistant.providers.tidal import TidalProvider
+
+
class TidalPageParser:
    """Parser for Tidal page structures with lazy loading.

    parse_page_structure() indexes the modules of a Tidal page response
    without parsing any media items; get_module_items() later converts a
    module's raw entries into media objects via the provider's parse
    helpers. The parser's internal index can be persisted by the provider
    and restored with from_cache().
    """

    def __init__(self, provider: TidalProvider) -> None:
        """Initialize the parser with the Tidal provider instance."""
        self.provider = provider
        self.logger = provider.logger
        # Per-type content index keyed by Tidal item type name.
        # NOTE(review): nothing in this class writes into these dicts, so the
        # per-type counts in content_stats stay 0 unless the map is restored
        # from cached data — confirm whether population was intended.
        self._content_map: dict[str, dict[str, Any]] = {
            "MIX": {},
            "PLAYLIST": {},
            "ALBUM": {},
            "TRACK": {},
            "ARTIST": {},
        }
        # Flat list of module descriptors built by parse_page_structure().
        self._module_map: list[dict[str, Any]] = []
        # Source page path (e.g. "pages/home") and parse time (epoch seconds).
        self._page_path: str | None = None
        self._parsed_at: int = 0

    def parse_page_structure(self, page_data: dict[str, Any], page_path: str) -> None:
        """Parse Tidal page structure into indexed modules.

        Stores one descriptor per module (title, type, full raw payload and
        its position) in self._module_map; media items are not parsed here.
        """
        self._page_path = page_path
        self._parsed_at = int(time.time())
        self._module_map = []

        # Extract modules from rows
        module_idx = 0
        for row_idx, row in enumerate(page_data.get("rows", [])):
            for module in row.get("modules", []):
                # Store basic module info for later processing; raw_data keeps
                # the full module payload so items can be parsed lazily.
                module_info = {
                    "title": module.get("title", ""),
                    "type": module.get("type", ""),
                    "raw_data": module,
                    "module_idx": module_idx,
                    "row_idx": row_idx,
                }
                self._module_map.append(module_info)
                module_idx += 1

    def get_module_items(
        self, module_info: dict[str, Any]
    ) -> tuple[list[Playlist | Album | Track | Artist], MediaType]:
        """Extract media items from a module with simplified type handling.

        Returns the parsed items plus the module's dominant MediaType
        (majority vote over the parsed item types).
        """
        result: list[Playlist | Album | Track | Artist] = []
        # Running tally of parsed item types, used for the majority vote.
        type_counts: dict[MediaType, int] = {
            MediaType.PLAYLIST: 0,
            MediaType.ALBUM: 0,
            MediaType.TRACK: 0,
            MediaType.ARTIST: 0,
        }

        module_data = module_info.get("raw_data", {})
        module_type = module_data.get("type", "")

        # Extract items based on module type; modules that match neither
        # branch yield an empty result.
        if module_type == "HIGHLIGHT_MODULE":
            self._process_highlight_module(module_data, result, type_counts)
        elif module_type == "MIXED_TYPES_LIST" or "pagedList" in module_data:
            self._process_paged_list(module_data, module_type, result, type_counts)

        # Determine the primary content type based on counts
        primary_type = self._determine_primary_type(type_counts)
        return result, primary_type

    def _determine_primary_type(self, type_counts: dict[MediaType, int]) -> MediaType:
        """Determine the primary media type based on item counts.

        Falls back to MediaType.PLAYLIST when all counts are zero; ties are
        broken by dict iteration order (first type with the max count wins).
        """
        primary_type = MediaType.PLAYLIST  # Default
        max_count = 0
        for media_type, count in type_counts.items():
            if count > max_count:
                max_count = count
                primary_type = media_type
        return primary_type

    def _process_highlight_module(
        self,
        module_data: dict[str, Any],
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process highlights from a HIGHLIGHT_MODULE.

        Appends parsed items to *result* in place and updates *type_counts*.
        """
        highlights = module_data.get("highlight", [])
        for highlight in highlights:
            if isinstance(highlight, dict):  # Make sure highlight is a dict
                # Each highlight wraps the actual payload under "item" and
                # carries its own "type" discriminator.
                highlight_item = highlight.get("item", {})
                highlight_type = highlight.get("type", "")
                if isinstance(highlight_item, dict):
                    if parsed_item := self._parse_item(highlight_item, type_counts, highlight_type):
                        result.append(parsed_item)

    def _process_paged_list(
        self,
        module_data: dict[str, Any],
        module_type: str,
        result: list[Playlist | Album | Track | Artist],
        type_counts: dict[MediaType, int],
    ) -> None:
        """Process items from a paged list module.

        Appends parsed items to *result* in place and updates *type_counts*.
        """
        paged_list = module_data.get("pagedList", {})
        items = paged_list.get("items", [])

        # Handle module-specific type inference: e.g. every item of an
        # ALBUM_LIST module is an ALBUM even without an explicit item type.
        inferred_type: str | None = None
        if module_type in {"ALBUM_LIST", "TRACK_LIST", "PLAYLIST_LIST", "MIX_LIST"}:
            inferred_type = module_type.replace("_LIST", "")

        # Process each item
        for item in items:
            if not item or not isinstance(item, dict):
                continue

            # Use inferred type if no explicit type
            item_type = item.get("type", inferred_type)
            if parsed_item := self._parse_item(item, type_counts, item_type):
                result.append(parsed_item)

    def _parse_item(
        self,
        item: dict[str, Any],
        type_counts: dict[MediaType, int],
        item_type: str = "",
    ) -> Playlist | Album | Track | Artist | None:
        """Parse a single item from Tidal data into a media item.

        Updates *type_counts* for the parsed type. Returns None for unknown
        types or when parsing fails (failures are logged at debug level).
        """
        # Handle nested item structure: {"type": ..., "item": {...}} wrappers
        # override any caller-supplied item_type.
        if isinstance(item, dict) and "type" in item and "item" in item:
            item_type = item["type"]
            item = item["item"]

        # If no explicit type, try to infer from structure
        # (key/field combinations that are characteristic of each type).
        if not item_type:
            if "mixType" in item or item.get("subTitle"):
                item_type = "MIX"
            elif "uuid" in item:
                item_type = "PLAYLIST"
            elif "id" in item and "duration" in item:
                item_type = "TRACK"
            elif "id" in item and "artist" in item and "numberOfTracks" in item:
                item_type = "ALBUM"

        # Parse based on detected type; mixes reuse the playlist parser.
        try:
            if item_type == "MIX":
                media_item: Playlist | Album | Track | Artist = self.provider._parse_playlist(
                    item, is_mix=True
                )
                type_counts[MediaType.PLAYLIST] += 1
                return media_item
            elif item_type == "PLAYLIST":
                media_item = self.provider._parse_playlist(item)
                type_counts[MediaType.PLAYLIST] += 1
                return media_item
            elif item_type == "ALBUM":
                media_item = self.provider._parse_album(item)
                type_counts[MediaType.ALBUM] += 1
                return media_item
            elif item_type == "TRACK":
                media_item = self.provider._parse_track(item)
                type_counts[MediaType.TRACK] += 1
                return media_item
            elif item_type == "ARTIST":
                media_item = self.provider._parse_artist(item)
                type_counts[MediaType.ARTIST] += 1
                return media_item
            return None
        except (KeyError, ValueError, TypeError) as err:
            # Data structure errors
            self.logger.debug("Error parsing item data structure: %s", err)
            return None
        except AttributeError as err:
            # Missing attribute errors
            self.logger.debug("Missing attribute in item: %s", err)
            return None
        except (json.JSONDecodeError, UnicodeError) as err:
            # JSON/text encoding issues
            self.logger.debug("Error decoding item content: %s", err)
            return None

    @classmethod
    async def from_cache(cls, provider: TidalProvider, page_path: str) -> TidalPageParser | None:
        """Create a parser instance from cached data if available and valid.

        Returns None on a cache miss; expiration is handled by the cache
        layer itself (this method does not re-check _parsed_at).
        """
        cache_key = f"tidal_page_{page_path}"
        cached_data = await provider.mass.cache.get(
            cache_key,
            category=CACHE_CATEGORY_RECOMMENDATIONS,
            base_key=provider.lookup_key,
        )
        if not cached_data:
            return None

        # Rebuild the parser's internal index from the cached snapshot.
        parser = cls(provider)
        parser._page_path = page_path
        parser._module_map = cached_data.get("module_map", [])
        parser._content_map = cached_data.get("content_map", {})
        parser._parsed_at = cached_data.get("parsed_at", 0)

        return parser

    @property
    def content_stats(self) -> dict[str, int | float]:
        """Get statistics about the parsed content.

        Includes the module count, the age of the parse in minutes, and a
        per-type item count derived from _content_map (see the NOTE in
        __init__ about that map's population).
        """
        stats: dict[str, int | float] = {
            "modules": len(self._module_map),
            "cache_age_minutes": (time.time() - self._parsed_at) / 60,
        }

        for media_type, items in self._content_map.items():
            stats[f"{media_type.lower()}_count"] = len(items)

        return stats