From 23102444c5c663365dea6a48a41a2d7f0deb404b Mon Sep 17 00:00:00 2001 From: Jozef Kruszynski <60214390+jozefKruszynski@users.noreply.github.com> Date: Wed, 26 Nov 2025 20:28:44 +0100 Subject: [PATCH] Tidal: maintenance improvements (#2654) --- music_assistant/providers/tidal/__init__.py | 1763 +---------------- music_assistant/providers/tidal/api_client.py | 205 ++ .../providers/tidal/auth_manager.py | 6 +- music_assistant/providers/tidal/library.py | 146 ++ music_assistant/providers/tidal/media.py | 228 +++ music_assistant/providers/tidal/parsers.py | 317 +++ music_assistant/providers/tidal/playlist.py | 74 + music_assistant/providers/tidal/provider.py | 244 +++ .../providers/tidal/recommendations.py | 162 ++ music_assistant/providers/tidal/streaming.py | 137 ++ .../providers/tidal/tidal_page_parser.py | 31 +- pyproject.toml | 4 + tests/providers/tidal/__init__.py | 1 + .../tidal/__snapshots__/test_parsers.ambr | 496 +++++ .../tidal/fixtures/albums/album.json | 41 + .../tidal/fixtures/artists/artist.json | 15 + .../providers/tidal/fixtures/pages/home.json | 70 + .../tidal/fixtures/playlists/mix.json | 43 + .../tidal/fixtures/playlists/playlist.json | 23 + .../tidal/fixtures/tracks/track.json | 49 + tests/providers/tidal/test_api_client.py | 217 ++ tests/providers/tidal/test_auth_manager.py | 173 ++ tests/providers/tidal/test_library.py | 226 +++ tests/providers/tidal/test_media.py | 237 +++ tests/providers/tidal/test_media_extended.py | 171 ++ tests/providers/tidal/test_page_parser.py | 58 + .../tidal/test_page_parser_extended.py | 519 +++++ tests/providers/tidal/test_parsers.py | 93 + tests/providers/tidal/test_playlist.py | 82 + tests/providers/tidal/test_provider.py | 379 ++++ tests/providers/tidal/test_recommendations.py | 99 + tests/providers/tidal/test_streaming.py | 363 ++++ 32 files changed, 4906 insertions(+), 1766 deletions(-) create mode 100644 music_assistant/providers/tidal/api_client.py create mode 100644 music_assistant/providers/tidal/library.py create mode 100644 music_assistant/providers/tidal/media.py create mode 100644 music_assistant/providers/tidal/parsers.py create mode 100644 music_assistant/providers/tidal/playlist.py create mode 100644 music_assistant/providers/tidal/provider.py create mode 100644 music_assistant/providers/tidal/recommendations.py create mode 100644 music_assistant/providers/tidal/streaming.py create mode 100644 tests/providers/tidal/__init__.py create mode 100644 tests/providers/tidal/__snapshots__/test_parsers.ambr create mode 100644 tests/providers/tidal/fixtures/albums/album.json create mode 100644 tests/providers/tidal/fixtures/artists/artist.json create mode 100644 tests/providers/tidal/fixtures/pages/home.json create mode 100644 tests/providers/tidal/fixtures/playlists/mix.json create mode 100644 tests/providers/tidal/fixtures/playlists/playlist.json create mode 100644 tests/providers/tidal/fixtures/tracks/track.json create mode 100644 tests/providers/tidal/test_api_client.py create mode 100644 tests/providers/tidal/test_auth_manager.py create mode 100644 tests/providers/tidal/test_library.py create mode 100644 tests/providers/tidal/test_media.py create mode 100644 tests/providers/tidal/test_media_extended.py create mode 100644 tests/providers/tidal/test_page_parser.py create mode 100644 tests/providers/tidal/test_page_parser_extended.py create mode 100644 tests/providers/tidal/test_parsers.py create mode 100644 tests/providers/tidal/test_playlist.py create mode 100644 tests/providers/tidal/test_provider.py create mode 100644 tests/providers/tidal/test_recommendations.py create mode 100644 tests/providers/tidal/test_streaming.py diff --git a/music_assistant/providers/tidal/__init__.py b/music_assistant/providers/tidal/__init__.py index c05705e2..77788cff 100644 --- a/music_assistant/providers/tidal/__init__.py +++ b/music_assistant/providers/tidal/__init__.py @@ -3,64 +3,13 @@ from __future__ import annotations import asyncio -import functools -import json -from collections.abc import Awaitable, Callable -from contextlib import suppress -from datetime import datetime -from typing import TYPE_CHECKING, Any, TypeVar, cast +from typing import TYPE_CHECKING, cast -from aiohttp import ClientConnectionError, ClientResponse -from aiohttp.client_exceptions import ( - ClientConnectorError, - ClientError, - ClientPayloadError, - ClientResponseError, -) from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType -from music_assistant_models.enums import ( - AlbumType, - ConfigEntryType, - ContentType, - ExternalID, - ImageType, - MediaType, - ProviderFeature, - ProviderType, - StreamType, -) -from music_assistant_models.errors import ( - LoginFailed, - MediaNotFoundError, - ResourceTemporarilyUnavailable, -) -from music_assistant_models.media_items import ( - Album, - Artist, - AudioFormat, - BrowseFolder, - ItemMapping, - MediaItemImage, - MediaItemType, - Playlist, - ProviderMapping, - RecommendationFolder, - SearchResults, - Track, - UniqueList, -) -from music_assistant_models.streamdetails import StreamDetails - -from music_assistant.controllers.cache import use_cache -from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries -from music_assistant.helpers.util import infer_album_type -from music_assistant.models.music_provider import MusicProvider +from music_assistant_models.enums import ConfigEntryType from .auth_manager import ManualAuthenticationHelper, TidalAuthManager from .constants import ( - BROWSE_URL, - CACHE_CATEGORY_ISRC_MAP, - CACHE_CATEGORY_RECOMMENDATIONS, CONF_ACTION_CLEAR_AUTH, CONF_ACTION_COMPLETE_PKCE_LOGIN, CONF_ACTION_START_PKCE_LOGIN, @@ -71,18 +20,13 @@ from .constants import ( CONF_REFRESH_TOKEN, CONF_TEMP_SESSION, CONF_USER_ID, - DEFAULT_LIMIT, LABEL_COMPLETE_PKCE_LOGIN, LABEL_OOPS_URL, LABEL_START_PKCE_LOGIN, - RESOURCES_URL, ) -from .tidal_page_parser import TidalPageParser +from .provider import TidalProvider if TYPE_CHECKING: - from collections.abc import AsyncGenerator - - from aiohttp import ClientResponse from music_assistant_models.config_entries import ProviderConfig from music_assistant_models.provider import ProviderManifest @@ -90,28 +34,6 @@ if TYPE_CHECKING: from music_assistant.models import ProviderInstanceType -T = TypeVar("T") - -SUPPORTED_FEATURES = { - ProviderFeature.LIBRARY_ARTISTS, - ProviderFeature.LIBRARY_ALBUMS, - ProviderFeature.LIBRARY_TRACKS, - ProviderFeature.LIBRARY_PLAYLISTS, - ProviderFeature.ARTIST_ALBUMS, - ProviderFeature.ARTIST_TOPTRACKS, - ProviderFeature.SEARCH, - ProviderFeature.LIBRARY_ARTISTS_EDIT, - ProviderFeature.LIBRARY_ALBUMS_EDIT, - ProviderFeature.LIBRARY_TRACKS_EDIT, - ProviderFeature.LIBRARY_PLAYLISTS_EDIT, - ProviderFeature.PLAYLIST_CREATE, - ProviderFeature.SIMILAR_TRACKS, - ProviderFeature.BROWSE, - ProviderFeature.PLAYLIST_TRACKS_EDIT, - ProviderFeature.RECOMMENDATIONS, -} - - async def setup( mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig ) -> ProviderInstanceType: @@ -126,11 +48,19 @@ async def get_config_entries( values: dict[str, ConfigValueType] | None = None, ) -> tuple[ConfigEntry, ...]: """ - Return Config entries to setup this provider. + Return configuration entries required to set up the Tidal provider. + + Parameters: + mass (MusicAssistant): The MusicAssistant instance. + instance_id (str | None): Optional instance identifier for the provider. + action (str | None): Optional action to perform (e.g., start or complete PKCE login). + values (dict[str, ConfigValueType] | None): Dictionary of current configuration values. - instance_id: id of an existing provider instance (None if new instance setup). - action: [optional] action key called from config entries UI. - values: the (intermediate) raw values for config entries sent with the action. + Returns: + tuple[ConfigEntry, ...]: Tuple of ConfigEntry objects representing the configuration steps. + + The function handles authentication actions and returns the appropriate configuration entries + based on the current state and provided values. """ assert values is not None @@ -311,1666 +241,3 @@ async def get_config_entries( value=cast("str", values.get(CONF_USER_ID)) if values else None, ), ) - - -class TidalProvider(MusicProvider): - """Implementation of a Tidal MusicProvider.""" - - BASE_URL: str = "https://api.tidal.com/v1" - BASE_URL_V2: str = "https://api.tidal.com/v2" - OPEN_API_URL: str = "https://openapi.tidal.com/v2" - - throttler = ThrottlerManager(rate_limit=1, period=2) - - # - # INITIALIZATION & SETUP - # - - def __init__(self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig): - """Initialize Tidal provider.""" - super().__init__(mass, manifest, config, SUPPORTED_FEATURES) - self.auth = TidalAuthManager( - http_session=mass.http_session, - config_updater=self._update_auth_config, - logger=self.logger, - ) - self.page_cache_ttl = 3 * 3600 - - def _update_auth_config(self, auth_info: dict[str, Any]) -> None: - """Update auth config with new auth info.""" - self.update_config_value(CONF_AUTH_TOKEN, auth_info["access_token"], encrypted=True) - self.update_config_value(CONF_REFRESH_TOKEN, auth_info["refresh_token"], encrypted=True) - self.update_config_value(CONF_EXPIRY_TIME, auth_info["expires_at"]) - self.update_config_value(CONF_USER_ID, auth_info["userId"]) - - async def handle_async_init(self) -> None: - """Handle async initialization of the provider.""" - # Load auth info from individual config values - access_token = self.config.get_value(CONF_AUTH_TOKEN) - refresh_token = self.config.get_value(CONF_REFRESH_TOKEN) - expires_at = self.config.get_value(CONF_EXPIRY_TIME) - user_id = self.config.get_value(CONF_USER_ID) - - if not access_token or not refresh_token: - raise LoginFailed("Missing authentication data") - - # Handle conversion from ISO format to timestamp if needed - if isinstance(expires_at, str) and "T" in expires_at: - # This looks like an ISO format date - try: - dt = datetime.fromisoformat(expires_at) - # Convert to timestamp - expires_at = dt.timestamp() - # Update the config with the numeric value - self.update_config_value(CONF_EXPIRY_TIME, expires_at) - except ValueError: - self.logger.warning( - "Could not parse expiry time %s, setting to expired", expires_at - ) - expires_at = 0 - - # Create auth data dictionary from individual config values - auth_data = { - "access_token": access_token, - "refresh_token": refresh_token, - "expires_at": expires_at, - "userId": user_id, - } - - # Initialize auth manager - if not await self.auth.initialize(json.dumps(auth_data)): - raise LoginFailed("Failed to authenticate with Tidal") - - # Get user information from sessions API - api_result = await self._get_data("sessions") - user_info = self._extract_data(api_result) - logged_in_user = await self.get_user(str(user_info.get("userId"))) - await self.auth.update_user_info(logged_in_user, str(user_info.get("sessionId"))) - - # - # API REQUEST HELPERS & DECORATORS - # - - @staticmethod - def prepare_api_request(method: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]: - """Prepare API requests with authentication and common parameters.""" - - @functools.wraps(method) - async def wrapper(self: TidalProvider, endpoint: str, **kwargs: Any) -> T: - # Ensure we have a valid token through auth manager - if not await self.auth.ensure_valid_token(): - raise LoginFailed("Failed to authenticate with Tidal") - - # Add required parameters to every request - params = kwargs.pop("params", {}) or {} - - # Add session ID and country code if available - if self.auth.session_id: - params["sessionId"] = self.auth.session_id - - if self.auth.country_code: - params["countryCode"] = self.auth.country_code - - kwargs["params"] = params - - # Prepare headers - headers = kwargs.pop("headers", {}) or {} - headers["Authorization"] = f"Bearer {self.auth.access_token}" - - # Add locale headers - locale = self.mass.metadata.locale.replace("_", "-") - language = locale.split("-")[0] - headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5" - kwargs["headers"] = headers - - return await method(self, endpoint, **kwargs) - - return wrapper - - # - # CORE API METHODS - # - - @throttle_with_retries - @prepare_api_request - async def _get_data( - self, endpoint: str, **kwargs: Any - ) -> dict[str, Any] | tuple[dict[str, Any], str]: - """Get data from Tidal API using mass.http_session.""" - # Check if we want to return the ETag - return_etag = kwargs.pop("return_etag", False) - - base_url = kwargs.pop("base_url", self.BASE_URL) - url = f"{base_url}/{endpoint}" - - self.logger.debug("Making request to Tidal API: %s", endpoint) - - async with self.mass.http_session.get(url, **kwargs) as response: - return await self._handle_response(response, return_etag) - - @prepare_api_request - async def _post_data( - self, - endpoint: str, - data: dict[str, Any] | None = None, - as_form: bool = False, - **kwargs: Any, - ) -> dict[str, Any]: - """Send POST data to Tidal API.""" - base_url = kwargs.pop("base_url", self.BASE_URL) - url = f"{base_url}/{endpoint}" - - if as_form: - # Set content type for form data - headers = kwargs.get("headers", {}) - headers["Content-Type"] = "application/x-www-form-urlencoded" - kwargs["headers"] = headers - # Use data parameter for form-encoded data - async with self.mass.http_session.post(url, data=data, **kwargs) as response: - return cast( - "dict[str, Any]", - await self._handle_response(response, return_etag=False), - ) - # Use json parameter for JSON data (default) - async with self.mass.http_session.post(url, json=data, **kwargs) as response: - return cast( - "dict[str, Any]", - await self._handle_response(response, return_etag=False), - ) - - @prepare_api_request - async def _put_data( - self, - endpoint: str, - data: dict[str, Any] | None = None, - as_form: bool = False, - **kwargs: Any, - ) -> dict[str, Any]: - """Send PUT data to Tidal API.""" - # Use BASE_URL_V2 for PUT requests to mixes endpoints - base_url = kwargs.pop( - "base_url", self.BASE_URL_V2 if "mixes" in endpoint else self.BASE_URL - ) - url = f"{base_url}/{endpoint}" - - if as_form: - # Set content type for form data - headers = kwargs.get("headers", {}) - headers["Content-Type"] = "application/x-www-form-urlencoded" - kwargs["headers"] = headers - # Use data parameter for form-encoded data - async with self.mass.http_session.put(url, data=data, **kwargs) as response: - return cast( - "dict[str, Any]", - await self._handle_response(response, return_etag=False), - ) - # Use json parameter for JSON data (default) - async with self.mass.http_session.put(url, json=data, **kwargs) as response: - return cast( - "dict[str, Any]", - await self._handle_response(response, return_etag=False), - ) - - @prepare_api_request - async def _delete_data( - self, endpoint: str, data: dict[str, Any] | None = None, **kwargs: Any - ) -> dict[str, Any]: - """Delete data from Tidal API using mass.http_session.""" - url = f"{self.BASE_URL}/{endpoint}" - self.logger.debug("Making DELETE request to Tidal API: %s", endpoint) - - # For DELETE requests with a body, we need to use json parameter - async with self.mass.http_session.delete(url, json=data, **kwargs) as response: - return cast("dict[str, Any]", await self._handle_response(response, return_etag=False)) - - async def _handle_response( - self, response: ClientResponse, return_etag: bool = False - ) -> dict[str, Any] | tuple[dict[str, Any], str]: - """Handle API response and common error conditions.""" - # Handle error responses - if response.status == 401: - # Authentication error is handled by the calling method (which will retry) - raise LoginFailed("Authentication failed") - - if response.status == 404: - raise MediaNotFoundError(f"Item not found: {response.url}") - - if response.status == 429: - retry_after = int(response.headers.get("Retry-After", 30)) - raise ResourceTemporarilyUnavailable( - "Tidal Rate limit reached", backoff_time=retry_after - ) - - if response.status == 412: - text = await response.text() - self.logger.error("Precondition failed: %s", text) - raise ResourceTemporarilyUnavailable( - "Resource changed while updating, please try again" - ) - - if response.status >= 400: - text = await response.text() - self.logger.error("API error: %s - %s", response.status, text) - raise ResourceTemporarilyUnavailable("API error") - - # Parse successful response - try: - # Check if there's content to parse - if ( - response.content_length == 0 - or not response.content_type - or response.content_type == "" - ): - # Empty response, return success indicator - data = {"success": True} - else: - data = await response.json() - - # Return with etag if requested - if return_etag: - etag = response.headers.get("ETag", "") - return data, etag - return data - except json.JSONDecodeError as err: - self.logger.error("Failed to parse JSON response: %s", err) - raise ResourceTemporarilyUnavailable("Failed to parse response") from err - except (TypeError, ValueError, KeyError) as err: - self.logger.error("Invalid response format: %s", err) - raise ResourceTemporarilyUnavailable("Invalid response format") from err - - async def _paginate_api( - self, - endpoint: str, - item_key: str = "items", - nested_key: str | None = None, - limit: int = DEFAULT_LIMIT, - cursor_based: bool = False, - **kwargs: Any, - ) -> AsyncGenerator[Any, None]: - """Paginate through all items from a Tidal API endpoint.""" - offset = 0 - cursor = None - - while True: - # Get a batch of items - params = {"limit": limit} - if cursor_based: - if cursor: - params["cursor"] = cursor # Add cursor if available - else: - params["offset"] = offset # Use offset for offset-based pagination - - if "params" in kwargs: - params.update(kwargs.pop("params")) - - api_result = await self._get_data(endpoint, params=params, **kwargs) - response = self._extract_data(api_result) - - # Extract items from response - items = response.get(item_key, []) - if not items: - break - - # Process each item in the batch - for item in items: - if nested_key and nested_key in item and item[nested_key]: - yield item[nested_key] - else: - yield item - # Update cursor or offset for the next batch - if cursor_based: - cursor = response.get("cursor") # Update cursor from the response - if not cursor: - break # Stop if no next cursor is provided - - # Update offset for next batch - offset += len(items) - - def _extract_data( - self, api_result: dict[str, Any] | tuple[dict[str, Any], str] - ) -> dict[str, Any]: - """Extract data from API result that might be tuple of (data, etag).""" - return api_result[0] if isinstance(api_result, tuple) else api_result - - def _extract_data_and_etag( - self, api_result: dict[str, Any] | tuple[dict[str, Any], str] - ) -> tuple[dict[str, Any], str | None]: - """Extract both data and etag from API result.""" - if isinstance(api_result, tuple): - return api_result - return api_result, None - - # - # SEARCH & DISCOVERY - # - - async def get_user(self, prov_user_id: str) -> dict[str, Any]: - """Get user information.""" - api_result = await self._get_data(f"users/{prov_user_id}") - return self._extract_data(api_result) - - @use_cache(3600 * 24 * 14) # Cache for 14 days - async def search( - self, - search_query: str, - media_types: list[MediaType], - limit: int = 5, - ) -> SearchResults: - """Perform search on musicprovider. - - :param search_query: Search query. - :param media_types: A list of media_types to include. - :param limit: Number of items to return in the search (per type). - """ - parsed_results = SearchResults() - - # Filter supported media types and convert to strings for the API - media_type_strings = [] - for media_type in media_types: - if media_type == MediaType.ARTIST: - media_type_strings.append("artists") - elif media_type == MediaType.ALBUM: - media_type_strings.append("albums") - elif media_type == MediaType.TRACK: - media_type_strings.append("tracks") - elif media_type == MediaType.PLAYLIST: - media_type_strings.append("playlists") - - if not media_type_strings: - return parsed_results - - # Add debug logging - self.logger.debug( - "Searching Tidal for %s, types: %s, limit: %d", - search_query, - media_type_strings, - limit, - ) - - api_result = await self._get_data( - "search", - params={ - "query": search_query.replace("'", ""), - "limit": limit, - "types": ",".join(media_type_strings), # Use strings, not enum values - }, - ) - - # Handle potential tuple return (data, etag) - results = self._extract_data(api_result) - - self.logger.debug("Tidal search response keys: %s", list(results.keys())) - - # Check if keys exist and are not None before processing - if "artists" in results and results["artists"] and "items" in results["artists"]: - parsed_results.artists = [ - self._parse_artist(artist) for artist in results["artists"]["items"] - ] - - if "albums" in results and results["albums"] and "items" in results["albums"]: - parsed_results.albums = [ - self._parse_album(album) for album in results["albums"]["items"] - ] - - if "playlists" in results and results["playlists"] and "items" in results["playlists"]: - parsed_results.playlists = [ - self._parse_playlist(playlist) for playlist in results["playlists"]["items"] - ] - - if "tracks" in results and results["tracks"] and "items" in results["tracks"]: - parsed_results.tracks = [ - self._parse_track(track) for track in results["tracks"]["items"] - ] - - self.logger.debug( - "Search results - artists: %d, albums: %d, tracks: %d, playlists: %d", - len(parsed_results.artists), - len(parsed_results.albums), - len(parsed_results.tracks), - len(parsed_results.playlists), - ) - - return parsed_results - - @use_cache(3600 * 24) # Cache for 1 day - async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]: - """Get similar tracks for given track id.""" - try: - api_result = await self._get_data( - f"tracks/{prov_track_id}/radio", params={"limit": limit} - ) - similar_tracks = self._extract_data(api_result) - return [self._parse_track(track_obj) for track_obj in similar_tracks.get("items", [])] - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Track {prov_track_id} not found") from err - - # - # ITEM RETRIEVAL METHODS - # - - @use_cache(3600 * 24 * 30) # Cache for 30 days - async def get_artist(self, prov_artist_id: str) -> Artist: - """Get artist details for given artist id.""" - try: - api_result = await self._get_data(f"artists/{prov_artist_id}") - artist_obj = self._extract_data(api_result) - return self._parse_artist(artist_obj) - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err - - @use_cache(3600 * 24 * 30) # Cache for 30 days - async def get_album(self, prov_album_id: str) -> Album: - """Get album details for given album id.""" - try: - api_result = await self._get_data(f"albums/{prov_album_id}") - album_obj = self._extract_data(api_result) - return self._parse_album(album_obj) - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Album {prov_album_id} not found") from err - - @use_cache(3600 * 24 * 30) # Cache for 30 days - async def get_track(self, prov_track_id: str) -> Track: - """Get track details for given track id.""" - try: - api_result = await self._get_data(f"tracks/{prov_track_id}") - track_obj = self._extract_data(api_result) - - lyrics = None - with suppress(MediaNotFoundError): - api_result = await self._get_data(f"tracks/{prov_track_id}/lyrics") - lyrics_data = self._extract_data(api_result) - if lyrics_data: - lyrics = lyrics_data - # Create track with lyrics data - return self._parse_track(track_obj, lyrics=lyrics) - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Track {prov_track_id} not found") from err - - @use_cache(3600 * 24 * 30) # Cache for 30 days - async def get_playlist(self, prov_playlist_id: str) -> Playlist: - """Get playlist details for given playlist id.""" - # Check if this is a mix by ID prefix - is_mix = prov_playlist_id.startswith("mix_") - - if is_mix: - # Strip prefix and use mix API - actual_id = prov_playlist_id[4:] # Remove "mix_" prefix - try: - return await self._get_mix_details(actual_id) - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Mix {prov_playlist_id} not found") from err - - # Try regular playlist endpoint - try: - api_result = await self._get_data(f"playlists/{prov_playlist_id}") - playlist_obj = self._extract_data(api_result) - return self._parse_playlist(playlist_obj) - except MediaNotFoundError: - # If not found, try as a Tidal mix (might be unidentified mix) - self.logger.debug("Playlist %s not found, trying as Tidal Mix", prov_playlist_id) - try: - return await self._get_mix_details(prov_playlist_id) - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - # Re-raise the original error with the requested ID - raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err - - async def _get_mix_details(self, prov_mix_id: str) -> Playlist: - """Get details for a Tidal Mix.""" - try: - params = {"mixId": prov_mix_id, "deviceType": "BROWSER"} - api_result = await self._get_data("pages/mix", params=params) - tidal_mix = self._extract_data(api_result) - - # Extract mix details from page data - if "title" not in tidal_mix: - raise MediaNotFoundError(f"Mix {prov_mix_id} not found") - - # Create basic mix object with required fields - mix_obj = { - "id": prov_mix_id, - "title": tidal_mix.get("title", "Unknown Mix"), - "updated": tidal_mix.get("lastUpdated", ""), - "images": {}, # Initialize empty images dict - } - - # Safely extract the mix object and its images from the header module - rows = tidal_mix.get("rows", []) - if rows and isinstance(rows, list) and len(rows) > 0: - first_row = rows[0] - if isinstance(first_row, dict): - modules = first_row.get("modules", []) - if modules and isinstance(modules, list) and len(modules) > 0: - header_module = modules[0] - if isinstance(header_module, dict): - mix_data = header_module.get("mix", {}) - if isinstance(mix_data, dict): - # Get images if they exist - if "images" in mix_data and isinstance(mix_data["images"], dict): - mix_obj["images"] = mix_data["images"] - self.logger.debug( - "Successfully extracted mix images from header module" - ) - - # Get subtitle if it exists - subtitle = mix_data.get("subTitle") - if subtitle: - mix_obj["subTitle"] = subtitle - - # Safely check if we have useful images - images = mix_obj.get("images", {}) - if images and any(key in images for key in ["MEDIUM", "LARGE", "SMALL"]): - self.logger.debug("Found images for mix %s: %s", prov_mix_id, list(images.keys())) - else: - self.logger.debug("No images found for mix %s", prov_mix_id) - - return self._parse_playlist(mix_obj, is_mix=True) - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Mix {prov_mix_id} not found") from err - - @use_cache(3600 * 24 * 30) # Cache for 30 days - async def get_album_tracks(self, prov_album_id: str) -> list[Track]: - """Get album tracks for given album id.""" - try: - api_result = await self._get_data( - f"albums/{prov_album_id}/tracks", params={"limit": 250} - ) - album_tracks = self._extract_data(api_result) - return [self._parse_track(track_obj) for track_obj in album_tracks.get("items", [])] - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Album {prov_album_id} not found") from err - - @use_cache(3600 * 24 * 7) # Cache for 7 days - async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: - """Get a list of all albums for the given artist.""" - try: - api_result = await self._get_data( - f"artists/{prov_artist_id}/albums", params={"limit": 250} - ) - artist_albums = self._extract_data(api_result) - return [self._parse_album(album_obj) for album_obj in artist_albums.get("items", [])] - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err - - @use_cache(3600 * 24 * 7) # Cache for 7 days - async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]: - """Get a list of 10 most popular tracks for the given artist.""" - try: - api_result = await self._get_data( - f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0} - ) - artist_top_tracks = self._extract_data(api_result) - return [ - self._parse_track(track_obj) for track_obj in artist_top_tracks.get("items", []) - ] - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err - - @use_cache(3600 * 3) # Cache for 3 hours - async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]: - """Get playlist tracks for either regular playlists or Tidal mixes.""" - page_size = 200 - offset = page * page_size - - # Check if this is a mix by ID prefix - is_mix = prov_playlist_id.startswith("mix_") - - if is_mix: - # Strip prefix and use mix API - actual_id = prov_playlist_id[4:] # Remove "mix_" prefix - try: - return await self._get_mix_playlist_tracks(actual_id, page_size, offset) - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Mix playlist {prov_playlist_id} not found") from err - - # Otherwise try regular endpoint first, fall back only if needed - try: - return await self._get_regular_playlist_tracks(prov_playlist_id, page_size, offset) - except MediaNotFoundError: - self.logger.debug("Playlist not found, trying as Tidal Mix") - try: - return await self._get_mix_playlist_tracks(prov_playlist_id, page_size, offset) - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - # Re-raise the original error with the requested ID - raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err - - async def _get_regular_playlist_tracks( - self, prov_playlist_id: str, page_size: int, offset: int - ) -> list[Track]: - """Get tracks from a regular Tidal playlist.""" - api_result = await self._get_data( - f"playlists/{prov_playlist_id}/tracks", - params={"limit": page_size, "offset": offset}, - ) - tidal_tracks = self._extract_data(api_result) - - return self._process_track_results(tidal_tracks.get("items", []), offset) - - async def _get_mix_playlist_tracks( - self, prov_playlist_id: str, page_size: int, offset: int - ) -> list[Track]: - """Get tracks from a Tidal Mix playlist.""" - try: - params = {"mixId": prov_playlist_id, "deviceType": "BROWSER"} - api_result = await self._get_data("pages/mix", params=params) - tidal_mix = self._extract_data(api_result) - - # Verify we have the expected structure - if "rows" not in tidal_mix or len(tidal_mix["rows"]) < 2: - raise MediaNotFoundError(f"Invalid mix structure for {prov_playlist_id}") - - module = tidal_mix["rows"][1]["modules"][0] if len(tidal_mix["rows"]) > 1 else None - if not module or "pagedList" not in module: - raise MediaNotFoundError(f"Invalid mix module for {prov_playlist_id}") - - all_tracks = module["pagedList"].get("items", []) - - # Manually paginate the results - start_idx = min(offset, len(all_tracks)) - end_idx = min(offset + page_size, len(all_tracks)) - paginated_tracks = all_tracks[start_idx:end_idx] - - self.logger.debug( - "Mix tracks - total: %d, page: %d, returning: %d tracks", - len(all_tracks), - offset // page_size, - len(paginated_tracks), - ) - - return self._process_track_results(paginated_tracks, offset) - except ResourceTemporarilyUnavailable: - raise - except (ClientError, KeyError, ValueError) as err: - raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err - - @use_cache(expiration=3600, category=CACHE_CATEGORY_RECOMMENDATIONS) - async def recommendations(self) -> list[RecommendationFolder]: - """Get this provider's recommendations organized into folders.""" - results: list[RecommendationFolder] = [] - - # Pages to fetch - pages = [ - "pages/home", - "pages/for_you", - "pages/hi_res", - "pages/explore_new_music", - "pages/explore_top_music", - ] - - # Dictionary to track items by module title to combine duplicates - combined_modules: dict[str, list[Playlist | Album | Track | Artist]] = {} - module_content_types: dict[str, MediaType] = {} - module_page_names: dict[str, str] = {} - - try: - # Get all Tidal provider instances - await the coroutine - all_tidal_configs = await self.mass.config.get_provider_configs(ProviderType.MUSIC) - # Filter to only Tidal configs - tidal_configs = [config for config in all_tidal_configs if config.domain == self.domain] - # Sort by instance_id to get a consistent "first" instance - sorted_instances = sorted(tidal_configs, key=lambda x: x.instance_id) - # Process pages and collect modules - await self._process_recommendation_pages( - pages, - combined_modules, - module_content_types, - module_page_names, - sorted_instances, - ) - - # Create recommendation folders from combined modules - results = self._create_recommendation_folders( - combined_modules, module_content_types, module_page_names, sorted_instances - ) - - self.logger.debug("Created %d recommendation folders from Tidal", len(results)) - - except (ClientError, ResourceTemporarilyUnavailable) as err: - # Network-related errors - self.logger.warning("Network error fetching Tidal recommendations: %s", err) - except (KeyError, ValueError, TypeError, json.JSONDecodeError) as err: - # Data parsing errors - self.logger.warning("Error parsing Tidal recommendations data: %s", err) - except ( - ClientConnectionError, - ClientConnectorError, - ClientResponseError, - ClientPayloadError, - ) as err: - # More specific network errors - self.logger.warning("Network error in Tidal recommendations: %s", err) - - return results - - async def _process_recommendation_pages( - self, - pages: list[str], - combined_modules: dict[str, list[Playlist | Album | Track | Artist]], - module_content_types: dict[str, MediaType], - module_page_names: dict[str, str], - sorted_instances: list[ProviderConfig], - ) -> None: - """Process recommendation pages and collect modules.""" - # Check if there are multiple Tidal instances configured - show_user_identifier = len(sorted_instances) > 1 - - for page_path in pages: - # Get page content - page_parser = await self.get_page_content(page_path) - page_name = page_path.split("/")[-1].replace("_", " ").title() - - # For "Home" page with multiple instances, only process for the first instance - # Check if we should skip this page for this instance - if page_path in ("pages/home", "pages/explore_top_music") and show_user_identifier: - # Only process home page for the first instance - if sorted_instances and self.instance_id != sorted_instances[0].instance_id: - self.logger.debug( - "Skipping '%s' page for instance %s (not first instance)", - page_name, - self.instance_id, - ) - continue - - # Process all modules in a single pass - await self._process_page_modules( - page_parser, page_name, combined_modules, module_content_types, module_page_names - ) - - async def _process_page_modules( - self, - page_parser: TidalPageParser, - page_name: str, - combined_modules: dict[str, list[Playlist | Album | Track | Artist]], - module_content_types: dict[str, MediaType], - module_page_names: dict[str, str], - ) -> None: - """Process all modules from a single page.""" - for module_info in page_parser._module_map: - try: - module_title = module_info.get("title", "Unknown") - - # Skip modules without proper titles or with "Videos" in the title - if not module_title or module_title == "Unknown" or "Videos" in module_title: - continue - - # Get module items - module_items, content_type = page_parser.get_module_items(module_info) - - # Skip empty modules - if not module_items: - continue - - # Create a user-specific key to prevent mixing content between users - user_specific_key = f"{self.auth.user_id}_{module_title}" - - # For all modules, collect items based on user-specific title - if user_specific_key not in combined_modules: - combined_modules[user_specific_key] = [] - module_content_types[user_specific_key] = content_type - module_page_names[user_specific_key] = page_name - else: - # If we already have this module title, update the content type - # if this module has more items than we already collected - current_items_count = len(combined_modules[user_specific_key]) - if len(module_items) > current_items_count: - module_content_types[user_specific_key] = content_type - - # Add items to the combined collection - combined_modules[user_specific_key].extend(module_items) - - except (KeyError, ValueError, TypeError, AttributeError) as err: - self.logger.warning( - "Error processing module %s from %s: %s", - module_info.get("title", "Unknown"), - page_name, - err, - ) - - def _create_recommendation_folders( - self, - combined_modules: dict[str, list[Playlist | Album | Track | Artist]], - module_content_types: dict[str, MediaType], - module_page_names: dict[str, str], - sorted_instances: list[ProviderConfig], - ) -> list[RecommendationFolder]: - """Create recommendation folders from combined modules.""" - results: list[RecommendationFolder] = [] - # Check if there are multiple Tidal instances configured - show_user_identifier = len(sorted_instances) > 1 - - # Helper function to determine icon based on content type - def get_icon_for_type(media_type: MediaType) -> str: - if media_type == MediaType.PLAYLIST: - return "mdi-playlist-music" - elif media_type == MediaType.ALBUM: - return "mdi-album" - elif media_type == MediaType.TRACK: - return "mdi-file-music" - elif media_type == MediaType.ARTIST: - return "mdi-account-music" - return "mdi-motion-play" # Default for mixed content - - for user_specific_key, items in combined_modules.items(): - # Extract the original module title by removing user_id prefix - # Format is "userid_module_title", so we remove the user_id and the underscore - user_id_prefix = f"{self.auth.user_id}_" - if user_specific_key.startswith(user_id_prefix): - module_title = user_specific_key[len(user_id_prefix) :] - else: - # Fallback if format is unexpected - module_title = user_specific_key - - # Use unique items list to prevent duplicates - unique_items = UniqueList(items) - - # Create a sanitized unique ID using the user-specific key - item_id = "".join( - c - for c in user_specific_key.lower().replace(" ", "_").replace("-", "_") - if c.isalnum() or c == "_" - ) - - # Get content type and page source - content_type = module_content_types.get(user_specific_key, MediaType.PLAYLIST) - page_name = module_page_names.get(user_specific_key, "Tidal") - - # Create folder name - only add user identifier if: - # 1. Multiple instances exist - # 2. AND it's not from the "Home" page (which is shared) - if show_user_identifier and page_name not in ("Home", "Explore Top Music"): - # Get a user-friendly identifier for the folder name - # Use the account owner name if available, otherwise user_id - user_identifier = None - if self.auth.user and self.auth.user.profile_name: - user_identifier = self.auth.user.profile_name - elif self.auth.user and self.auth.user.user_name: - user_identifier = self.auth.user.user_name - else: - user_identifier = str(self.auth.user_id) - - folder_name = f"{module_title} ({user_identifier})" - else: - folder_name = module_title - - # Create folder with combined items - folder = RecommendationFolder( - item_id=item_id, - name=folder_name, # Display the title with user identifier - provider=self.lookup_key, - items=UniqueList[MediaItemType | ItemMapping | BrowseFolder](unique_items), - subtitle=f"From {page_name} • {len(unique_items)} items", - translation_key=item_id, - icon=get_icon_for_type(content_type), - ) - results.append(folder) - - # Log a message if we combined multiple sources - if len(unique_items) < len(items): - self.logger.debug( - "Combined %d items into %d unique items for '%s'", - len(items), - len(unique_items), - module_title, - ) - - return results - - def _process_track_results( - self, track_objects: list[dict[str, Any]], offset: int - ) -> list[Track]: - """Process track objects into Track objects with positions.""" - result: list[Track] = [] - for index, track_obj in enumerate(track_objects, 1): - try: - track = self._parse_track(track_obj) - track.position = offset + index - result.append(track) - except (KeyError, TypeError) as err: - self.logger.warning("Error parsing track: %s", err) - continue - return result - - async def get_stream_details( - self, item_id: str, media_type: MediaType = MediaType.TRACK - ) -> StreamDetails: - """Return the content details for the given track when it will be streamed.""" - # Try direct track lookup first with exception handling - try: - track = await self.get_track(item_id) - except MediaNotFoundError: - self.logger.info( - "Track %s not found, attempting fallback by ISRC lookup", - item_id, - ) - track_result = await self._get_track_by_isrc(item_id) - if not track_result: - raise MediaNotFoundError(f"Track {item_id} not found") - track = track_result - - quality = self.config.get_value(CONF_QUALITY) - - # Request stream manifest - async with self.throttler.bypass(): - api_result = await self._get_data( - f"tracks/{item_id}/playbackinfopostpaywall", - params={ - "playbackmode": "STREAM", - "audioquality": quality, - "assetpresentation": "FULL", - }, - ) - stream_data = self._extract_data(api_result) - - # Extract streaming information - manifest_type = stream_data.get("manifestMimeType", "") - is_mpd = "dash+xml" in manifest_type - - if is_mpd and "manifest" in stream_data: - url = f"data:application/dash+xml;base64,{stream_data['manifest']}" - else: - # For non-MPD streams, use the direct URL - urls = stream_data.get("urls", []) - if not urls: - raise MediaNotFoundError(f"No stream URL for track {item_id}") - url = urls[0] - - # Determine audio format info - bit_depth = stream_data.get("bitDepth", 16) - sample_rate = stream_data.get("sampleRate", 44100) - audio_quality: str | None = stream_data.get("audioQuality") - if audio_quality in ("HIRES_LOSSLESS", "HI_RES_LOSSLESS", "LOSSLESS"): - content_type = ContentType.FLAC - elif codec := stream_data.get("codec"): - content_type = ContentType.try_parse(codec) - else: - content_type = ContentType.MP4 - - return StreamDetails( - item_id=track.item_id, - provider=self.lookup_key, - audio_format=AudioFormat( - content_type=content_type, - sample_rate=sample_rate, - bit_depth=bit_depth, - channels=2, - ), - stream_type=StreamType.HTTP, - duration=track.duration, - path=url, - can_seek=True, - allow_seek=True, - ) - - async def _get_track_by_isrc(self, item_id: str) -> Track | None: - """Get track by ISRC from library item, with caching.""" - # Try to get from cache first - cached_track_id = await self.mass.cache.get( - item_id, provider=self.instance_id, category=CACHE_CATEGORY_ISRC_MAP - ) - - if cached_track_id: - self.logger.debug("Using cached track id") - try: - api_result = await self._get_data(f"tracks/{cached_track_id}") - track_data = self._extract_data(api_result) - return self._parse_track(track_data) - except MediaNotFoundError: - # Track no longer exists, invalidate cache - await self.mass.cache.delete( - item_id, provider=self.instance_id, category=CACHE_CATEGORY_ISRC_MAP - ) - - # Lookup by ISRC if no cache or cached track not found - library_track = await self.mass.music.tracks.get_library_item_by_prov_id( - item_id, self.instance_id - ) - if not library_track: - return None - - isrc = next( - ( - id_value - for id_type, id_value in library_track.external_ids - if id_type == ExternalID.ISRC - ), - None, - ) - if not isrc: - return None - - self.logger.debug("Attempting track lookup by ISRC: %s", isrc) - - # Get tracks by ISRC using direct API - api_result = await self._get_data( - "/tracks", - params={ - "filter[isrc]": isrc, - }, - base_url=self.OPEN_API_URL, - ) - tracks_data = self._extract_data(api_result) - - if not tracks_data and not tracks_data.get("data"): - return None - - track_data = tracks_data["data"][0] - track_id = str(track_data["id"]) - - # Cache the mapping for future use - await self.mass.cache.set( - key=item_id, - data=track_id, - provider=self.instance_id, - category=CACHE_CATEGORY_ISRC_MAP, - persistent=True, - expiration=(86400 * 90), - ) - - return await self.get_track(track_id) - - def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping: - """Create a generic item mapping.""" - return ItemMapping( - media_type=media_type, - item_id=key, - provider=self.lookup_key, - name=name, - ) - - # - # LIBRARY MANAGEMENT - # - - async def get_page_content(self, page_path: str = "pages/home") -> TidalPageParser: - """Get a lazy page parser for a Tidal page.""" - # Try to get from cache first - cached_parser = await TidalPageParser.from_cache(self, page_path) - if cached_parser: - self.logger.debug( - "Using cached page content for '%s' (age: %.1f minutes)", - page_path, - cached_parser.content_stats.get("cache_age_minutes", 0), - ) - return cached_parser - - # Not in cache or expired, fetch fresh content - try: - # Get the page structure - self.logger.debug("Fetching fresh page content for '%s'", page_path) - locale = self.mass.metadata.locale.replace("_", "-") - api_result = await self._get_data( - page_path, - base_url="https://listen.tidal.com/v1", - params={ - "locale": locale, - "deviceType": "BROWSER", - "countryCode": self.auth.country_code or "US", - }, - ) - - # Extract and build lazy parser - page_data = self._extract_data(api_result) or {} - parser = TidalPageParser(self) - parser.parse_page_structure(page_data, page_path) - - self.logger.debug("Page '%s' indexed with: %s", page_path, parser.content_stats) - - # Cache the parser data - cache_data = { - "module_map": parser._module_map, - "content_map": parser._content_map, - "parsed_at": parser._parsed_at, - } - await self.mass.cache.set( - key=page_path, - data=cache_data, - provider=self.instance_id, - category=CACHE_CATEGORY_RECOMMENDATIONS, - expiration=self.page_cache_ttl, - ) - - return parser - except ResourceTemporarilyUnavailable: - # Network-related errors - propagate - raise - except (ClientError, ClientConnectorError, ClientPayloadError) as err: - # Network-related errors - self.logger.error("Network error fetching Tidal page: %s", err) - return TidalPageParser(self) # Return empty parser - except (KeyError, ValueError, TypeError, json.JSONDecodeError) as err: - # Data parsing errors - self.logger.error("Error parsing Tidal page data: %s", err) - return TidalPageParser(self) # Return empty parser - - async def get_library_artists(self) -> AsyncGenerator[Artist, None]: - """Retrieve all library artists from Tidal.""" - user_id = self.auth.user_id - path = f"users/{user_id}/favorites/artists" - - async for artist_item in self._paginate_api(path, nested_key="item"): - if artist_item and artist_item.get("id"): - yield self._parse_artist(artist_item) - - async def get_library_albums(self) -> AsyncGenerator[Album, None]: - """Retrieve all library albums from Tidal.""" - user_id = self.auth.user_id - path = f"users/{user_id}/favorites/albums" - - async for album_item in self._paginate_api(path, nested_key="item"): - if album_item and album_item.get("id"): - yield self._parse_album(album_item) - - async def get_library_tracks(self) -> AsyncGenerator[Track, None]: - """Retrieve library tracks from Tidal.""" - user_id = self.auth.user_id - path = f"users/{user_id}/favorites/tracks" - - async for track_item in self._paginate_api(path, nested_key="item"): - if track_item and track_item.get("id"): - yield self._parse_track(track_item) - - async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]: - """Retrieve all library playlists from the provider.""" - user_id = self.auth.user_id - mix_path = "favorites/mixes" - - async for mix_item in self._paginate_api( - mix_path, - item_key="items", - base_url=self.BASE_URL_V2, - cursor_based=True, - ): - if mix_item and mix_item.get("id"): - yield self._parse_playlist(mix_item, is_mix=True) - - playlist_path = f"users/{user_id}/playlistsAndFavoritePlaylists" - - async for playlist_item in self._paginate_api(playlist_path, nested_key="playlist"): - if playlist_item and playlist_item.get("uuid"): - yield self._parse_playlist(playlist_item) - - async def library_add(self, item: MediaItemType) -> bool: - """Add item to library.""" - endpoint, data, is_mix = self._get_library_endpoint_data( - item.item_id, item.media_type, "add" - ) - - if not endpoint: - return False - - try: - if is_mix: - await self._put_data(endpoint, data=data, as_form=True) - else: - endpoint = f"users/{self.auth.user_id}/{endpoint}" - await self._post_data(endpoint, data=data, as_form=True) - return True - except (ClientError, MediaNotFoundError, ResourceTemporarilyUnavailable) as err: - self.logger.warning( - "Failed to add %s:%s to library: %s", item.media_type, item.item_id, err - ) - return False - - async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool: - """Remove item from library.""" - endpoint, data, is_mix = self._get_library_endpoint_data(prov_item_id, media_type, "remove") - - if not endpoint: - return False - - try: - if is_mix: - await self._put_data(endpoint, data=data, as_form=True) - else: - endpoint = f"users/{self.auth.user_id}/{endpoint}" - await self._delete_data(endpoint) - return True - except (ClientError, MediaNotFoundError, ResourceTemporarilyUnavailable) as err: - self.logger.warning( - "Failed to remove %s:%s from library: %s", media_type, prov_item_id, err - ) - return False - - def _get_library_endpoint_data( - self, item_id: str, media_type: MediaType, operation: str - ) -> tuple[str | None, dict[str, Any], bool]: - """Get the endpoint, data, and mix flag for library operations.""" - is_mix = False - data = {} - - # Check if this is a mix by ID prefix - if media_type == MediaType.PLAYLIST and item_id.startswith("mix_"): - is_mix = True - # Strip prefix for API calls - mix_id = item_id[4:] # Remove "mix_" prefix - - if operation == "add": - endpoint = "favorites/mixes/add" - data = {"mixIds": mix_id, "onArtifactNotFound": "FAIL", "deviceType": "BROWSER"} - else: # remove - endpoint = "favorites/mixes/remove" - data = {"mixIds": mix_id, "deviceType": "BROWSER"} - return endpoint, data, is_mix - - # Regular items - if media_type == MediaType.ARTIST: - if operation == "add": - endpoint = "favorites/artists" - data = {"artistId": item_id} - else: - endpoint = f"favorites/artists/{item_id}" - elif media_type == MediaType.ALBUM: - if operation == "add": - endpoint = "favorites/albums" - data = {"albumId": item_id} - else: - endpoint = f"favorites/albums/{item_id}" - elif media_type == MediaType.TRACK: - if operation == "add": - endpoint = "favorites/tracks" - data = {"trackId": item_id} - else: - endpoint = f"favorites/tracks/{item_id}" - elif media_type == MediaType.PLAYLIST: - if operation == "add": - endpoint = "favorites/playlists" - data = {"uuids": item_id} - else: - endpoint = f"favorites/playlists/{item_id}" - else: - return None, {}, False - - return endpoint, data, is_mix - - # - # PLAYLIST MANAGEMENT - # - - async def create_playlist(self, name: str) -> Playlist: - """Create a new playlist on provider with given name.""" - # Create playlist using form-encoded data - data = {"title": name, "description": ""} - - try: - playlist_obj = await self._post_data( - f"users/{self.auth.user_id}/playlists", data=data, as_form=True - ) - - return self._parse_playlist(playlist_obj) - except (ClientResponseError, MediaNotFoundError, LoginFailed) as err: - self.logger.error("API error creating playlist: %s", err) - raise - except (ClientConnectorError, ClientPayloadError) as err: - # Network or payload errors - self.logger.error("Network error creating playlist: %s", err) - raise ResourceTemporarilyUnavailable("Failed to create playlist") from err - except (KeyError, ValueError, TypeError) as err: - # Data parsing errors - self.logger.error("Data error creating playlist: %s", err) - raise ResourceTemporarilyUnavailable("Failed to create playlist") from err - - async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None: - """Add track(s) to playlist.""" - try: - # Get playlist details first with ETag - api_result = await self._get_data(f"playlists/{prov_playlist_id}", return_etag=True) - playlist_obj, etag = self._extract_data_and_etag(api_result) - - # Send using form-encoded data like the synchronous library - data = { - "onArtifactNotFound": "SKIP", - "trackIds": ",".join(map(str, prov_track_ids)), - "toIndex": playlist_obj["numberOfTracks"], - "onDupes": "SKIP", - } - - # Force using form data instead of JSON and include ETag - headers = {"If-None-Match": etag} if etag else {} - await self._post_data( - f"playlists/{prov_playlist_id}/items", - data=data, - as_form=True, - headers=headers, - ) - - except (MediaNotFoundError, ClientResponseError) as err: - raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err - except (ClientConnectorError, ClientPayloadError) as err: - # Network errors - self.logger.error("Network error adding tracks to playlist: %s", err) - raise ResourceTemporarilyUnavailable("Failed to add tracks to playlist") from err - except (KeyError, ValueError) as err: - # Data errors - self.logger.error("Data error adding tracks to playlist: %s", err) - raise ResourceTemporarilyUnavailable("Failed to add tracks to playlist") from err - - async def remove_playlist_tracks( - self, prov_playlist_id: str, positions_to_remove: tuple[int, ...] - ) -> None: - """Remove track(s) from playlist.""" - # Get playlist with ETag first - api_result = await self._get_data(f"playlists/{prov_playlist_id}", return_etag=True) - _, etag = self._extract_data_and_etag(api_result) - - # Format positions as string in URL path - # Tidal can use directly indices in path, not track IDs in the body - position_string = ",".join([str(pos - 1) for pos in positions_to_remove]) - - # Use DELETE with If-None-Match header - # Tidal uses this incorrectly, but it's required - headers = {"If-None-Match": etag} if etag else {} - - # Make a direct DELETE request to the endpoint with positions in the URL path - await self._delete_data( - f"playlists/{prov_playlist_id}/items/{position_string}", headers=headers - ) - - # - # ITEM PARSERS - # - - def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist: - """Parse tidal artist object to generic layout.""" - artist_id = str(artist_obj["id"]) - artist = Artist( - item_id=artist_id, - provider=self.lookup_key, - name=artist_obj["name"], - provider_mappings={ - ProviderMapping( - item_id=artist_id, - provider_domain=self.domain, - provider_instance=self.instance_id, - # NOTE: don't use the /browse endpoint as it's - # not working for musicbrainz lookups - url=f"https://tidal.com/artist/{artist_id}", - ) - }, - ) - # metadata - if artist_obj["picture"]: - picture_id = artist_obj["picture"].replace("-", "/") - image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg" - artist.metadata.images = UniqueList( - [ - MediaItemImage( - type=ImageType.THUMB, - path=image_url, - provider=self.lookup_key, - remotely_accessible=True, - ) - ] - ) - - return artist - - def _parse_album(self, album_obj: dict[str, Any]) -> Album: - """Parse tidal album object to generic layout.""" - name = album_obj.get("title", "Unknown Album") - version = album_obj.get("version", "") or "" - album_id = str(album_obj.get("id", "")) - - album = Album( - item_id=album_id, - provider=self.lookup_key, - name=name, - version=version, - provider_mappings={ - ProviderMapping( - item_id=album_id, - provider_domain=self.domain, - provider_instance=self.instance_id, - audio_format=AudioFormat( - content_type=ContentType.FLAC, - ), - url=f"https://tidal.com/album/{album_id}", - available=album_obj.get("streamReady", True), # Default to available - ) - }, - ) - - # Safely handle artists array - various_artist_album: bool = False - for artist_obj in album_obj.get("artists", []): - try: - if artist_obj.get("name") == "Various Artists": - various_artist_album = True - album.artists.append(self._parse_artist(artist_obj)) - except (KeyError, TypeError) as err: - self.logger.warning("Error parsing artist in album %s: %s", name, err) - - # Safely determine album type - album_type = album_obj.get("type", "ALBUM") - if album_type == "COMPILATION" or various_artist_album: - album.album_type = AlbumType.COMPILATION - elif album_type == "ALBUM": - album.album_type = AlbumType.ALBUM - elif album_type == "EP": - album.album_type = AlbumType.EP - elif album_type == "SINGLE": - album.album_type = AlbumType.SINGLE - - # Try inference - override if it finds something more specific - inferred_type = infer_album_type(name, version) - if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE): - album.album_type = inferred_type - - # Safely parse year - if release_date := album_obj.get("releaseDate", ""): - try: - album.year = int(release_date.split("-")[0]) - except (ValueError, IndexError): - self.logger.debug("Invalid release date format: %s", release_date) - with suppress(ValueError): - album.metadata.release_date = datetime.fromisoformat(release_date) - - # Safely set metadata - upc = album_obj.get("upc") - if upc: - album.external_ids.add((ExternalID.BARCODE, upc)) - - album.metadata.copyright = album_obj.get("copyright", "") - album.metadata.explicit = album_obj.get("explicit", False) - album.metadata.popularity = album_obj.get("popularity", 0) - - # Safely handle cover image - cover = album_obj.get("cover") - if cover: - picture_id = cover.replace("-", "/") - image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg" - album.metadata.images = UniqueList( - [ - MediaItemImage( - type=ImageType.THUMB, - path=image_url, - provider=self.lookup_key, - remotely_accessible=True, - ) - ] - ) - - return album - - def _parse_track( - self, - track_obj: dict[str, Any], - lyrics: dict[str, str] | None = None, - ) -> Track: - """Parse tidal track object to generic layout.""" - version = track_obj.get("version", "") or "" - track_id = str(track_obj.get("id", 0)) - media_metadata = track_obj.get("mediaMetadata", {}) - tags = media_metadata.get("tags", []) - hi_res_lossless = any(tag in tags for tag in ["HIRES_LOSSLESS", "HI_RES_LOSSLESS"]) - track = Track( - item_id=track_id, - provider=self.lookup_key, - name=track_obj.get("title", "Unknown"), - version=version, - duration=track_obj.get("duration", 0), - provider_mappings={ - ProviderMapping( - item_id=str(track_id), - provider_domain=self.domain, - provider_instance=self.instance_id, - audio_format=AudioFormat( - content_type=ContentType.FLAC, - bit_depth=24 if hi_res_lossless else 16, - ), - url=f"https://tidal.com/track/{track_id}", - available=track_obj["streamReady"], - ) - }, - disc_number=track_obj.get("volumeNumber", 0) or 0, - track_number=track_obj.get("trackNumber", 0) or 0, - ) - if "isrc" in track_obj: - track.external_ids.add((ExternalID.ISRC, track_obj["isrc"])) - track.artists = UniqueList() - for track_artist in track_obj["artists"]: - artist = self._parse_artist(track_artist) - track.artists.append(artist) - # metadata - track.metadata.explicit = track_obj["explicit"] - track.metadata.popularity = track_obj["popularity"] - if "copyright" in track_obj: - track.metadata.copyright = track_obj["copyright"] - if lyrics and "lyrics" in lyrics: - track.metadata.lyrics = lyrics["lyrics"] - if lyrics and "subtitles" in lyrics: - track.metadata.lrc_lyrics = lyrics["subtitles"] - if track_obj["album"]: - # Here we use an ItemMapping as Tidal returns - # minimal data when getting an Album from a Track - track.album = self.get_item_mapping( - media_type=MediaType.ALBUM, - key=str(track_obj["album"]["id"]), - name=track_obj["album"]["title"], - ) - if track_obj["album"]["cover"]: - picture_id = track_obj["album"]["cover"].replace("-", "/") - image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg" - track.metadata.images = UniqueList( - [ - MediaItemImage( - type=ImageType.THUMB, - path=image_url, - provider=self.lookup_key, - remotely_accessible=True, - ) - ] - ) - return track - - def _parse_playlist(self, playlist_obj: dict[str, Any], is_mix: bool = False) -> Playlist: - """Parse tidal playlist object to generic layout.""" - # Get ID based on playlist type - raw_id = str(playlist_obj.get("id" if is_mix else "uuid", "")) - - # Add prefix for mixes to distinguish them - playlist_id = f"mix_{raw_id}" if is_mix else raw_id - - # Owner logic differs between types - if is_mix: - owner_name = "Created by Tidal" - is_editable = False - else: - creator_id = None - creator = playlist_obj.get("creator", {}) - if creator: - creator_id = creator.get("id") - is_editable = bool(creator_id and str(creator_id) == str(self.auth.user_id)) - - owner_name = "Tidal" - if is_editable: - if self.auth.user.profile_name: - owner_name = self.auth.user.profile_name - elif self.auth.user.user_name: - owner_name = self.auth.user.user_name - elif self.auth.user_id: - owner_name = str(self.auth.user_id) - - # URL path differs by type - use raw_id for URLs - url_path = "mix" if is_mix else "playlist" - - playlist = Playlist( - item_id=playlist_id, - provider=self.instance_id if is_editable else self.lookup_key, - name=playlist_obj.get("title", "Unknown"), - owner=owner_name, - provider_mappings={ - ProviderMapping( - item_id=playlist_id, # Use raw ID for provider mapping - provider_domain=self.domain, - provider_instance=self.instance_id, - url=f"{BROWSE_URL}/{url_path}/{raw_id}", - ) - }, - is_editable=is_editable, - ) - - # Metadata - different fields based on type - - # Add the description from the subtitle for mixes - if is_mix: - subtitle = playlist_obj.get("subTitle") - if subtitle: - playlist.metadata.description = subtitle - - # Handle images differently based on type - if is_mix: - if pictures := playlist_obj.get("images", {}).get("MEDIUM"): - image_url = pictures.get("url", "") - if image_url: - playlist.metadata.images = UniqueList( - [ - MediaItemImage( - type=ImageType.THUMB, - path=image_url, - provider=self.lookup_key, - remotely_accessible=True, - ) - ] - ) - elif picture := (playlist_obj.get("squareImage") or playlist_obj.get("image")): - picture_id = picture.replace("-", "/") - image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg" - playlist.metadata.images = UniqueList( - [ - MediaItemImage( - type=ImageType.THUMB, - path=image_url, - provider=self.lookup_key, - remotely_accessible=True, - ) - ] - ) - - return playlist diff --git a/music_assistant/providers/tidal/api_client.py b/music_assistant/providers/tidal/api_client.py new file mode 100644 index 00000000..a200646d --- /dev/null +++ b/music_assistant/providers/tidal/api_client.py @@ -0,0 +1,205 @@ +"""API Client for Tidal.""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any, cast + +from music_assistant_models.errors import ( + LoginFailed, + MediaNotFoundError, + ResourceTemporarilyUnavailable, +) + +from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + + from aiohttp import ClientResponse + + from .provider import TidalProvider + + +class TidalAPIClient: + """Client for interacting with Tidal API.""" + + BASE_URL: str = "https://api.tidal.com/v1" + BASE_URL_V2: str = "https://api.tidal.com/v2" + OPEN_API_URL: str = "https://openapi.tidal.com/v2" + + # Define throttler here for use by the client + throttler = ThrottlerManager(rate_limit=1, period=2) + + def __init__(self, provider: TidalProvider): + """Initialize API client.""" + self.provider = provider + self.auth = provider.auth + self.logger = provider.logger + self.mass = provider.mass + + @throttle_with_retries # type: ignore[type-var] + async def get( + self, endpoint: str, **kwargs: Any + ) -> dict[str, Any] | tuple[dict[str, Any], str]: + """Get data from Tidal API.""" + return await self._request("GET", endpoint, **kwargs) + + async def get_data(self, endpoint: str, **kwargs: Any) -> dict[str, Any]: + """Get data from Tidal API, discarding headers/ETags.""" + result = await self.get(endpoint, **kwargs) + return result[0] if isinstance(result, tuple) else result + + async def post( + self, + endpoint: str, + data: dict[str, Any] | None = None, + as_form: bool = False, + **kwargs: Any, + ) -> dict[str, Any]: + """Send POST data to Tidal API.""" + if as_form: + kwargs.setdefault("headers", {})["Content-Type"] = "application/x-www-form-urlencoded" + kwargs["data"] = data + else: + kwargs["json"] = data + + return cast("dict[str, Any]", await self._request("POST", endpoint, **kwargs)) + + async def put( + self, + endpoint: str, + data: dict[str, Any] | None = None, + as_form: bool = False, + **kwargs: Any, + ) -> dict[str, Any]: + """Send PUT data to Tidal API.""" + # Special handling for mixes which use V2 + if "mixes" in endpoint and "base_url" not in kwargs: + kwargs["base_url"] = self.BASE_URL_V2 + + if as_form: + kwargs.setdefault("headers", {})["Content-Type"] = "application/x-www-form-urlencoded" + kwargs["data"] = data + else: + kwargs["json"] = data + + return cast("dict[str, Any]", await self._request("PUT", endpoint, **kwargs)) + + async def delete( + self, endpoint: str, data: dict[str, Any] | None = None, **kwargs: Any + ) -> dict[str, Any]: + """Delete data from Tidal API.""" + kwargs["json"] = data + return cast("dict[str, Any]", await self._request("DELETE", endpoint, **kwargs)) + + async def _request( + self, method: str, endpoint: str, **kwargs: Any + ) -> dict[str, Any] | tuple[dict[str, Any], str]: + """Handle API requests internally.""" + if not await self.auth.ensure_valid_token(): + raise LoginFailed("Failed to authenticate with Tidal") + + # Prepare URL + base_url = kwargs.pop("base_url", self.BASE_URL) + url = f"{base_url}/{endpoint}" + + # Prepare Headers + headers = kwargs.pop("headers", {}) + headers["Authorization"] = f"Bearer {self.auth.access_token}" + + locale = self.mass.metadata.locale.replace("_", "-") + language = locale.split("-")[0] + headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5" + + # Prepare Params + params = kwargs.pop("params", {}) or {} + if self.auth.session_id: + params["sessionId"] = self.auth.session_id + if self.auth.country_code: + params["countryCode"] = self.auth.country_code + + # Extract special handling flags + return_etag = kwargs.pop("return_etag", False) + + self.logger.debug("Making %s request to Tidal API: %s", method, endpoint) + + async with self.mass.http_session.request( + method, url, headers=headers, params=params, **kwargs + ) as response: + return await self._handle_response(response, return_etag) + + async def _handle_response( + self, response: ClientResponse, return_etag: bool = False + ) -> dict[str, Any] | tuple[dict[str, Any], str]: + """Handle API response and common error conditions.""" + if response.status == 401: + raise LoginFailed("Authentication failed") + if response.status == 404: + raise MediaNotFoundError(f"Item not found: {response.url}") + if response.status == 429: + retry_after = int(response.headers.get("Retry-After", 30)) + raise ResourceTemporarilyUnavailable( + "Tidal Rate limit reached", backoff_time=retry_after + ) + if response.status >= 400: + text = await response.text() + self.logger.error("API error: %s - %s", response.status, text) + raise ResourceTemporarilyUnavailable("API error") + + try: + if not response.content_length or response.content_length == 0: + data = {"success": True} + else: + data = await response.json() + + if return_etag: + etag = response.headers.get("ETag", "") + return data, etag + return data + except json.JSONDecodeError as err: + raise ResourceTemporarilyUnavailable("Failed to parse response") from err + + async def paginate( + self, + endpoint: str, + item_key: str = "items", + nested_key: str | None = None, + limit: int = 50, + cursor_based: bool = False, + **kwargs: Any, + ) -> AsyncGenerator[Any, None]: + """Paginate through all items from a Tidal API endpoint.""" + offset = 0 + cursor = None + + while True: + params = {"limit": limit} + if cursor_based: + if cursor: + params["cursor"] = cursor + else: + params["offset"] = offset + + if "params" in kwargs: + params.update(kwargs.pop("params")) + + api_result = await self.get(endpoint, params=params, **kwargs) + response = api_result[0] if isinstance(api_result, tuple) else api_result + + items = response.get(item_key, []) + if not items: + break + + for item in items: + if nested_key and nested_key in item and item[nested_key]: + yield item[nested_key] + else: + yield item + + if cursor_based: + cursor = response.get("cursor") + if not cursor: + break + else: + offset += len(items) diff --git a/music_assistant/providers/tidal/auth_manager.py b/music_assistant/providers/tidal/auth_manager.py index 8eea9201..0022e645 100644 --- a/music_assistant/providers/tidal/auth_manager.py +++ b/music_assistant/providers/tidal/auth_manager.py @@ -82,7 +82,7 @@ class TidalAuthManager: self.http_session = http_session self.update_config = config_updater self.logger = logger - self._auth_info = None + self._auth_info: dict[str, Any] | None = None self.user = TidalUser() async def initialize(self, auth_data: str) -> bool: @@ -126,7 +126,7 @@ class TidalAuthManager: return False # Check if token is expired - expires_at = self._auth_info.get("expires_at", 0) # type: ignore[unreachable] + expires_at = self._auth_info.get("expires_at", 0) if expires_at > time.time() + TOKEN_REFRESH_BUFFER: return True @@ -138,7 +138,7 @@ class TidalAuthManager: if not self._auth_info: return False - refresh_token = self._auth_info.get("refresh_token") # type: ignore[unreachable] + refresh_token = self._auth_info.get("refresh_token") if not refresh_token: return False diff --git a/music_assistant/providers/tidal/library.py b/music_assistant/providers/tidal/library.py new file mode 100644 index 00000000..4e0ea294 --- /dev/null +++ b/music_assistant/providers/tidal/library.py @@ -0,0 +1,146 @@ +"""Library management for Tidal.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from aiohttp.client_exceptions import ClientError +from music_assistant_models.enums import MediaType +from music_assistant_models.errors import MediaNotFoundError, ResourceTemporarilyUnavailable + +from .parsers import parse_album, parse_artist, parse_playlist, parse_track + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + + from music_assistant_models.media_items import Album, Artist, MediaItemType, Playlist, Track + + from .provider import TidalProvider + + +class TidalLibraryManager: + """Manages Tidal library operations.""" + + def __init__(self, provider: TidalProvider): + """Initialize library manager.""" + self.provider = provider + self.api = provider.api + self.auth = provider.auth + self.logger = provider.logger + + async def get_artists(self) -> AsyncGenerator[Artist, None]: + """Retrieve library artists.""" + path = f"users/{self.auth.user_id}/favorites/artists" + async for item in self.api.paginate(path, nested_key="item"): + if item and item.get("id"): + yield parse_artist(self.provider, item) + + async def get_albums(self) -> AsyncGenerator[Album, None]: + """Retrieve library albums.""" + path = f"users/{self.auth.user_id}/favorites/albums" + async for item in self.api.paginate(path, nested_key="item"): + if item and item.get("id"): + yield parse_album(self.provider, item) + + async def get_tracks(self) -> AsyncGenerator[Track, None]: + """Retrieve library tracks.""" + path = f"users/{self.auth.user_id}/favorites/tracks" + async for item in self.api.paginate(path, nested_key="item"): + if item and item.get("id"): + yield parse_track(self.provider, item) + + async def get_playlists(self) -> AsyncGenerator[Playlist, None]: + """Retrieve library playlists.""" + # 1. Get favorite mixes + async for item in self.api.paginate( + "favorites/mixes", item_key="items", base_url=self.api.BASE_URL_V2, cursor_based=True + ): + if item and item.get("id"): + yield parse_playlist(self.provider, item, is_mix=True) + + # 2. Get user playlists + path = f"users/{self.auth.user_id}/playlistsAndFavoritePlaylists" + async for item in self.api.paginate(path, nested_key="playlist"): + if item and item.get("uuid"): + yield parse_playlist(self.provider, item) + + async def add_item(self, item: MediaItemType) -> bool: + """Add item to library.""" + endpoint, data, is_mix = self._get_endpoint_data(item.item_id, item.media_type, "add") + if not endpoint: + return False + + try: + if is_mix: + await self.api.put(endpoint, data=data, as_form=True) + else: + await self.api.post( + f"users/{self.auth.user_id}/{endpoint}", data=data, as_form=True + ) + return True + except (ClientError, MediaNotFoundError, ResourceTemporarilyUnavailable): + return False + + async def remove_item(self, prov_item_id: str, media_type: MediaType) -> bool: + """Remove item from library.""" + endpoint, data, is_mix = self._get_endpoint_data(prov_item_id, media_type, "remove") + if not endpoint: + return False + + try: + if is_mix: + await self.api.put(endpoint, data=data, as_form=True) + else: + await self.api.delete(f"users/{self.auth.user_id}/{endpoint}") + return True + except (ClientError, MediaNotFoundError, ResourceTemporarilyUnavailable): + return False + + def _get_endpoint_data( + self, item_id: str, media_type: MediaType, operation: str + ) -> tuple[str | None, dict[str, Any], bool]: + """Get endpoint and data for library operations.""" + if media_type == MediaType.PLAYLIST and item_id.startswith("mix_"): + mix_id = item_id[4:] + if operation == "add": + return ( + "favorites/mixes/add", + { + "mixIds": mix_id, + "onArtifactNotFound": "FAIL", + "deviceType": "BROWSER", + }, + True, + ) + return ( + "favorites/mixes/remove", + {"mixIds": mix_id, "deviceType": "BROWSER"}, + True, + ) + + if media_type == MediaType.ARTIST: + return ( + ("favorites/artists", {"artistId": item_id}, False) + if operation == "add" + else (f"favorites/artists/{item_id}", {}, False) + ) + if media_type == MediaType.ALBUM: + return ( + ("favorites/albums", {"albumId": item_id}, False) + if operation == "add" + else (f"favorites/albums/{item_id}", {}, False) + ) + if media_type == MediaType.TRACK: + return ( + ("favorites/tracks", {"trackId": item_id}, False) + if operation == "add" + else (f"favorites/tracks/{item_id}", {}, False) + ) + if media_type == MediaType.PLAYLIST: + return ( + ("favorites/playlists", {"uuids": item_id}, False) + if operation == "add" + else (f"favorites/playlists/{item_id}", {}, False) + ) + + return None, {}, False diff --git a/music_assistant/providers/tidal/media.py b/music_assistant/providers/tidal/media.py new file mode 100644 index 00000000..93bfb398 --- /dev/null +++ b/music_assistant/providers/tidal/media.py @@ -0,0 +1,228 @@ +"""Media retrieval operations for Tidal.""" + +from __future__ import annotations + +from contextlib import suppress +from typing import TYPE_CHECKING, Any + +from aiohttp.client_exceptions import ClientError +from music_assistant_models.enums import MediaType +from music_assistant_models.errors import MediaNotFoundError +from music_assistant_models.media_items import SearchResults + +from .parsers import parse_album, parse_artist, parse_playlist, parse_track + +if TYPE_CHECKING: + from music_assistant_models.media_items import Album, Artist, Playlist, Track + + from .provider import TidalProvider + + +class TidalMediaManager: + """Handles retrieval of media items from Tidal.""" + + def __init__(self, provider: TidalProvider): + """Initialize media retriever.""" + self.provider = provider + self.api = provider.api + self.logger = provider.logger + + async def search( + self, search_query: str, media_types: list[MediaType], limit: int = 5 + ) -> SearchResults: + """Perform search on Tidal.""" + parsed_results = SearchResults() + media_type_strings = [] + + if MediaType.ARTIST in media_types: + media_type_strings.append("artists") + if MediaType.ALBUM in media_types: + media_type_strings.append("albums") + if MediaType.TRACK in media_types: + media_type_strings.append("tracks") + if MediaType.PLAYLIST in media_types: + media_type_strings.append("playlists") + + if not media_type_strings: + return parsed_results + + results = await self.api.get_data( + "search", + params={ + "query": search_query.replace("'", ""), + "limit": limit, + "types": ",".join(media_type_strings), + }, + ) + + if "artists" in results and results["artists"].get("items"): + parsed_results.artists = [ + parse_artist(self.provider, x) for x in results["artists"]["items"] + ] + if "albums" in results and results["albums"].get("items"): + parsed_results.albums = [ + parse_album(self.provider, x) for x in results["albums"]["items"] + ] + if "playlists" in results and results["playlists"].get("items"): + parsed_results.playlists = [ + parse_playlist(self.provider, x) for x in results["playlists"]["items"] + ] + if "tracks" in results and results["tracks"].get("items"): + parsed_results.tracks = [ + parse_track(self.provider, x) for x in results["tracks"]["items"] + ] + return parsed_results + + async def get_artist(self, prov_artist_id: str) -> Artist: + """Get artist details.""" + try: + data = await self.api.get_data(f"artists/{prov_artist_id}") + return parse_artist(self.provider, data) + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err + + async def get_album(self, prov_album_id: str) -> Album: + """Get album details.""" + try: + data = await self.api.get_data(f"albums/{prov_album_id}") + return parse_album(self.provider, data) + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Album {prov_album_id} not found") from err + + async def get_track(self, prov_track_id: str) -> Track: + """Get track details.""" + try: + track_obj = await self.api.get_data(f"tracks/{prov_track_id}") + + lyrics = None + with suppress(MediaNotFoundError): + lyrics = await self.api.get_data(f"tracks/{prov_track_id}/lyrics") + + return parse_track(self.provider, track_obj, lyrics=lyrics) + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Track {prov_track_id} not found") from err + + async def get_playlist(self, prov_playlist_id: str) -> Playlist: + """Get playlist details.""" + if prov_playlist_id.startswith("mix_"): + return await self._get_mix_details(prov_playlist_id[4:]) + + try: + data = await self.api.get_data(f"playlists/{prov_playlist_id}") + return parse_playlist(self.provider, data) + except MediaNotFoundError: + return await self._get_mix_details(prov_playlist_id) + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err + + async def _get_mix_details(self, prov_mix_id: str) -> Playlist: + """Get details for a Tidal Mix.""" + try: + params = {"mixId": prov_mix_id, "deviceType": "BROWSER"} + tidal_mix = await self.api.get_data("pages/mix", params=params) + + mix_obj = { + "id": prov_mix_id, + "title": tidal_mix.get("title", "Unknown Mix"), + "updated": tidal_mix.get("lastUpdated", ""), + "images": {}, + } + + # Try to extract images from rows/modules structure + rows = tidal_mix.get("rows", []) + if rows and (modules := rows[0].get("modules")): + if mix_data := modules[0].get("mix"): + mix_obj["images"] = mix_data.get("images", {}) + + if "subTitle" not in mix_obj: + mix_obj["subTitle"] = tidal_mix.get("subTitle", "") + + return parse_playlist(self.provider, mix_obj, is_mix=True) + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Mix {prov_mix_id} not found") from err + + async def get_album_tracks(self, prov_album_id: str) -> list[Track]: + """Get album tracks.""" + try: + data = await self.api.get_data(f"albums/{prov_album_id}/tracks", params={"limit": 250}) + return [parse_track(self.provider, x) for x in data.get("items", [])] + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Album {prov_album_id} not found") from err + + async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: + """Get artist albums.""" + try: + data = await self.api.get_data( + f"artists/{prov_artist_id}/albums", params={"limit": 250} + ) + return [parse_album(self.provider, x) for x in data.get("items", [])] + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err + + async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]: + """Get artist top tracks.""" + try: + data = await self.api.get_data( + f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0} + ) + return [parse_track(self.provider, x) for x in data.get("items", [])] + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err + + async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]: + """Get similar tracks.""" + try: + data = await self.api.get_data(f"tracks/{prov_track_id}/radio", params={"limit": limit}) + return [parse_track(self.provider, x) for x in data.get("items", [])] + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Track {prov_track_id} not found") from err + + async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]: + """Get playlist tracks.""" + page_size = 200 + offset = page * page_size + + if prov_playlist_id.startswith("mix_"): + return await self._get_mix_tracks(prov_playlist_id[4:], page_size, offset) + + try: + data = await self.api.get_data( + f"playlists/{prov_playlist_id}/tracks", + params={"limit": page_size, "offset": offset}, + ) + return self._process_tracks(data.get("items", []), offset) + except MediaNotFoundError: + return await self._get_mix_tracks(prov_playlist_id, page_size, offset) + + async def _get_mix_tracks(self, mix_id: str, limit: int, offset: int) -> list[Track]: + """Get tracks from a mix.""" + try: + params = {"mixId": mix_id, "deviceType": "BROWSER"} + data = await self.api.get_data("pages/mix", params=params) + + # Mix tracks are usually in the second row + rows = data.get("rows", []) + if len(rows) < 2: + raise MediaNotFoundError(f"Mix {mix_id} has no tracks") + + modules = rows[1].get("modules", []) + if not modules or "pagedList" not in modules[0]: + raise MediaNotFoundError(f"Mix {mix_id} has no tracks") + + all_items = modules[0]["pagedList"].get("items", []) + # Manual pagination for mixes + paged_items = all_items[offset : offset + limit] + return self._process_tracks(paged_items, offset) + except (ClientError, KeyError, ValueError) as err: + raise MediaNotFoundError(f"Mix {mix_id} not found") from err + + def _process_tracks(self, items: list[dict[str, Any]], offset: int) -> list[Track]: + result = [] + for idx, item in enumerate(items, 1): + try: + track = parse_track(self.provider, item) + track.position = offset + idx + result.append(track) + except (KeyError, TypeError): + continue + return result diff --git a/music_assistant/providers/tidal/parsers.py b/music_assistant/providers/tidal/parsers.py new file mode 100644 index 00000000..f2caa4fb --- /dev/null +++ b/music_assistant/providers/tidal/parsers.py @@ -0,0 +1,317 @@ +"""Parsers for Tidal API responses.""" + +from __future__ import annotations + +from contextlib import suppress +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from music_assistant_models.enums import ( + AlbumType, + ContentType, + ExternalID, + ImageType, + MediaType, +) +from music_assistant_models.media_items import ( + Album, + Artist, + AudioFormat, + MediaItemImage, + Playlist, + ProviderMapping, + Track, + UniqueList, +) + +from music_assistant.helpers.util import infer_album_type + +from .constants import BROWSE_URL, RESOURCES_URL + +if TYPE_CHECKING: + from .provider import TidalProvider + + +def parse_artist(provider: TidalProvider, artist_obj: dict[str, Any]) -> Artist: + """Parse tidal artist object to generic layout.""" + artist_id = str(artist_obj["id"]) + artist = Artist( + item_id=artist_id, + provider=provider.lookup_key, + name=artist_obj["name"], + provider_mappings={ + ProviderMapping( + item_id=artist_id, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + # NOTE: don't use the /browse endpoint as it's + # not working for musicbrainz lookups + url=f"https://tidal.com/artist/{artist_id}", + ) + }, + ) + # metadata + if artist_obj["picture"]: + picture_id = artist_obj["picture"].replace("-", "/") + image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg" + artist.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.lookup_key, + remotely_accessible=True, + ) + ] + ) + + return artist + + +def parse_album(provider: TidalProvider, album_obj: dict[str, Any]) -> Album: + """Parse tidal album object to generic layout.""" + name = album_obj.get("title", "Unknown Album") + version = album_obj.get("version", "") or "" + album_id = str(album_obj.get("id", "")) + + album = Album( + item_id=album_id, + provider=provider.lookup_key, + name=name, + version=version, + provider_mappings={ + ProviderMapping( + item_id=album_id, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + audio_format=AudioFormat( + content_type=ContentType.FLAC, + ), + url=f"https://tidal.com/album/{album_id}", + available=album_obj.get("streamReady", True), # Default to available + ) + }, + ) + + # Safely handle artists array + various_artist_album: bool = False + for artist_obj in album_obj.get("artists", []): + try: + if artist_obj.get("name") == "Various Artists": + various_artist_album = True + album.artists.append(parse_artist(provider, artist_obj)) + except (KeyError, TypeError) as err: + provider.logger.warning("Error parsing artist in album %s: %s", name, err) + + # Safely determine album type + album_type = album_obj.get("type", "ALBUM") + if album_type == "COMPILATION" or various_artist_album: + album.album_type = AlbumType.COMPILATION + elif album_type == "ALBUM": + album.album_type = AlbumType.ALBUM + elif album_type == "EP": + album.album_type = AlbumType.EP + elif album_type == "SINGLE": + album.album_type = AlbumType.SINGLE + + # Try inference - override if it finds something more specific + inferred_type = infer_album_type(name, version) + if inferred_type in (AlbumType.SOUNDTRACK, AlbumType.LIVE): + album.album_type = inferred_type + + # Safely parse year + if release_date := album_obj.get("releaseDate", ""): + try: + album.year = int(release_date.split("-")[0]) + except (ValueError, IndexError): + provider.logger.debug("Invalid release date format: %s", release_date) + with suppress(ValueError): + album.metadata.release_date = datetime.fromisoformat(release_date) + + # Safely set metadata + upc = album_obj.get("upc") + if upc: + album.external_ids.add((ExternalID.BARCODE, upc)) + + album.metadata.copyright = album_obj.get("copyright", "") + album.metadata.explicit = album_obj.get("explicit", False) + album.metadata.popularity = album_obj.get("popularity", 0) + + # Safely handle cover image + cover = album_obj.get("cover") + if cover: + picture_id = cover.replace("-", "/") + image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg" + album.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.lookup_key, + remotely_accessible=True, + ) + ] + ) + + return album + + +def parse_track( + provider: TidalProvider, + track_obj: dict[str, Any], + lyrics: dict[str, str] | None = None, +) -> Track: + """Parse tidal track object to generic layout.""" + version = track_obj.get("version", "") or "" + track_id = str(track_obj.get("id", 0)) + media_metadata = track_obj.get("mediaMetadata", {}) + tags = media_metadata.get("tags", []) + hi_res_lossless = any(tag in tags for tag in ["HIRES_LOSSLESS", "HI_RES_LOSSLESS"]) + track = Track( + item_id=track_id, + provider=provider.lookup_key, + name=track_obj.get("title", "Unknown"), + version=version, + duration=track_obj.get("duration", 0), + provider_mappings={ + ProviderMapping( + item_id=str(track_id), + provider_domain=provider.domain, + provider_instance=provider.instance_id, + audio_format=AudioFormat( + content_type=ContentType.FLAC, + bit_depth=24 if hi_res_lossless else 16, + ), + url=f"https://tidal.com/track/{track_id}", + available=track_obj["streamReady"], + ) + }, + disc_number=track_obj.get("volumeNumber", 0) or 0, + track_number=track_obj.get("trackNumber", 0) or 0, + ) + if "isrc" in track_obj: + track.external_ids.add((ExternalID.ISRC, track_obj["isrc"])) + track.artists = UniqueList() + for track_artist in track_obj["artists"]: + artist = parse_artist(provider, track_artist) + track.artists.append(artist) + # metadata + track.metadata.explicit = track_obj["explicit"] + track.metadata.popularity = track_obj["popularity"] + if "copyright" in track_obj: + track.metadata.copyright = track_obj["copyright"] + if lyrics and "lyrics" in lyrics: + track.metadata.lyrics = lyrics["lyrics"] + if lyrics and "subtitles" in lyrics: + track.metadata.lrc_lyrics = lyrics["subtitles"] + if track_obj["album"]: + # Here we use an ItemMapping as Tidal returns + # minimal data when getting an Album from a Track + track.album = provider.get_item_mapping( + media_type=MediaType.ALBUM, + key=str(track_obj["album"]["id"]), + name=track_obj["album"]["title"], + ) + if track_obj["album"]["cover"]: + picture_id = track_obj["album"]["cover"].replace("-", "/") + image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg" + track.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.lookup_key, + remotely_accessible=True, + ) + ] + ) + return track + + +def parse_playlist( + provider: TidalProvider, playlist_obj: dict[str, Any], is_mix: bool = False +) -> Playlist: + """Parse tidal playlist object to generic layout.""" + # Get ID based on playlist type + raw_id = str(playlist_obj.get("id" if is_mix else "uuid", "")) + + # Add prefix for mixes to distinguish them + playlist_id = f"mix_{raw_id}" if is_mix else raw_id + + # Owner logic differs between types + if is_mix: + owner_name = "Created by Tidal" + is_editable = False + else: + creator_id = None + creator = playlist_obj.get("creator", {}) + if creator: + creator_id = creator.get("id") + is_editable = bool(creator_id and str(creator_id) == str(provider.auth.user_id)) + + owner_name = "Tidal" + if is_editable: + if provider.auth.user.profile_name: + owner_name = provider.auth.user.profile_name + elif provider.auth.user.user_name: + owner_name = provider.auth.user.user_name + elif provider.auth.user_id: + owner_name = str(provider.auth.user_id) + + # URL path differs by type - use raw_id for URLs + url_path = "mix" if is_mix else "playlist" + + playlist = Playlist( + item_id=playlist_id, + provider=provider.instance_id if is_editable else provider.lookup_key, + name=playlist_obj.get("title", "Unknown"), + owner=owner_name, + provider_mappings={ + ProviderMapping( + item_id=playlist_id, # Use raw ID for provider mapping + provider_domain=provider.domain, + provider_instance=provider.instance_id, + url=f"{BROWSE_URL}/{url_path}/{raw_id}", + ) + }, + is_editable=is_editable, + ) + + # Metadata - different fields based on type + + # Add the description from the subtitle for mixes + if is_mix: + subtitle = playlist_obj.get("subTitle") + if subtitle: + playlist.metadata.description = subtitle + + # Handle images differently based on type + if is_mix: + if pictures := playlist_obj.get("images", {}).get("MEDIUM"): + image_url = pictures.get("url", "") + if image_url: + playlist.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.lookup_key, + remotely_accessible=True, + ) + ] + ) + elif picture := (playlist_obj.get("squareImage") or playlist_obj.get("image")): + picture_id = picture.replace("-", "/") + image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg" + playlist.metadata.images = UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.lookup_key, + remotely_accessible=True, + ) + ] + ) + + return playlist diff --git a/music_assistant/providers/tidal/playlist.py b/music_assistant/providers/tidal/playlist.py new file mode 100644 index 00000000..7988f593 --- /dev/null +++ b/music_assistant/providers/tidal/playlist.py @@ -0,0 +1,74 @@ +"""Playlist management for Tidal.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from aiohttp.client_exceptions import ClientError +from music_assistant_models.errors import ResourceTemporarilyUnavailable + +from .parsers import parse_playlist + +if TYPE_CHECKING: + from music_assistant_models.media_items import Playlist + + from .provider import TidalProvider + + +class TidalPlaylistManager: + """Manages Tidal playlist operations.""" + + def __init__(self, provider: TidalProvider): + """Initialize playlist manager.""" + self.provider = provider + self.api = provider.api + self.auth = provider.auth + self.logger = provider.logger + + async def create(self, name: str) -> Playlist: + """Create a new playlist.""" + try: + data = {"title": name, "description": ""} + result = await self.api.post( + f"users/{self.auth.user_id}/playlists", data=data, as_form=True + ) + return parse_playlist(self.provider, result) + except ClientError as err: + raise ResourceTemporarilyUnavailable("Failed to create playlist") from err + + async def add_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None: + """Add tracks to playlist.""" + try: + # Get ETag first + api_result = await self.api.get(f"playlists/{prov_playlist_id}", return_etag=True) + playlist_obj = api_result[0] if isinstance(api_result, tuple) else api_result + etag = api_result[1] if isinstance(api_result, tuple) else None + + data = { + "onArtifactNotFound": "SKIP", + "trackIds": ",".join(map(str, prov_track_ids)), + "toIndex": playlist_obj.get("numberOfTracks", 0), + "onDupes": "SKIP", + } + headers = {"If-None-Match": etag} if etag else {} + + await self.api.post( + f"playlists/{prov_playlist_id}/items", data=data, as_form=True, headers=headers + ) + except ClientError as err: + raise ResourceTemporarilyUnavailable("Failed to add tracks") from err + + async def remove_tracks(self, prov_playlist_id: str, positions: tuple[int, ...]) -> None: + """Remove tracks from playlist.""" + try: + # Get ETag first + api_result = await self.api.get(f"playlists/{prov_playlist_id}", return_etag=True) + etag = api_result[1] if isinstance(api_result, tuple) else None + + # Tidal uses 0-based indices in URL path + indices = ",".join(str(pos - 1) for pos in positions) + headers = {"If-None-Match": etag} if etag else {} + + await self.api.delete(f"playlists/{prov_playlist_id}/items/{indices}", headers=headers) + except ClientError as err: + raise ResourceTemporarilyUnavailable("Failed to remove tracks") from err diff --git a/music_assistant/providers/tidal/provider.py b/music_assistant/providers/tidal/provider.py new file mode 100644 index 00000000..b4afbc91 --- /dev/null +++ b/music_assistant/providers/tidal/provider.py @@ -0,0 +1,244 @@ +"""Tidal music provider implementation.""" + +from __future__ import annotations + +import json +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from music_assistant_models.enums import MediaType, ProviderFeature +from music_assistant_models.errors import LoginFailed +from music_assistant_models.media_items import ( + Album, + Artist, + ItemMapping, + MediaItemType, + Playlist, + RecommendationFolder, + SearchResults, + Track, +) + +from music_assistant.controllers.cache import use_cache +from music_assistant.models.music_provider import MusicProvider + +from .api_client import TidalAPIClient +from .auth_manager import TidalAuthManager +from .constants import ( + CACHE_CATEGORY_RECOMMENDATIONS, + CONF_AUTH_TOKEN, + CONF_EXPIRY_TIME, + CONF_REFRESH_TOKEN, + CONF_USER_ID, +) +from .library import TidalLibraryManager +from .media import TidalMediaManager +from .playlist import TidalPlaylistManager +from .recommendations import TidalRecommendationManager +from .streaming import TidalStreamingManager + +if TYPE_CHECKING: + from collections.abc import AsyncGenerator + + from music_assistant_models.config_entries import ProviderConfig + from music_assistant_models.provider import ProviderManifest + from music_assistant_models.streamdetails import StreamDetails + + from music_assistant.mass import MusicAssistant + + +SUPPORTED_FEATURES = { + ProviderFeature.LIBRARY_ARTISTS, + ProviderFeature.LIBRARY_ALBUMS, + ProviderFeature.LIBRARY_TRACKS, + ProviderFeature.LIBRARY_PLAYLISTS, + ProviderFeature.ARTIST_ALBUMS, + ProviderFeature.ARTIST_TOPTRACKS, + ProviderFeature.SEARCH, + ProviderFeature.LIBRARY_ARTISTS_EDIT, + ProviderFeature.LIBRARY_ALBUMS_EDIT, + ProviderFeature.LIBRARY_TRACKS_EDIT, + ProviderFeature.LIBRARY_PLAYLISTS_EDIT, + ProviderFeature.PLAYLIST_CREATE, + ProviderFeature.SIMILAR_TRACKS, + ProviderFeature.BROWSE, + ProviderFeature.PLAYLIST_TRACKS_EDIT, + ProviderFeature.RECOMMENDATIONS, +} + + +class TidalProvider(MusicProvider): + """Implementation of a Tidal MusicProvider.""" + + def __init__(self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig): + """Initialize Tidal provider.""" + super().__init__(mass, manifest, config, SUPPORTED_FEATURES) + self.auth = TidalAuthManager( + http_session=mass.http_session, + config_updater=self._update_auth_config, + logger=self.logger, + ) + self.api = TidalAPIClient(self) + self.library = TidalLibraryManager(self) + self.media = TidalMediaManager(self) + self.playlists = TidalPlaylistManager(self) + self.recommendations_manager = TidalRecommendationManager(self) + self.streaming = TidalStreamingManager(self) + + def _update_auth_config(self, auth_info: dict[str, Any]) -> None: + """Update auth config with new auth info.""" + self.update_config_value(CONF_AUTH_TOKEN, auth_info["access_token"], encrypted=True) + self.update_config_value(CONF_REFRESH_TOKEN, auth_info["refresh_token"], encrypted=True) + self.update_config_value(CONF_EXPIRY_TIME, auth_info["expires_at"]) + self.update_config_value(CONF_USER_ID, auth_info["userId"]) + + async def handle_async_init(self) -> None: + """Handle async initialization of the provider.""" + access_token = self.config.get_value(CONF_AUTH_TOKEN) + refresh_token = self.config.get_value(CONF_REFRESH_TOKEN) + expires_at = self.config.get_value(CONF_EXPIRY_TIME) + user_id = self.config.get_value(CONF_USER_ID) + + if not access_token or not refresh_token: + raise LoginFailed("Missing authentication data") + + if isinstance(expires_at, str) and "T" in expires_at: + try: + dt = datetime.fromisoformat(expires_at) + expires_at = dt.timestamp() + self.update_config_value(CONF_EXPIRY_TIME, expires_at) + except ValueError: + expires_at = 0 + + auth_data = { + "access_token": access_token, + "refresh_token": refresh_token, + "expires_at": expires_at, + "userId": user_id, + } + + if not await self.auth.initialize(json.dumps(auth_data)): + raise LoginFailed("Failed to authenticate with Tidal") + + api_result = await self.api.get("sessions") + user_info = api_result[0] if isinstance(api_result, tuple) else api_result + logged_in_user = await self.get_user(str(user_info.get("userId"))) + await self.auth.update_user_info(logged_in_user, str(user_info.get("sessionId"))) + + async def get_user(self, prov_user_id: str) -> dict[str, Any]: + """Get user information.""" + return await self.api.get_data(f"users/{prov_user_id}") + + @use_cache(3600 * 24 * 14) + async def search( + self, search_query: str, media_types: list[MediaType], limit: int = 5 + ) -> SearchResults: + """Perform search on musicprovider.""" + return await self.media.search(search_query, media_types, limit) + + @use_cache(3600 * 24) + async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]: + """Get similar tracks for given track id.""" + return await self.media.get_similar_tracks(prov_track_id, limit) + + @use_cache(3600 * 24 * 30) + async def get_artist(self, prov_artist_id: str) -> Artist: + """Get artist details for given artist id.""" + return await self.media.get_artist(prov_artist_id) + + @use_cache(3600 * 24 * 30) + async def get_album(self, prov_album_id: str) -> Album: + """Get album details for given album id.""" + return await self.media.get_album(prov_album_id) + + @use_cache(3600 * 24 * 30) + async def get_track(self, prov_track_id: str) -> Track: + """Get track details for given track id.""" + return await self.media.get_track(prov_track_id) + + @use_cache(3600 * 24 * 30) + async def get_playlist(self, prov_playlist_id: str) -> Playlist: + """Get playlist details for given playlist id.""" + return await self.media.get_playlist(prov_playlist_id) + + @use_cache(3600 * 24 * 30) + async def get_album_tracks(self, prov_album_id: str) -> list[Track]: + """Get album tracks for given album id.""" + return await self.media.get_album_tracks(prov_album_id) + + @use_cache(3600 * 24 * 7) + async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: + """Get a list of all albums for the given artist.""" + return await self.media.get_artist_albums(prov_artist_id) + + @use_cache(3600 * 24 * 7) + async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]: + """Get a list of 10 most popular tracks for the given artist.""" + return await self.media.get_artist_toptracks(prov_artist_id) + + @use_cache(3600 * 3) + async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]: + """Get playlist tracks.""" + return await self.media.get_playlist_tracks(prov_playlist_id, page) + + async def get_stream_details( + self, item_id: str, media_type: MediaType = MediaType.TRACK + ) -> StreamDetails: + """Return the content details for the given track when it will be streamed.""" + return await self.streaming.get_stream_details(item_id) + + def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping: + """Create a generic item mapping.""" + return ItemMapping( + media_type=media_type, + item_id=key, + provider=self.lookup_key, + name=name, + ) + + async def get_library_artists(self) -> AsyncGenerator[Artist, None]: + """Retrieve all library artists from Tidal.""" + async for item in self.library.get_artists(): + yield item + + async def get_library_albums(self) -> AsyncGenerator[Album, None]: + """Retrieve all library albums from Tidal.""" + async for item in self.library.get_albums(): + yield item + + async def get_library_tracks(self) -> AsyncGenerator[Track, None]: + """Retrieve library tracks from Tidal.""" + async for item in self.library.get_tracks(): + yield item + + async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]: + """Retrieve all library playlists from the provider.""" + async for item in self.library.get_playlists(): + yield item + + async def library_add(self, item: MediaItemType) -> bool: + """Add item to library.""" + return await self.library.add_item(item) + + async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool: + """Remove item from library.""" + return await self.library.remove_item(prov_item_id, media_type) + + async def create_playlist(self, name: str) -> Playlist: + """Create a new playlist on provider with given name.""" + return await self.playlists.create(name) + + async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None: + """Add track(s) to playlist.""" + await self.playlists.add_tracks(prov_playlist_id, prov_track_ids) + + async def remove_playlist_tracks( + self, prov_playlist_id: str, positions_to_remove: tuple[int, ...] + ) -> None: + """Remove track(s) from playlist.""" + await self.playlists.remove_tracks(prov_playlist_id, positions_to_remove) + + @use_cache(expiration=3600, category=CACHE_CATEGORY_RECOMMENDATIONS) + async def recommendations(self) -> list[RecommendationFolder]: + """Get this provider's recommendations organized into folders.""" + return await self.recommendations_manager.get_recommendations() diff --git a/music_assistant/providers/tidal/recommendations.py b/music_assistant/providers/tidal/recommendations.py new file mode 100644 index 00000000..844c72d5 --- /dev/null +++ b/music_assistant/providers/tidal/recommendations.py @@ -0,0 +1,162 @@ +"""Recommendation logic for Tidal.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from music_assistant_models.enums import MediaType, ProviderType +from music_assistant_models.media_items import ( + Album, + Artist, + BrowseFolder, + ItemMapping, + MediaItemType, + Playlist, + RecommendationFolder, + Track, + UniqueList, +) + +from .constants import CACHE_CATEGORY_RECOMMENDATIONS +from .tidal_page_parser import TidalPageParser + +if TYPE_CHECKING: + from .provider import TidalProvider + + +class TidalRecommendationManager: + """Manages Tidal recommendations.""" + + def __init__(self, provider: TidalProvider): + """Initialize recommendation manager.""" + self.provider = provider + self.api = provider.api + self.auth = provider.auth + self.logger = provider.logger + self.mass = provider.mass + self.page_cache_ttl = 3 * 3600 + + async def get_recommendations(self) -> list[RecommendationFolder]: + """Get this provider's recommendations organized into folders.""" + results: list[RecommendationFolder] = [] + pages = [ + "pages/home", + "pages/for_you", + "pages/hi_res", + "pages/explore_new_music", + "pages/explore_top_music", + ] + combined_modules: dict[str, list[Playlist | Album | Track | Artist]] = {} + module_content_types: dict[str, MediaType] = {} + module_page_names: dict[str, str] = {} + + try: + all_tidal_configs = await self.mass.config.get_provider_configs(ProviderType.MUSIC) + tidal_configs = [ + config for config in all_tidal_configs if config.domain == self.provider.domain + ] + sorted_instances = sorted(tidal_configs, key=lambda x: x.instance_id) + show_user_identifier = len(sorted_instances) > 1 + + for page_path in pages: + parser = await self.get_page_content(page_path) + page_name = page_path.split("/")[-1].replace("_", " ").title() + + if page_path in ("pages/home", "pages/explore_top_music") and show_user_identifier: + if ( + sorted_instances + and self.provider.instance_id != sorted_instances[0].instance_id + ): + continue + + for module_info in parser._module_map: + title = module_info.get("title", "Unknown") + if not title or title == "Unknown" or "Videos" in title: + continue + + items, content_type = parser.get_module_items(module_info) + if not items: + continue + + key = f"{self.auth.user_id}_{title}" + if key not in combined_modules: + combined_modules[key] = [] + module_content_types[key] = content_type + module_page_names[key] = page_name + + combined_modules[key].extend(items) + + for key, items in combined_modules.items(): + user_id_prefix = f"{self.auth.user_id}_" + title = key.removeprefix(user_id_prefix) + + unique_items = UniqueList(items) + item_id = "".join( + c for c in key.lower().replace(" ", "_") if c.isalnum() or c == "_" + ) + content_type = module_content_types.get(key, MediaType.PLAYLIST) + page_name = module_page_names.get(key, "Tidal") + + folder_name = title + if show_user_identifier and page_name not in ("Home", "Explore Top Music"): + user_name = ( + self.auth.user.profile_name + or self.auth.user.user_name + or str(self.auth.user_id) + ) + folder_name = f"{title} ({user_name})" + + results.append( + RecommendationFolder( + item_id=item_id, + name=folder_name, + provider=self.provider.lookup_key, + items=UniqueList[MediaItemType | ItemMapping | BrowseFolder](unique_items), + subtitle=f"From {page_name} • {len(unique_items)} items", + translation_key=item_id, + icon="mdi-playlist-music" + if content_type == MediaType.PLAYLIST + else "mdi-album", + ) + ) + + except Exception as err: + self.logger.warning("Error fetching recommendations: %s", err) + + return results + + async def get_page_content(self, page_path: str = "pages/home") -> TidalPageParser: + """Get a lazy page parser for a Tidal page.""" + if cached := await TidalPageParser.from_cache(self.provider, page_path): + return cached + + try: + locale = self.mass.metadata.locale.replace("_", "-") + api_result = await self.api.get( + page_path, + base_url="https://listen.tidal.com/v1", + params={ + "locale": locale, + "deviceType": "BROWSER", + "countryCode": self.auth.country_code or "US", + }, + ) + + data = api_result[0] if isinstance(api_result, tuple) else api_result + parser = TidalPageParser(self.provider) + parser.parse_page_structure(data or {}, page_path) + + await self.mass.cache.set( + key=page_path, + data={ + "module_map": parser._module_map, + "content_map": parser._content_map, + "parsed_at": parser._parsed_at, + }, + provider=self.provider.instance_id, + category=CACHE_CATEGORY_RECOMMENDATIONS, + expiration=self.page_cache_ttl, + ) + return parser + except Exception: + return TidalPageParser(self.provider) diff --git a/music_assistant/providers/tidal/streaming.py b/music_assistant/providers/tidal/streaming.py new file mode 100644 index 00000000..194bf95f --- /dev/null +++ b/music_assistant/providers/tidal/streaming.py @@ -0,0 +1,137 @@ +"""Streaming operations for Tidal.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from music_assistant_models.enums import ContentType, ExternalID, StreamType +from music_assistant_models.errors import MediaNotFoundError +from music_assistant_models.media_items import AudioFormat +from music_assistant_models.streamdetails import StreamDetails + +from .constants import CACHE_CATEGORY_ISRC_MAP, CONF_QUALITY + +if TYPE_CHECKING: + from music_assistant_models.media_items import Track + + from .provider import TidalProvider + + +class TidalStreamingManager: + """Manages Tidal streaming operations.""" + + def __init__(self, provider: TidalProvider): + """Initialize streaming manager.""" + self.provider = provider + self.api = provider.api + self.mass = provider.mass + + async def get_stream_details(self, item_id: str) -> StreamDetails: + """Get stream details for a track.""" + # 1. Try direct lookup + try: + track = await self.provider.get_track(item_id) + except MediaNotFoundError: + # 2. Fallback to ISRC lookup + if isrc_track := await self._get_track_by_isrc(item_id): + track = isrc_track + else: + raise MediaNotFoundError(f"Track {item_id} not found") + + quality = self.provider.config.get_value(CONF_QUALITY) + + # 3. Get playback info + async with self.api.throttler.bypass(): + api_result = await self.api.get( + f"tracks/{track.item_id}/playbackinfopostpaywall", + params={ + "playbackmode": "STREAM", + "assetpresentation": "FULL", + "audioquality": quality, + }, + ) + + stream_data = api_result[0] if isinstance(api_result, tuple) else api_result + + # 4. Parse stream URL + manifest_type = stream_data.get("manifestMimeType", "") + if "dash+xml" in manifest_type and "manifest" in stream_data: + url = f"data:application/dash+xml;base64,{stream_data['manifest']}" + else: + urls = stream_data.get("urls", []) + if not urls: + raise MediaNotFoundError("No stream URL found") + url = urls[0] + + # 5. Determine format + audio_quality = stream_data.get("audioQuality") + if audio_quality in ("HIRES_LOSSLESS", "HI_RES_LOSSLESS", "LOSSLESS"): + content_type = ContentType.FLAC + elif codec := stream_data.get("codec"): + content_type = ContentType.try_parse(codec) + else: + content_type = ContentType.MP4 + + return StreamDetails( + item_id=track.item_id, + provider=self.provider.lookup_key, + audio_format=AudioFormat( + content_type=content_type, + sample_rate=stream_data.get("sampleRate", 44100), + bit_depth=stream_data.get("bitDepth", 16), + channels=2, + ), + stream_type=StreamType.HTTP, + duration=track.duration, + path=url, + can_seek=True, + allow_seek=True, + ) + + async def _get_track_by_isrc(self, item_id: str) -> Track | None: + """Lookup track by ISRC with caching.""" + # Check cache + if cached_id := await self.mass.cache.get( + item_id, provider=self.provider.instance_id, category=CACHE_CATEGORY_ISRC_MAP + ): + try: + return await self.provider.get_track(cached_id) + except MediaNotFoundError: + await self.mass.cache.delete( + item_id, provider=self.provider.instance_id, category=CACHE_CATEGORY_ISRC_MAP + ) + + # Get library item to find ISRC + lib_track = await self.mass.music.tracks.get_library_item_by_prov_id( + item_id, self.provider.instance_id + ) + if not lib_track: + return None + + isrc = next((x[1] for x in lib_track.external_ids if x[0] == ExternalID.ISRC), None) + if not isrc: + return None + + # Lookup by ISRC + api_result = await self.api.get( + "/tracks", params={"filter[isrc]": isrc}, base_url=self.api.OPEN_API_URL + ) + data = api_result[0] if isinstance(api_result, tuple) else api_result + + data_items = data.get("data", []) + if not data_items: + return None + + track_id = str(data_items[0]["id"]) + + # Cache result + await self.mass.cache.set( + key=item_id, + data=track_id, + provider=self.provider.instance_id, + category=CACHE_CATEGORY_ISRC_MAP, + persistent=True, + expiration=86400 * 90, + ) + + return await self.provider.get_track(track_id) diff --git a/music_assistant/providers/tidal/tidal_page_parser.py b/music_assistant/providers/tidal/tidal_page_parser.py index c6833909..a7e664aa 100644 --- a/music_assistant/providers/tidal/tidal_page_parser.py +++ b/music_assistant/providers/tidal/tidal_page_parser.py @@ -9,11 +9,12 @@ from typing import TYPE_CHECKING, Any from music_assistant_models.enums import MediaType from .constants import CACHE_CATEGORY_RECOMMENDATIONS +from .parsers import parse_album, parse_artist, parse_playlist, parse_track if TYPE_CHECKING: from music_assistant_models.media_items import Album, Artist, Playlist, Track - from music_assistant.providers.tidal import TidalProvider + from .provider import TidalProvider class TidalPageParser: @@ -128,7 +129,7 @@ class TidalPageParser: is_mix = "mixId" in item or "mixType" in item try: - playlist = self.provider._parse_playlist(item, is_mix=is_mix) + playlist = parse_playlist(self.provider, item, is_mix=is_mix) result.append(playlist) type_counts[MediaType.PLAYLIST] += 1 except (KeyError, ValueError, TypeError) as err: @@ -147,7 +148,7 @@ class TidalPageParser: for item in items: if isinstance(item, dict): try: - track = self.provider._parse_track(item) + track = parse_track(self.provider, item) result.append(track) type_counts[MediaType.TRACK] += 1 except (KeyError, ValueError, TypeError) as err: @@ -166,7 +167,7 @@ class TidalPageParser: for item in items: if isinstance(item, dict): try: - album = self.provider._parse_album(item) + album = parse_album(self.provider, item) result.append(album) type_counts[MediaType.ALBUM] += 1 except (KeyError, ValueError, TypeError) as err: @@ -185,7 +186,7 @@ class TidalPageParser: for item in items: if isinstance(item, dict): try: - artist = self.provider._parse_artist(item) + artist = parse_artist(self.provider, item) result.append(artist) type_counts[MediaType.ARTIST] += 1 except (KeyError, ValueError, TypeError) as err: @@ -204,7 +205,7 @@ class TidalPageParser: for item in items: if isinstance(item, dict): try: - mix = self.provider._parse_playlist(item, is_mix=True) + mix = parse_playlist(self.provider, item, is_mix=True) result.append(mix) type_counts[MediaType.PLAYLIST] += 1 except (KeyError, ValueError, TypeError) as err: @@ -339,39 +340,39 @@ class TidalPageParser: # Parse based on detected type try: if item_type == "MIX": - media_item: Playlist | Album | Track | Artist = self.provider._parse_playlist( - item, is_mix=True + media_item: Playlist | Album | Track | Artist = parse_playlist( + self.provider, item, is_mix=True ) type_counts[MediaType.PLAYLIST] += 1 return media_item elif item_type == "PLAYLIST": - media_item = self.provider._parse_playlist(item) + media_item = parse_playlist(self.provider, item) type_counts[MediaType.PLAYLIST] += 1 return media_item elif item_type == "ALBUM": - media_item = self.provider._parse_album(item) + media_item = parse_album(self.provider, item) type_counts[MediaType.ALBUM] += 1 return media_item elif item_type == "TRACK": - media_item = self.provider._parse_track(item) + media_item = parse_track(self.provider, item) type_counts[MediaType.TRACK] += 1 return media_item elif item_type == "ARTIST": - media_item = self.provider._parse_artist(item) + media_item = parse_artist(self.provider, item) type_counts[MediaType.ARTIST] += 1 return media_item else: # Last resort - try to infer from structure for unlabeled items if "uuid" in item: - media_item = self.provider._parse_playlist(item) + media_item = parse_playlist(self.provider, item) type_counts[MediaType.PLAYLIST] += 1 return media_item elif "id" in item and "title" in item and "duration" in item: - media_item = self.provider._parse_track(item) + media_item = parse_track(self.provider, item) type_counts[MediaType.TRACK] += 1 return media_item elif "id" in item and "title" in item and "numberOfTracks" in item: - media_item = self.provider._parse_album(item) + media_item = parse_album(self.provider, item) type_counts[MediaType.ALBUM] += 1 return media_item diff --git a/pyproject.toml b/pyproject.toml index 54b83c65..bda73ee1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -164,6 +164,10 @@ line-ending = "lf" [tool.pytest.ini_options] addopts = "--cov music_assistant" asyncio_mode = "auto" +filterwarnings = [ + # Suppress Python 3.13 AsyncMock internal warnings about unawaited coroutines + "ignore:coroutine.*was never awaited:RuntimeWarning", +] [tool.ruff.lint] ignore = [ diff --git a/tests/providers/tidal/__init__.py b/tests/providers/tidal/__init__.py new file mode 100644 index 00000000..d0ab68c9 --- /dev/null +++ b/tests/providers/tidal/__init__.py @@ -0,0 +1 @@ +"""Tests for Tidal.""" diff --git a/tests/providers/tidal/__snapshots__/test_parsers.ambr b/tests/providers/tidal/__snapshots__/test_parsers.ambr new file mode 100644 index 00000000..be38d7f0 --- /dev/null +++ b/tests/providers/tidal/__snapshots__/test_parsers.ambr @@ -0,0 +1,496 @@ +# serializer version: 1 +# name: test_parse_album[album] + dict({ + 'album_type': 'album', + 'artists': list([ + dict({ + 'external_ids': list([ + ]), + 'favorite': False, + 'is_playable': True, + 'item_id': '12345', + 'media_type': 'artist', + 'metadata': dict({ + 'chapters': None, + 'copyright': None, + 'description': None, + 'explicit': None, + 'genres': None, + 'grouping': None, + 'images': list([ + dict({ + 'path': 'https://resources.tidal.com/images/1234/5678/90ab/cdef/750x750.jpg', + 'provider': 'tidal', + 'remotely_accessible': True, + 'type': 'thumb', + }), + ]), + 'label': None, + 'languages': None, + 'last_refresh': None, + 'links': None, + 'lrc_lyrics': None, + 'lyrics': None, + 'mood': None, + 'performers': None, + 'popularity': None, + 'preview': None, + 'release_date': None, + 'review': None, + 'style': None, + }), + 'name': 'Test Artist', + 'position': None, + 'provider': 'tidal', + 'provider_mappings': list([ + dict({ + 'audio_format': dict({ + 'bit_depth': 16, + 'bit_rate': 0, + 'channels': 2, + 'codec_type': '?', + 'content_type': '?', + 'output_format_str': '?', + 'sample_rate': 44100, + }), + 'available': True, + 'details': None, + 'in_library': None, + 'item_id': '12345', + 'provider_domain': 'tidal', + 'provider_instance': 'tidal_instance', + 'url': 'https://tidal.com/artist/12345', + }), + ]), + 'sort_name': 'test artist', + 'translation_key': None, + 'uri': 'tidal://artist/12345', + 'version': '', + }), + ]), + 'external_ids': list([ + list([ + 'barcode', + '123456789012', + ]), + ]), + 'favorite': False, + 'is_playable': True, + 'item_id': '67890', + 'media_type': 'album', + 'metadata': dict({ + 'chapters': None, + 'copyright': '℗ 2023 Test Label', + 'description': None, + 'explicit': False, + 'genres': None, + 'grouping': None, + 'images': list([ + dict({ + 'path': 'https://resources.tidal.com/images/abcd/ef01/2345/6789/750x750.jpg', + 'provider': 'tidal', + 'remotely_accessible': True, + 'type': 'thumb', + }), + ]), + 'label': None, + 'languages': None, + 'last_refresh': None, + 'links': None, + 'lrc_lyrics': None, + 'lyrics': None, + 'mood': None, + 'performers': None, + 'popularity': 50, + 'preview': None, + 'release_date': '2023-01-01T00:00:00', + 'review': None, + 'style': None, + }), + 'name': 'Test Album', + 'position': None, + 'provider': 'tidal', + 'provider_mappings': list([ + dict({ + 'audio_format': dict({ + 'bit_depth': 16, + 'bit_rate': 0, + 'channels': 2, + 'codec_type': '?', + 'content_type': 'flac', + 'output_format_str': 'flac', + 'sample_rate': 44100, + }), + 'available': True, + 'details': None, + 'in_library': None, + 'item_id': '67890', + 'provider_domain': 'tidal', + 'provider_instance': 'tidal_instance', + 'url': 'https://tidal.com/album/67890', + }), + ]), + 'sort_name': 'test album', + 'translation_key': None, + 'uri': 'tidal://album/67890', + 'version': 'Deluxe Edition', + 'year': 2023, + }) +# --- +# name: test_parse_artist[artist] + dict({ + 'external_ids': list([ + ]), + 'favorite': False, + 'is_playable': True, + 'item_id': '12345', + 'media_type': 'artist', + 'metadata': dict({ + 'chapters': None, + 'copyright': None, + 'description': None, + 'explicit': None, + 'genres': None, + 'grouping': None, + 'images': list([ + dict({ + 'path': 'https://resources.tidal.com/images/1234/5678/90ab/cdef/750x750.jpg', + 'provider': 'tidal', + 'remotely_accessible': True, + 'type': 'thumb', + }), + ]), + 'label': None, + 'languages': None, + 'last_refresh': None, + 'links': None, + 'lrc_lyrics': None, + 'lyrics': None, + 'mood': None, + 'performers': None, + 'popularity': None, + 'preview': None, + 'release_date': None, + 'review': None, + 'style': None, + }), + 'name': 'Test Artist', + 'position': None, + 'provider': 'tidal', + 'provider_mappings': list([ + dict({ + 'audio_format': dict({ + 'bit_depth': 16, + 'bit_rate': 0, + 'channels': 2, + 'codec_type': '?', + 'content_type': '?', + 'output_format_str': '?', + 'sample_rate': 44100, + }), + 'available': True, + 'details': None, + 'in_library': None, + 'item_id': '12345', + 'provider_domain': 'tidal', + 'provider_instance': 'tidal_instance', + 'url': 'https://tidal.com/artist/12345', + }), + ]), + 'sort_name': 'test artist', + 'translation_key': None, + 'uri': 'tidal://artist/12345', + 'version': '', + }) +# --- +# name: test_parse_playlist[mix] + dict({ + 'external_ids': list([ + ]), + 'favorite': False, + 'is_editable': False, + 'is_playable': True, + 'item_id': 'mix_mix_123', + 'media_type': 'playlist', + 'metadata': dict({ + 'chapters': None, + 'copyright': None, + 'description': 'A personalized mix just for you', + 'explicit': None, + 'genres': None, + 'grouping': None, + 'images': list([ + dict({ + 'path': 'http://example.com/mix-medium.jpg', + 'provider': 'tidal', + 'remotely_accessible': True, + 'type': 'thumb', + }), + ]), + 'label': None, + 'languages': None, + 'last_refresh': None, + 'links': None, + 'lrc_lyrics': None, + 'lyrics': None, + 'mood': None, + 'performers': None, + 'popularity': None, + 'preview': None, + 'release_date': None, + 'review': None, + 'style': None, + }), + 'name': 'My Daily Discovery', + 'owner': 'Created by Tidal', + 'position': None, + 'provider': 'tidal', + 'provider_mappings': list([ + dict({ + 'audio_format': dict({ + 'bit_depth': 16, + 'bit_rate': 0, + 'channels': 2, + 'codec_type': '?', + 'content_type': '?', + 'output_format_str': '?', + 'sample_rate': 44100, + }), + 'available': True, + 'details': None, + 'in_library': None, + 'item_id': 'mix_mix_123', + 'provider_domain': 'tidal', + 'provider_instance': 'tidal_instance', + 'url': 'https://tidal.com/browse/mix/mix_123', + }), + ]), + 'sort_name': 'my daily discovery', + 'translation_key': None, + 'uri': 'tidal://playlist/mix_mix_123', + 'version': '', + }) +# --- +# name: test_parse_playlist[playlist] + dict({ + 'external_ids': list([ + ]), + 'favorite': False, + 'is_editable': False, + 'is_playable': True, + 'item_id': 'aabbcc-1122-3344-5566', + 'media_type': 'playlist', + 'metadata': dict({ + 'chapters': None, + 'copyright': None, + 'description': None, + 'explicit': None, + 'genres': None, + 'grouping': None, + 'images': list([ + dict({ + 'path': 'https://resources.tidal.com/images/playlist/square/image/id/750x750.jpg', + 'provider': 'tidal', + 'remotely_accessible': True, + 'type': 'thumb', + }), + ]), + 'label': None, + 'languages': None, + 'last_refresh': None, + 'links': None, + 'lrc_lyrics': None, + 'lyrics': None, + 'mood': None, + 'performers': None, + 'popularity': None, + 'preview': None, + 'release_date': None, + 'review': None, + 'style': None, + }), + 'name': 'Test Playlist', + 'owner': 'Tidal', + 'position': None, + 'provider': 'tidal', + 'provider_mappings': list([ + dict({ + 'audio_format': dict({ + 'bit_depth': 16, + 'bit_rate': 0, + 'channels': 2, + 'codec_type': '?', + 'content_type': '?', + 'output_format_str': '?', + 'sample_rate': 44100, + }), + 'available': True, + 'details': None, + 'in_library': None, + 'item_id': 'aabbcc-1122-3344-5566', + 'provider_domain': 'tidal', + 'provider_instance': 'tidal_instance', + 'url': 'https://tidal.com/browse/playlist/aabbcc-1122-3344-5566', + }), + ]), + 'sort_name': 'test playlist', + 'translation_key': None, + 'uri': 'tidal://playlist/aabbcc-1122-3344-5566', + 'version': '', + }) +# --- +# name: test_parse_track[track] + dict({ + 'album': dict({ + 'available': True, + 'external_ids': list([ + ]), + 'image': None, + 'is_playable': True, + 'item_id': '67890', + 'media_type': 'album', + 'name': 'Test Album', + 'provider': 'tidal', + 'sort_name': 'test album', + 'translation_key': None, + 'uri': 'tidal://album/67890', + 'version': '', + }), + 'artists': list([ + dict({ + 'external_ids': list([ + ]), + 'favorite': False, + 'is_playable': True, + 'item_id': '12345', + 'media_type': 'artist', + 'metadata': dict({ + 'chapters': None, + 'copyright': None, + 'description': None, + 'explicit': None, + 'genres': None, + 'grouping': None, + 'images': list([ + dict({ + 'path': 'https://resources.tidal.com/images/1234/5678/90ab/cdef/750x750.jpg', + 'provider': 'tidal', + 'remotely_accessible': True, + 'type': 'thumb', + }), + ]), + 'label': None, + 'languages': None, + 'last_refresh': None, + 'links': None, + 'lrc_lyrics': None, + 'lyrics': None, + 'mood': None, + 'performers': None, + 'popularity': None, + 'preview': None, + 'release_date': None, + 'review': None, + 'style': None, + }), + 'name': 'Test Artist', + 'position': None, + 'provider': 'tidal', + 'provider_mappings': list([ + dict({ + 'audio_format': dict({ + 'bit_depth': 16, + 'bit_rate': 0, + 'channels': 2, + 'codec_type': '?', + 'content_type': '?', + 'output_format_str': '?', + 'sample_rate': 44100, + }), + 'available': True, + 'details': None, + 'in_library': None, + 'item_id': '12345', + 'provider_domain': 'tidal', + 'provider_instance': 'tidal_instance', + 'url': 'https://tidal.com/artist/12345', + }), + ]), + 'sort_name': 'test artist', + 'translation_key': None, + 'uri': 'tidal://artist/12345', + 'version': '', + }), + ]), + 'disc_number': 1, + 'duration': 180, + 'external_ids': list([ + list([ + 'isrc', + 'US1234567890', + ]), + ]), + 'favorite': False, + 'is_playable': True, + 'item_id': '112233', + 'last_played': 0, + 'media_type': 'track', + 'metadata': dict({ + 'chapters': None, + 'copyright': '℗ 2023 Test Label', + 'description': None, + 'explicit': False, + 'genres': None, + 'grouping': None, + 'images': list([ + dict({ + 'path': 'https://resources.tidal.com/images/abcd/ef01/2345/6789/750x750.jpg', + 'provider': 'tidal', + 'remotely_accessible': True, + 'type': 'thumb', + }), + ]), + 'label': None, + 'languages': None, + 'last_refresh': None, + 'links': None, + 'lrc_lyrics': None, + 'lyrics': None, + 'mood': None, + 'performers': None, + 'popularity': 60, + 'preview': None, + 'release_date': None, + 'review': None, + 'style': None, + }), + 'name': 'Test Track', + 'position': None, + 'provider': 'tidal', + 'provider_mappings': list([ + dict({ + 'audio_format': dict({ + 'bit_depth': 24, + 'bit_rate': 0, + 'channels': 2, + 'codec_type': '?', + 'content_type': 'flac', + 'output_format_str': 'flac', + 'sample_rate': 44100, + }), + 'available': True, + 'details': None, + 'in_library': None, + 'item_id': '112233', + 'provider_domain': 'tidal', + 'provider_instance': 'tidal_instance', + 'url': 'https://tidal.com/track/112233', + }), + ]), + 'sort_name': 'test track', + 'track_number': 1, + 'translation_key': None, + 'uri': 'tidal://track/112233', + 'version': 'Remastered', + }) +# --- diff --git a/tests/providers/tidal/fixtures/albums/album.json b/tests/providers/tidal/fixtures/albums/album.json new file mode 100644 index 00000000..7e27d683 --- /dev/null +++ b/tests/providers/tidal/fixtures/albums/album.json @@ -0,0 +1,41 @@ +{ + "id": 67890, + "title": "Test Album", + "version": "Deluxe Edition", + "artists": [ + { + "id": 12345, + "name": "Test Artist", + "picture": "1234-5678-90ab-cdef", + "type": "MAIN" + } + ], + "type": "ALBUM", + "releaseDate": "2023-01-01", + "availabilityDate": "2023-01-01", + "duration": 2400, + "numberOfTracks": 12, + "numberOfVolumes": 1, + "upc": "123456789012", + "copyright": "℗ 2023 Test Label", + "explicit": false, + "popularity": 50, + "cover": "abcd-ef01-2345-6789", + "videoCover": null, + "streamReady": true, + "streamStartDate": "2023-01-01T00:00:00.000+0000", + "allowStreaming": true, + "premiumStreamingOnly": false, + "numberOfVideos": 0, + "audioQuality": "LOSSLESS", + "audioModes": [ + "STEREO" + ], + "mediaMetadata": { + "tags": [ + "LOSSLESS", + "HIRES_LOSSLESS" + ] + }, + "url": "http://www.tidal.com/album/67890" +} diff --git a/tests/providers/tidal/fixtures/artists/artist.json b/tests/providers/tidal/fixtures/artists/artist.json new file mode 100644 index 00000000..a6d286f6 --- /dev/null +++ b/tests/providers/tidal/fixtures/artists/artist.json @@ -0,0 +1,15 @@ +{ + "id": 12345, + "name": "Test Artist", + "picture": "1234-5678-90ab-cdef", + "url": "http://www.tidal.com/artist/12345", + "artistTypes": [ + "ARTIST" + ], + "popularity": 75, + "banner": "banner-1234-5678-90ab", + "releaseDateOriginal": "2010-01-01", + "mixes": { + "ARTIST_MIX": "artist_mix_id_123" + } +} diff --git a/tests/providers/tidal/fixtures/pages/home.json b/tests/providers/tidal/fixtures/pages/home.json new file mode 100644 index 00000000..6426987f --- /dev/null +++ b/tests/providers/tidal/fixtures/pages/home.json @@ -0,0 +1,70 @@ +{ + "rows": [ + { + "modules": [ + { + "title": "My Playlists", + "type": "PLAYLIST_LIST", + "pagedList": { + "items": [ + { + "uuid": "aabbcc-1122-3344-5566", + "title": "Test Playlist", + "creator": { + "id": 99999 + }, + "image": "playlist-image-id" + } + ] + } + } + ] + }, + { + "modules": [ + { + "title": "New Albums", + "type": "ALBUM_LIST", + "pagedList": { + "items": [ + { + "id": 67890, + "title": "Test Album", + "artists": [ + { + "id": 12345, + "name": "Test Artist" + } + ], + "type": "ALBUM" + } + ] + } + } + ] + }, + { + "modules": [ + { + "title": "My Mixes", + "type": "MIX_LIST", + "pagedList": { + "items": [ + { + "id": "mix_123", + "title": "My Mix", + "subTitle": "A mix for you", + "images": { + "MEDIUM": { + "url": "http://example.com/mix.jpg" + } + }, + "mixId": "mix_123" + } + ] + } + } + ] + } + ] +} diff --git a/tests/providers/tidal/fixtures/playlists/mix.json b/tests/providers/tidal/fixtures/playlists/mix.json new file mode 100644 index 00000000..01c4c7c9 --- /dev/null +++ b/tests/providers/tidal/fixtures/playlists/mix.json @@ -0,0 +1,43 @@ +{ + "id": "mix_123", + "title": "My Daily Discovery", + "subTitle": "A personalized mix just for you", + "mixType": "DISCOVERY_MIX", + "contentBehavior": "UNREPEATABLE", + "sharingImages": { + "SMALL": { + "width": 320, + "height": 320, + "url": "http://example.com/mix-small.jpg" + }, + "MEDIUM": { + "width": 640, + "height": 640, + "url": "http://example.com/mix-medium.jpg" + }, + "LARGE": { + "width": 1280, + "height": 1280, + "url": "http://example.com/mix-large.jpg" + } + }, + "images": { + "SMALL": { + "width": 320, + "height": 320, + "url": "http://example.com/mix-small.jpg" + }, + "MEDIUM": { + "width": 640, + "height": 640, + "url": "http://example.com/mix-medium.jpg" + }, + "LARGE": { + "width": 1280, + "height": 1280, + "url": "http://example.com/mix-large.jpg" + } + }, + "updated": "2023-06-15T00:00:00.000+0000", + "dateAdded": "2023-01-01T00:00:00.000+0000" +} diff --git a/tests/providers/tidal/fixtures/playlists/playlist.json b/tests/providers/tidal/fixtures/playlists/playlist.json new file mode 100644 index 00000000..2d2e6fd1 --- /dev/null +++ b/tests/providers/tidal/fixtures/playlists/playlist.json @@ -0,0 +1,23 @@ +{ + "uuid": "aabbcc-1122-3344-5566", + "title": "Test Playlist", + "description": "A test playlist for testing", + "creator": { + "id": 99999, + "name": "Test User", + "picture": null + }, + "type": "USER", + "publicPlaylist": true, + "created": "2023-01-01T00:00:00.000+0000", + "lastUpdated": "2023-06-15T12:00:00.000+0000", + "numberOfTracks": 25, + "numberOfVideos": 0, + "duration": 5400, + "popularity": 45, + "image": "playlist-image-id", + "squareImage": "playlist-square-image-id", + "url": "http://www.tidal.com/playlist/aabbcc-1122-3344-5566", + "promotedArtists": [], + "lastItemAddedAt": "2023-06-15T12:00:00.000+0000" +} diff --git a/tests/providers/tidal/fixtures/tracks/track.json b/tests/providers/tidal/fixtures/tracks/track.json new file mode 100644 index 00000000..04917ae4 --- /dev/null +++ b/tests/providers/tidal/fixtures/tracks/track.json @@ -0,0 +1,49 @@ +{ + "id": 112233, + "title": "Test Track", + "version": "Remastered", + "duration": 180, + "replayGain": -8.5, + "peak": 0.95, + "allowStreaming": true, + "streamReady": true, + "streamStartDate": "2023-01-01T00:00:00.000+0000", + "premiumStreamingOnly": false, + "trackNumber": 1, + "volumeNumber": 1, + "isrc": "US1234567890", + "copyright": "℗ 2023 Test Label", + "artists": [ + { + "id": 12345, + "name": "Test Artist", + "picture": "1234-5678-90ab-cdef", + "type": "MAIN" + } + ], + "album": { + "id": 67890, + "title": "Test Album", + "cover": "abcd-ef01-2345-6789", + "videoCover": null, + "releaseDate": "2023-01-01" + }, + "explicit": false, + "audioQuality": "LOSSLESS", + "audioModes": [ + "STEREO" + ], + "mediaMetadata": { + "tags": [ + "LOSSLESS", + "HIRES_LOSSLESS" + ] + }, + "popularity": 60, + "mixes": { + "TRACK_MIX": "track_mix_id_456" + }, + "url": "http://www.tidal.com/track/112233", + "djReady": true, + "stemReady": false +} diff --git a/tests/providers/tidal/test_api_client.py b/tests/providers/tidal/test_api_client.py new file mode 100644 index 00000000..67f399b0 --- /dev/null +++ b/tests/providers/tidal/test_api_client.py @@ -0,0 +1,217 @@ +"""Test Tidal API Client.""" + +from typing import Any +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +import pytest +from aiohttp import ClientResponse +from music_assistant_models.errors import ( + LoginFailed, + MediaNotFoundError, + RetriesExhausted, +) + +from music_assistant.providers.tidal.api_client import TidalAPIClient + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.auth = AsyncMock() + provider.auth.ensure_valid_token.return_value = True + provider.auth.access_token = "token" + provider.auth.session_id = "session" + provider.auth.country_code = "US" + provider.mass = Mock() + provider.mass.http_session = AsyncMock() + provider.mass.metadata.locale = "en_US" + provider.logger = Mock() + return provider + + +@pytest.fixture +def api_client(provider_mock: Mock) -> TidalAPIClient: + """Return a TidalAPIClient instance.""" + return TidalAPIClient(provider_mock) + + +async def test_get_success(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test successful GET request.""" + response = AsyncMock(spec=ClientResponse) + response.status = 200 + response.json.return_value = {"data": "test"} + + # Create a mock that acts as an async context manager + request_ctx = AsyncMock() + request_ctx.__aenter__.return_value = response + + # The request method itself should be a MagicMock (not AsyncMock) + # that returns the context manager + provider_mock.mass.http_session.request = MagicMock(return_value=request_ctx) + + result = await api_client.get("test/endpoint") + assert result == {"data": "test"} + + +async def test_get_401_error(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test GET request with 401 error.""" + response = AsyncMock(spec=ClientResponse) + response.status = 401 + + request_ctx = AsyncMock() + request_ctx.__aenter__.return_value = response + provider_mock.mass.http_session.request = MagicMock(return_value=request_ctx) + + with pytest.raises(LoginFailed): + await api_client.get("test/endpoint") + + +async def test_get_404_error(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test GET request with 404 error.""" + response = AsyncMock(spec=ClientResponse) + response.status = 404 + response.url = "http://test/endpoint" + + request_ctx = AsyncMock() + request_ctx.__aenter__.return_value = response + provider_mock.mass.http_session.request = MagicMock(return_value=request_ctx) + + with pytest.raises(MediaNotFoundError): + await api_client.get("test/endpoint") + + +async def test_get_429_error(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test GET request with 429 error.""" + with patch("asyncio.sleep"): + response = AsyncMock(spec=ClientResponse) + response.status = 429 + response.headers = {"Retry-After": "10"} + + request_ctx = AsyncMock() + request_ctx.__aenter__.return_value = response + provider_mock.mass.http_session.request = MagicMock(return_value=request_ctx) + + with pytest.raises(RetriesExhausted): + await api_client.get("test/endpoint") + + +async def test_post_success(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test successful POST request.""" + response = AsyncMock(spec=ClientResponse) + response.status = 200 + response.json.return_value = {"success": True} + + request_ctx = AsyncMock() + request_ctx.__aenter__.return_value = response + provider_mock.mass.http_session.request = MagicMock(return_value=request_ctx) + + result = await api_client.post("test/endpoint", data={"key": "value"}) + assert result == {"success": True} + + +async def test_paginate(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test pagination.""" + # Mock first page response + response1 = AsyncMock(spec=ClientResponse) + response1.status = 200 + response1.json.return_value = {"items": [{"id": 1}, {"id": 2}], "totalNumberOfItems": 4} + + # Mock second page response + response2 = AsyncMock(spec=ClientResponse) + response2.status = 200 + response2.json.return_value = {"items": [{"id": 3}, {"id": 4}], "totalNumberOfItems": 4} + + # Mock empty response to stop iteration + response3 = AsyncMock(spec=ClientResponse) + response3.status = 200 + response3.json.return_value = {"items": []} + + ctx1 = AsyncMock() + ctx1.__aenter__.return_value = response1 + + ctx2 = AsyncMock() + ctx2.__aenter__.return_value = response2 + + ctx3 = AsyncMock() + ctx3.__aenter__.return_value = response3 + + provider_mock.mass.http_session.request = MagicMock(side_effect=[ctx1, ctx2, ctx3]) + + items: list[dict[str, Any]] = [] + async for item in api_client.paginate("test/endpoint", limit=2): + items.append(item) + + assert len(items) == 4 + assert items[0]["id"] == 1 + assert items[3]["id"] == 4 + + +async def test_delete_success(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test successful DELETE request.""" + response = AsyncMock(spec=ClientResponse) + response.status = 204 + + request_ctx = AsyncMock() + request_ctx.__aenter__.return_value = response + provider_mock.mass.http_session.request = MagicMock(return_value=request_ctx) + + await api_client.delete("test/endpoint/123") + + # Verify DELETE was called + provider_mock.mass.http_session.request.assert_called_once() + call_args = provider_mock.mass.http_session.request.call_args + assert call_args[0][0] == "DELETE" + + +async def test_delete_with_headers(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test DELETE request with custom headers.""" + response = AsyncMock(spec=ClientResponse) + response.status = 204 + + request_ctx = AsyncMock() + request_ctx.__aenter__.return_value = response + provider_mock.mass.http_session.request = MagicMock(return_value=request_ctx) + + await api_client.delete("test/endpoint/123", headers={"If-Match": "etag123"}) + + # Verify headers were passed + call_args = provider_mock.mass.http_session.request.call_args + assert "If-Match" in call_args[1]["headers"] + assert call_args[1]["headers"]["If-Match"] == "etag123" + + +async def test_put_success(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test successful PUT request.""" + response = AsyncMock(spec=ClientResponse) + response.status = 200 + response.json.return_value = {"updated": True} + + request_ctx = AsyncMock() + request_ctx.__aenter__.return_value = response + provider_mock.mass.http_session.request = MagicMock(return_value=request_ctx) + + result = await api_client.put("test/endpoint", data={"key": "value"}) + assert result == {"updated": True} + + # Verify PUT was called + call_args = provider_mock.mass.http_session.request.call_args + assert call_args[0][0] == "PUT" + + +async def test_put_with_form_data(api_client: TidalAPIClient, provider_mock: Mock) -> None: + """Test PUT request with form data.""" + response = AsyncMock(spec=ClientResponse) + response.status = 200 + response.json.return_value = {"success": True} + + request_ctx = AsyncMock() + request_ctx.__aenter__.return_value = response + provider_mock.mass.http_session.request = MagicMock(return_value=request_ctx) + + result = await api_client.put("test/endpoint", data={"key": "value"}, as_form=True) + assert result == {"success": True} + + # Verify form data was used + call_args = provider_mock.mass.http_session.request.call_args + assert "data" in call_args[1] diff --git a/tests/providers/tidal/test_auth_manager.py b/tests/providers/tidal/test_auth_manager.py new file mode 100644 index 00000000..e2f76c1b --- /dev/null +++ b/tests/providers/tidal/test_auth_manager.py @@ -0,0 +1,173 @@ +"""Test Tidal Auth Manager.""" + +import json +import time +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from aiohttp import ClientSession +from music_assistant_models.errors import LoginFailed + +from music_assistant.providers.tidal.auth_manager import ( + ManualAuthenticationHelper, + TidalAuthManager, +) + + +@pytest.fixture +def http_session() -> AsyncMock: + """Return a mock http session.""" + return AsyncMock(spec=ClientSession) + + +@pytest.fixture +def config_updater() -> Mock: + """Return a mock config updater.""" + return Mock() + + +@pytest.fixture +def auth_manager(http_session: AsyncMock, config_updater: Mock) -> TidalAuthManager: + """Return a TidalAuthManager instance.""" + logger = Mock() + return TidalAuthManager(http_session, config_updater, logger) + + +async def test_initialize_success(auth_manager: TidalAuthManager) -> None: + """Test successful initialization.""" + auth_data = json.dumps( + { + "access_token": "token", + "refresh_token": "refresh", + "expires_at": time.time() + 3600, + "client_id": "client_id", + } + ) + assert await auth_manager.initialize(auth_data) is True + assert auth_manager.access_token == "token" + + +async def test_initialize_invalid_json(auth_manager: TidalAuthManager) -> None: + """Test initialization with invalid JSON.""" + assert await auth_manager.initialize("invalid") is False + + +async def test_ensure_valid_token_valid(auth_manager: TidalAuthManager) -> None: + """Test ensure_valid_token with valid token.""" + auth_manager._auth_info = {"expires_at": time.time() + 3600} + assert await auth_manager.ensure_valid_token() is True + + +async def test_ensure_valid_token_expired( + auth_manager: TidalAuthManager, http_session: AsyncMock, config_updater: Mock +) -> None: + """Test ensure_valid_token with expired token.""" + auth_manager._auth_info = { + "expires_at": time.time() - 3600, + "refresh_token": "refresh", + "client_id": "client_id", + } + + # Mock refresh response + response = AsyncMock() + response.status = 200 + response.json.return_value = { + "access_token": "new_token", + "expires_in": 3600, + "refresh_token": "new_refresh", + } + http_session.post.return_value.__aenter__.return_value = response + + assert await auth_manager.ensure_valid_token() is True + assert auth_manager.access_token == "new_token" + config_updater.assert_called_once() + + +async def test_refresh_token_failure( + auth_manager: TidalAuthManager, http_session: AsyncMock +) -> None: + """Test refresh_token failure.""" + auth_manager._auth_info = { + "refresh_token": "refresh", + "client_id": "client_id", + } + + # Mock refresh response failure + response = AsyncMock() + response.status = 400 + response.text.return_value = "Bad Request" + http_session.post.return_value.__aenter__.return_value = response + + assert await auth_manager.refresh_token() is False + + +@patch("music_assistant.providers.tidal.auth_manager.pkce") +@patch("music_assistant.providers.tidal.auth_manager.app_var") +@pytest.mark.usefixtures("auth_manager") +async def test_generate_auth_url(mock_app_var: Mock, mock_pkce: Mock) -> None: + """Test generate_auth_url.""" + mock_pkce.generate_pkce_pair.return_value = ("verifier", "challenge") + mock_app_var.side_effect = ["client_id", "client_secret"] + + mass = Mock() + mass.loop.call_soon_threadsafe = Mock() + auth_helper = ManualAuthenticationHelper(mass, "session_id") + + result = await TidalAuthManager.generate_auth_url(auth_helper, "HIGH") + + assert "code_verifier" in result + assert "client_unique_key" in result + mass.loop.call_soon_threadsafe.assert_called_once() + + +async def test_process_pkce_login_success(http_session: AsyncMock) -> None: + """Test process_pkce_login success.""" + auth_params = json.dumps( + { + "code_verifier": "verifier", + "client_unique_key": "key", + "client_id": "id", + "client_secret": "secret", + "quality": "HIGH", + } + ) + redirect_url = "https://tidal.com/android/login/auth?code=auth_code" + + # Mock token response + token_response = AsyncMock() + token_response.status = 200 + token_response.json.return_value = { + "access_token": "access", + "refresh_token": "refresh", + "expires_in": 3600, + } + + # Mock user info response + user_response = AsyncMock() + user_response.status = 200 + user_response.json.return_value = { + "id": "user_id", + "username": "user", + } + + http_session.post.return_value.__aenter__.return_value = token_response + http_session.get.return_value.__aenter__.return_value = user_response + + result = await TidalAuthManager.process_pkce_login(http_session, auth_params, redirect_url) + + assert result["access_token"] == "access" + assert result["id"] == "user_id" + + +async def test_process_pkce_login_missing_code(http_session: AsyncMock) -> None: + """Test process_pkce_login missing code.""" + auth_params = json.dumps( + { + "code_verifier": "verifier", + "client_unique_key": "key", + } + ) + redirect_url = "https://tidal.com/android/login/auth" + + with pytest.raises(LoginFailed, match="No authorization code"): + await TidalAuthManager.process_pkce_login(http_session, auth_params, redirect_url) diff --git a/tests/providers/tidal/test_library.py b/tests/providers/tidal/test_library.py new file mode 100644 index 00000000..0d28dfc1 --- /dev/null +++ b/tests/providers/tidal/test_library.py @@ -0,0 +1,226 @@ +"""Test Tidal Library Manager.""" + +from collections.abc import AsyncGenerator +from typing import Any +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +import pytest +from music_assistant_models.enums import MediaType +from music_assistant_models.media_items import ItemMapping + +from music_assistant.providers.tidal.library import TidalLibraryManager + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.lookup_key = "tidal" + provider.domain = "tidal" + provider.instance_id = "tidal_instance" + provider.auth.user_id = "12345" + provider.api = AsyncMock() + provider.api.get_data.return_value = {"items": []} + provider.api.paginate = MagicMock() + + # Configure async iterator for paginate + async def async_iter(*_args: Any, **_kwargs: Any) -> AsyncGenerator[Any, None]: + for item in provider.api.paginate.return_value: + yield item + + provider.api.paginate.side_effect = async_iter + provider.api.paginate.return_value = [] + + provider.logger = Mock() + + def get_item_mapping(media_type: MediaType, key: str, name: str) -> ItemMapping: + return ItemMapping( + media_type=media_type, + item_id=key, + provider=provider.lookup_key, + name=name, + ) + + provider.get_item_mapping.side_effect = get_item_mapping + + return provider + + +@pytest.fixture +def library_manager(provider_mock: Mock) -> TidalLibraryManager: + """Return a TidalLibraryManager instance.""" + return TidalLibraryManager(provider_mock) + + +@patch("music_assistant.providers.tidal.library.parse_artist") +async def test_get_artists( + mock_parse_artist: Mock, library_manager: TidalLibraryManager, provider_mock: Mock +) -> None: + """Test get_artists.""" + provider_mock.api.paginate.return_value = [{"id": 1, "name": "Test Artist"}] + mock_parse_artist.return_value = Mock(item_id="1") + + artists = [a async for a in library_manager.get_artists()] + + assert len(artists) == 1 + assert artists[0].item_id == "1" + provider_mock.api.paginate.assert_called_with( + "users/12345/favorites/artists", + nested_key="item", + ) + mock_parse_artist.assert_called_once() + + +@patch("music_assistant.providers.tidal.library.parse_album") +async def test_get_albums( + mock_parse_album: Mock, library_manager: TidalLibraryManager, provider_mock: Mock +) -> None: + """Test get_albums.""" + provider_mock.api.paginate.return_value = [{"id": 1, "title": "Test Album"}] + mock_parse_album.return_value = Mock(item_id="1") + + albums = [a async for a in library_manager.get_albums()] + + assert len(albums) == 1 + assert albums[0].item_id == "1" + provider_mock.api.paginate.assert_called_with( + "users/12345/favorites/albums", + nested_key="item", + ) + mock_parse_album.assert_called_once() + + +@patch("music_assistant.providers.tidal.library.parse_track") +async def test_get_tracks( + mock_parse_track: Mock, library_manager: TidalLibraryManager, provider_mock: Mock +) -> None: + """Test get_tracks.""" + provider_mock.api.paginate.return_value = [{"id": 1, "title": "Test Track"}] + mock_parse_track.return_value = Mock(item_id="1") + + tracks = [t async for t in library_manager.get_tracks()] + + assert len(tracks) == 1 + assert tracks[0].item_id == "1" + provider_mock.api.paginate.assert_called_with( + "users/12345/favorites/tracks", + nested_key="item", + ) + mock_parse_track.assert_called_once() + + +@patch("music_assistant.providers.tidal.library.parse_playlist") +async def test_get_playlists( + mock_parse_playlist: Mock, library_manager: TidalLibraryManager, provider_mock: Mock +) -> None: + """Test get_playlists.""" + # Mock mixes response + mixes_response = [{"id": "mix_1", "title": "Mix 1"}] + # Mock playlists response + playlists_response = [{"uuid": "pl_1", "title": "Playlist 1"}] + + # Configure paginate side effect + async def paginate_side_effect( + endpoint: str, **_kwargs: Any + ) -> AsyncGenerator[dict[str, Any], None]: + if "mixes" in endpoint: + for item in mixes_response: + yield item + else: + for item in playlists_response: + yield item + + provider_mock.api.paginate.side_effect = paginate_side_effect + + # Setup mock return values + mock_parse_playlist.side_effect = [ + Mock(item_id="mix_1"), + Mock(item_id="pl_1"), + ] + + playlists = [p async for p in library_manager.get_playlists()] + + assert len(playlists) == 2 + assert playlists[0].item_id == "mix_1" + assert playlists[1].item_id == "pl_1" + assert mock_parse_playlist.call_count == 2 + + +async def test_add_item_artist(library_manager: TidalLibraryManager, provider_mock: Mock) -> None: + """Test add_item for artist.""" + item = Mock(item_id="123", media_type=MediaType.ARTIST) + await library_manager.add_item(item) + + provider_mock.api.post.assert_called_with( + "users/12345/favorites/artists", + data={"artistId": "123"}, + as_form=True, + ) + + +async def test_add_item_album(library_manager: TidalLibraryManager, provider_mock: Mock) -> None: + """Test add_item for album.""" + item = Mock(item_id="123", media_type=MediaType.ALBUM) + await library_manager.add_item(item) + + provider_mock.api.post.assert_called_with( + "users/12345/favorites/albums", + data={"albumId": "123"}, + as_form=True, + ) + + +async def test_add_item_track(library_manager: TidalLibraryManager, provider_mock: Mock) -> None: + """Test add_item for track.""" + item = Mock(item_id="123", media_type=MediaType.TRACK) + await library_manager.add_item(item) + + provider_mock.api.post.assert_called_with( + "users/12345/favorites/tracks", + data={"trackId": "123"}, + as_form=True, + ) + + +async def test_add_item_playlist(library_manager: TidalLibraryManager, provider_mock: Mock) -> None: + """Test add_item for playlist.""" + item = Mock(item_id="123", media_type=MediaType.PLAYLIST) + await library_manager.add_item(item) + + provider_mock.api.post.assert_called_with( + "users/12345/favorites/playlists", + data={"uuids": "123"}, + as_form=True, + ) + + +async def test_remove_item_artist( + library_manager: TidalLibraryManager, provider_mock: Mock +) -> None: + """Test remove_item for artist.""" + await library_manager.remove_item("123", MediaType.ARTIST) + + provider_mock.api.delete.assert_called_with("users/12345/favorites/artists/123") + + +async def test_remove_item_album(library_manager: TidalLibraryManager, provider_mock: Mock) -> None: + """Test remove_item for album.""" + await library_manager.remove_item("123", MediaType.ALBUM) + + provider_mock.api.delete.assert_called_with("users/12345/favorites/albums/123") + + +async def test_remove_item_track(library_manager: TidalLibraryManager, provider_mock: Mock) -> None: + """Test remove_item for track.""" + await library_manager.remove_item("123", MediaType.TRACK) + + provider_mock.api.delete.assert_called_with("users/12345/favorites/tracks/123") + + +async def test_remove_item_playlist( + library_manager: TidalLibraryManager, provider_mock: Mock +) -> None: + """Test remove_item for playlist.""" + await library_manager.remove_item("123", MediaType.PLAYLIST) + + provider_mock.api.delete.assert_called_with("users/12345/favorites/playlists/123") diff --git a/tests/providers/tidal/test_media.py b/tests/providers/tidal/test_media.py new file mode 100644 index 00000000..05549c31 --- /dev/null +++ b/tests/providers/tidal/test_media.py @@ -0,0 +1,237 @@ +"""Test Tidal Media Manager.""" + +from typing import Any +from unittest.mock import AsyncMock, MagicMock, Mock, patch + +import pytest +from music_assistant_models.enums import MediaType +from music_assistant_models.media_items import ItemMapping + +from music_assistant.providers.tidal.media import TidalMediaManager + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.lookup_key = "tidal" + provider.domain = "tidal" + provider.instance_id = "tidal_instance" + provider.auth.user_id = "12345" + provider.auth.country_code = "US" + provider.api = AsyncMock() + provider.api.get_data.return_value = {} + provider.api.paginate = MagicMock() + + async def async_iter(*_args: Any, **_kwargs: Any) -> Any: + for item in provider.api.paginate.return_value: + yield item + + provider.api.paginate.side_effect = async_iter + provider.api.paginate.return_value = [] + + provider.logger = Mock() + + def get_item_mapping(media_type: MediaType, key: str, name: str) -> ItemMapping: + return ItemMapping( + media_type=media_type, + item_id=key, + provider=provider.lookup_key, + name=name, + ) + + provider.get_item_mapping.side_effect = get_item_mapping + + return provider + + +@pytest.fixture +def media_manager(provider_mock: Mock) -> TidalMediaManager: + """Return a TidalMediaManager instance.""" + return TidalMediaManager(provider_mock) + + +@patch("music_assistant.providers.tidal.media.parse_artist") +@patch("music_assistant.providers.tidal.media.parse_album") +@patch("music_assistant.providers.tidal.media.parse_track") +@patch("music_assistant.providers.tidal.media.parse_playlist") +async def test_search( + mock_parse_playlist: Mock, + mock_parse_track: Mock, + mock_parse_album: Mock, + mock_parse_artist: Mock, + media_manager: TidalMediaManager, + provider_mock: Mock, +) -> None: + """Test search.""" + provider_mock.api.get_data.return_value = { + "artists": {"items": [{"id": 1}]}, + "albums": {"items": [{"id": 1}]}, + "tracks": {"items": [{"id": 1}]}, + "playlists": {"items": [{"uuid": "1"}]}, + } + + mock_parse_artist.return_value = Mock(item_id="1", media_type=MediaType.ARTIST) + mock_parse_album.return_value = Mock(item_id="1", media_type=MediaType.ALBUM) + mock_parse_track.return_value = Mock(item_id="1", media_type=MediaType.TRACK) + mock_parse_playlist.return_value = Mock(item_id="1", media_type=MediaType.PLAYLIST) + + results = await media_manager.search( + "query", [MediaType.ARTIST, MediaType.ALBUM, MediaType.TRACK, MediaType.PLAYLIST] + ) + + assert len(results.artists) == 1 + assert len(results.albums) == 1 + assert len(results.tracks) == 1 + assert len(results.playlists) == 1 + + mock_parse_artist.assert_called() + mock_parse_album.assert_called() + mock_parse_track.assert_called() + mock_parse_playlist.assert_called() + + provider_mock.api.get_data.assert_called_with( + "search", + params={ + "query": "query", + "types": "artists,albums,tracks,playlists", + "limit": 5, + }, + ) + + +@patch("music_assistant.providers.tidal.media.parse_artist") +async def test_get_artist( + mock_parse_artist: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_artist.""" + provider_mock.api.get_data.return_value = {"id": 1, "name": "Test Artist"} + mock_parse_artist.return_value = Mock(item_id="1") + + artist = await media_manager.get_artist("1") + + assert artist.item_id == "1" + provider_mock.api.get_data.assert_called_with("artists/1") + mock_parse_artist.assert_called_once() + + +@patch("music_assistant.providers.tidal.media.parse_album") +async def test_get_album( + mock_parse_album: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_album.""" + provider_mock.api.get_data.return_value = {"id": 1, "title": "Test Album"} + mock_parse_album.return_value = Mock(item_id="1") + + album = await media_manager.get_album("1") + + assert album.item_id == "1" + provider_mock.api.get_data.assert_called_with("albums/1") + mock_parse_album.assert_called_once() + + +@patch("music_assistant.providers.tidal.media.parse_track") +async def test_get_track( + mock_parse_track: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_track.""" + provider_mock.api.get_data.side_effect = [ + {"id": 1, "title": "Test Track"}, # Track data + {"lyrics": "Test Lyrics"}, # Lyrics data + ] + mock_parse_track.return_value = Mock(item_id="1") + + track = await media_manager.get_track("1") + + assert track.item_id == "1" + assert provider_mock.api.get_data.call_count == 2 + provider_mock.api.get_data.assert_any_call("tracks/1") + provider_mock.api.get_data.assert_any_call("tracks/1/lyrics") + mock_parse_track.assert_called_once() + + +@patch("music_assistant.providers.tidal.media.parse_playlist") +async def test_get_playlist( + mock_parse_playlist: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_playlist.""" + provider_mock.api.get_data.return_value = {"uuid": "1", "title": "Test Playlist"} + mock_parse_playlist.return_value = Mock(item_id="1") + + playlist = await media_manager.get_playlist("1") + + assert playlist.item_id == "1" + provider_mock.api.get_data.assert_called_with("playlists/1") + mock_parse_playlist.assert_called_once() + + +@patch("music_assistant.providers.tidal.media.parse_track") +async def test_get_album_tracks( + mock_parse_track: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_album_tracks.""" + provider_mock.api.get_data.return_value = {"items": [{"id": 1}]} + mock_parse_track.return_value = Mock(item_id="1") + + tracks = await media_manager.get_album_tracks("1") + + assert len(tracks) == 1 + assert tracks[0].item_id == "1" + provider_mock.api.get_data.assert_called_with( + "albums/1/tracks", + params={"limit": 250}, + ) + + +@patch("music_assistant.providers.tidal.media.parse_album") +async def test_get_artist_albums( + mock_parse_album: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_artist_albums.""" + provider_mock.api.get_data.return_value = {"items": [{"id": 1}]} + mock_parse_album.return_value = Mock(item_id="1") + + albums = await media_manager.get_artist_albums("1") + + assert len(albums) == 1 + assert albums[0].item_id == "1" + provider_mock.api.get_data.assert_called_with( + "artists/1/albums", + params={"limit": 250}, + ) + + +@patch("music_assistant.providers.tidal.media.parse_track") +async def test_get_artist_toptracks( + mock_parse_track: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_artist_toptracks.""" + provider_mock.api.get_data.return_value = {"items": [{"id": 1}]} + mock_parse_track.return_value = Mock(item_id="1") + + tracks = await media_manager.get_artist_toptracks("1") + + assert len(tracks) == 1 + assert tracks[0].item_id == "1" + provider_mock.api.get_data.assert_called_with( + "artists/1/toptracks", + params={"limit": 10, "offset": 0}, + ) + + +@patch("music_assistant.providers.tidal.media.parse_track") +async def test_get_playlist_tracks( + mock_parse_track: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_playlist_tracks.""" + provider_mock.api.get_data.return_value = {"items": [{"id": 1}]} + mock_parse_track.return_value = Mock(item_id="1") + + tracks = await media_manager.get_playlist_tracks("1") + + assert len(tracks) == 1 + assert tracks[0].item_id == "1" + provider_mock.api.get_data.assert_called_with( + "playlists/1/tracks", + params={"limit": 200, "offset": 0}, + ) diff --git a/tests/providers/tidal/test_media_extended.py b/tests/providers/tidal/test_media_extended.py new file mode 100644 index 00000000..6ba419de --- /dev/null +++ b/tests/providers/tidal/test_media_extended.py @@ -0,0 +1,171 @@ +"""Additional tests for Tidal Media Manager - Mix operations and similar tracks.""" + +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from music_assistant_models.enums import MediaType +from music_assistant_models.errors import MediaNotFoundError +from music_assistant_models.media_items import ItemMapping + +from music_assistant.providers.tidal.media import TidalMediaManager + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.lookup_key = "tidal" + provider.domain = "tidal" + provider.instance_id = "tidal_instance" + provider.auth.user_id = "12345" + provider.auth.country_code = "US" + provider.api = AsyncMock() + provider.api.get_data.return_value = {} + provider.logger = Mock() + + def get_item_mapping(media_type: MediaType, key: str, name: str) -> ItemMapping: + return ItemMapping( + media_type=media_type, + item_id=key, + provider=provider.lookup_key, + name=name, + ) + + provider.get_item_mapping.side_effect = get_item_mapping + + return provider + + +@pytest.fixture +def media_manager(provider_mock: Mock) -> TidalMediaManager: + """Return a TidalMediaManager instance.""" + return TidalMediaManager(provider_mock) + + +@patch("music_assistant.providers.tidal.media.parse_playlist") +async def test_get_playlist_mix( + mock_parse_playlist: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_playlist with mix ID.""" + provider_mock.api.get_data.return_value = { + "title": "My Mix", + "rows": [ + {"modules": [{"mix": {"images": {"MEDIUM": {"url": "http://example.com/mix.jpg"}}}}]}, + ], + "lastUpdated": "2023-01-01", + } + mock_parse_playlist.return_value = Mock(item_id="mix_123") + + playlist = await media_manager.get_playlist("mix_123") + + assert playlist.item_id == "mix_123" + provider_mock.api.get_data.assert_called_with( + "pages/mix", + params={"mixId": "123", "deviceType": "BROWSER"}, + ) + mock_parse_playlist.assert_called_once() + # Verify is_mix=True was passed + assert mock_parse_playlist.call_args[1]["is_mix"] is True + + +@patch("music_assistant.providers.tidal.media.parse_playlist") +async def test_get_playlist_fallback_to_mix( + mock_parse_playlist: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_playlist falls back to mix lookup on MediaNotFoundError.""" + # First call raises error, second succeeds + provider_mock.api.get_data.side_effect = [ + MediaNotFoundError("Playlist not found"), + { + "title": "My Mix", + "rows": [{"modules": [{"mix": {"images": {}}}]}], + }, + ] + mock_parse_playlist.return_value = Mock(item_id="123") + + playlist = await media_manager.get_playlist("123") + + assert playlist.item_id == "123" + assert provider_mock.api.get_data.call_count == 2 + # First call as playlist + provider_mock.api.get_data.assert_any_call("playlists/123") + # Second call as mix + provider_mock.api.get_data.assert_any_call( + "pages/mix", + params={"mixId": "123", "deviceType": "BROWSER"}, + ) + + +@patch("music_assistant.providers.tidal.media.parse_track") +async def test_get_similar_tracks( + mock_parse_track: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_similar_tracks.""" + provider_mock.api.get_data.return_value = {"items": [{"id": 1}, {"id": 2}, {"id": 3}]} + mock_parse_track.return_value = Mock(item_id="1") + + tracks = await media_manager.get_similar_tracks("123", limit=25) + + assert len(tracks) == 3 + provider_mock.api.get_data.assert_called_with( + "tracks/123/radio", + params={"limit": 25}, + ) + + +@patch("music_assistant.providers.tidal.media.parse_track") +async def test_get_playlist_tracks_mix( + mock_parse_track: Mock, media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test get_playlist_tracks with mix ID.""" + provider_mock.api.get_data.return_value = { + "rows": [ + {}, # First row is mix info + { # Second row has tracks + "modules": [{"pagedList": {"items": [{"id": 1}, {"id": 2}]}}] + }, + ] + } + + # Mock track with position attribute + def create_track(item_id: int, position: int) -> Mock: + track = Mock(item_id=str(item_id)) + track.position = position + return track + + mock_parse_track.side_effect = [ + create_track(1, 1), + create_track(2, 2), + ] + + tracks = await media_manager.get_playlist_tracks("mix_123") + + assert len(tracks) == 2 + assert tracks[0].position == 1 + assert tracks[1].position == 2 + provider_mock.api.get_data.assert_called_with( + "pages/mix", + params={"mixId": "123", "deviceType": "BROWSER"}, + ) + + +async def test_get_mix_details_no_rows( + media_manager: TidalMediaManager, provider_mock: Mock +) -> None: + """Test _get_mix_details raises error when no rows.""" + provider_mock.api.get_data.return_value = {"rows": []} + + with pytest.raises(MediaNotFoundError, match="Mix 123 has no tracks"): + await media_manager.get_playlist_tracks("mix_123") + + +async def test_search_empty_results(media_manager: TidalMediaManager, provider_mock: Mock) -> None: + """Test search with empty results.""" + provider_mock.api.get_data.return_value = {} + + results = await media_manager.search("query", [MediaType.ARTIST]) + + assert len(results.artists) == 0 + assert len(results.albums) == 0 + assert len(results.tracks) == 0 + assert len(results.playlists) == 0 diff --git a/tests/providers/tidal/test_page_parser.py b/tests/providers/tidal/test_page_parser.py new file mode 100644 index 00000000..7ab0c9f8 --- /dev/null +++ b/tests/providers/tidal/test_page_parser.py @@ -0,0 +1,58 @@ +"""Test Tidal Page Parser.""" + +import json +import pathlib +from unittest.mock import Mock + +import pytest +from music_assistant_models.enums import MediaType + +from music_assistant.providers.tidal.tidal_page_parser import TidalPageParser + +FIXTURES_DIR = pathlib.Path(__file__).parent / "fixtures" +PAGE_FIXTURES = list(FIXTURES_DIR.glob("pages/*.json")) + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.lookup_key = "tidal" + provider.domain = "tidal" + provider.instance_id = "tidal_instance" + provider.auth.user_id = "12345" + provider.logger = Mock() + return provider + + +@pytest.mark.parametrize("example", PAGE_FIXTURES, ids=lambda val: str(val.stem)) +def test_page_parser(example: pathlib.Path, provider_mock: Mock) -> None: + """Test page parser with fixtures.""" + with open(example) as f: + data = json.load(f) + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(data, "pages/home") + + assert len(parser._module_map) == 3 + + # Test first module (Playlists) + module_info = parser._module_map[0] + items, content_type = parser.get_module_items(module_info) + assert content_type == MediaType.PLAYLIST + assert len(items) == 1 + assert items[0].name == "Test Playlist" + + # Test second module (Albums) + module_info = parser._module_map[1] + items, content_type = parser.get_module_items(module_info) + assert content_type == MediaType.ALBUM + assert len(items) == 1 + assert items[0].name == "Test Album" + + # Test third module (Mixes) + module_info = parser._module_map[2] + items, content_type = parser.get_module_items(module_info) + assert content_type == MediaType.PLAYLIST + assert len(items) == 1 + assert items[0].name == "My Mix" diff --git a/tests/providers/tidal/test_page_parser_extended.py b/tests/providers/tidal/test_page_parser_extended.py new file mode 100644 index 00000000..ce622e7f --- /dev/null +++ b/tests/providers/tidal/test_page_parser_extended.py @@ -0,0 +1,519 @@ +"""Extended tests for Tidal Page Parser.""" + +from typing import Any +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from music_assistant_models.enums import MediaType +from music_assistant_models.media_items import ItemMapping + +from music_assistant.providers.tidal.tidal_page_parser import TidalPageParser + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.lookup_key = "tidal" + provider.domain = "tidal" + provider.instance_id = "tidal_instance" + provider.auth.user_id = "12345" + provider.logger = Mock() + provider.mass = Mock() + provider.mass.cache.get = AsyncMock(return_value=None) + provider.mass.cache.set = AsyncMock() + + def get_item_mapping(media_type: MediaType, key: str, name: str) -> ItemMapping: + return ItemMapping( + media_type=media_type, + item_id=key, + provider=provider.lookup_key, + name=name, + ) + + provider.get_item_mapping.side_effect = get_item_mapping + return provider + + +def test_parser_initialization(provider_mock: Mock) -> None: + """Test parser initialization.""" + parser = TidalPageParser(provider_mock) + + assert parser.provider == provider_mock + assert parser.logger == provider_mock.logger + assert "MIX" in parser._content_map + assert "PLAYLIST" in parser._content_map + assert "ALBUM" in parser._content_map + assert "TRACK" in parser._content_map + assert "ARTIST" in parser._content_map + assert len(parser._module_map) == 0 + assert parser._page_path is None + assert parser._parsed_at == 0 + + +@patch("music_assistant.providers.tidal.tidal_page_parser.parse_track") +def test_process_track_list(mock_parse_track: Mock, provider_mock: Mock) -> None: + """Test processing TRACK_LIST module.""" + mock_track = Mock() + mock_track.name = "Test Track" + mock_parse_track.return_value = mock_track + + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Top Tracks", + "type": "TRACK_LIST", + "pagedList": { + "items": [ + {"id": 1, "title": "Track 1"}, + {"id": 2, "title": "Track 2"}, + ] + }, + } + ] + } + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + module_info = parser._module_map[0] + items, content_type = parser.get_module_items(module_info) + + assert content_type == MediaType.TRACK + assert len(items) == 2 + assert mock_parse_track.call_count == 2 + + +@patch("music_assistant.providers.tidal.tidal_page_parser.parse_artist") +def test_process_artist_list(mock_parse_artist: Mock, provider_mock: Mock) -> None: + """Test processing ARTIST_LIST module.""" + mock_artist = Mock() + mock_artist.name = "Test Artist" + mock_parse_artist.return_value = mock_artist + + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Popular Artists", + "type": "ARTIST_LIST", + "pagedList": { + "items": [ + {"id": 1, "name": "Artist 1"}, + {"id": 2, "name": "Artist 2"}, + {"id": 3, "name": "Artist 3"}, + ] + }, + } + ] + } + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + module_info = parser._module_map[0] + items, content_type = parser.get_module_items(module_info) + + assert content_type == MediaType.ARTIST + assert len(items) == 3 + assert mock_parse_artist.call_count == 3 + + +@patch("music_assistant.providers.tidal.tidal_page_parser.parse_playlist") +def test_process_mix_list(mock_parse_playlist: Mock, provider_mock: Mock) -> None: + """Test processing MIX_LIST module.""" + mock_mix = Mock() + mock_mix.name = "Daily Mix" + mock_parse_playlist.return_value = mock_mix + + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Your Mixes", + "type": "MIX_LIST", + "pagedList": { + "items": [ + {"id": "mix1", "title": "Mix 1"}, + ] + }, + } + ] + } + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + module_info = parser._module_map[0] + items, content_type = parser.get_module_items(module_info) + + assert content_type == MediaType.PLAYLIST + assert len(items) == 1 + mock_parse_playlist.assert_called_with( + provider_mock, {"id": "mix1", "title": "Mix 1"}, is_mix=True + ) + + +@patch("music_assistant.providers.tidal.tidal_page_parser.parse_track") +def test_process_track_list_with_error(mock_parse_track: Mock, provider_mock: Mock) -> None: + """Test TRACK_LIST with parsing error.""" + mock_parse_track.side_effect = [ + Mock(name="Track 1"), + KeyError("Missing field"), + Mock(name="Track 3"), + ] + + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Tracks", + "type": "TRACK_LIST", + "pagedList": { + "items": [ + {"id": 1}, + {"id": 2}, + {"id": 3}, + ] + }, + } + ] + } + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + module_info = parser._module_map[0] + items, _ = parser.get_module_items(module_info) + + # Should have 2 items (one failed) + assert len(items) == 2 + provider_mock.logger.warning.assert_called() + + +def test_process_track_list_with_non_dict_items(provider_mock: Mock) -> None: + """Test TRACK_LIST with non-dict items (should be skipped).""" + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Tracks", + "type": "TRACK_LIST", + "pagedList": { + "items": [ + "not a dict", + 12345, + None, + ] + }, + } + ] + } + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + module_info = parser._module_map[0] + items, _ = parser.get_module_items(module_info) + + # All items should be skipped + assert len(items) == 0 + + +async def test_from_cache_success(provider_mock: Mock) -> None: + """Test loading parser from cache.""" + cached_data = { + "module_map": [{"title": "Test Module"}], + "content_map": {"PLAYLIST": {}}, + "parsed_at": 1234567890, + } + provider_mock.mass.cache.get.return_value = cached_data + + parser = await TidalPageParser.from_cache(provider_mock, "pages/home") + + assert parser is not None + assert len(parser._module_map) == 1 + assert parser._parsed_at == 1234567890 + provider_mock.mass.cache.get.assert_called_with( + "pages/home", + provider=provider_mock.instance_id, + category=1, # CACHE_CATEGORY_RECOMMENDATIONS + ) + + +async def test_from_cache_miss(provider_mock: Mock) -> None: + """Test cache miss returns None.""" + provider_mock.mass.cache.get.return_value = None + + parser = await TidalPageParser.from_cache(provider_mock, "pages/home") + + assert parser is None + + +async def test_from_cache_invalid_data(provider_mock: Mock) -> None: + """Test cache with invalid data returns None.""" + # from_cache expects dict, won't handle invalid data gracefully + # The method will fail on .get() calls if data is invalid + provider_mock.mass.cache.get.return_value = {} # Empty dict is valid but has no data + + parser = await TidalPageParser.from_cache(provider_mock, "pages/home") + + # Parser should be None because empty dict evaluates to False + assert parser is None + + +@patch("music_assistant.providers.tidal.tidal_page_parser.parse_playlist") +def test_playlist_list_with_mix_detection(mock_parse_playlist: Mock, provider_mock: Mock) -> None: + """Test PLAYLIST_LIST detects mixes.""" + mock_playlist = Mock() + mock_parse_playlist.return_value = mock_playlist + + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Playlists", + "type": "PLAYLIST_LIST", + "pagedList": { + "items": [ + {"uuid": "1", "title": "Regular Playlist"}, + {"mixId": "mix_123", "title": "Mix", "mixType": "DISCOVERY"}, + ] + }, + } + ] + } + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + module_info = parser._module_map[0] + items, _ = parser.get_module_items(module_info) + + assert len(items) == 2 + # First call should be is_mix=False, second should be is_mix=True + assert mock_parse_playlist.call_args_list[0][1]["is_mix"] is False + assert mock_parse_playlist.call_args_list[1][1]["is_mix"] is True + + +def test_empty_page_data(provider_mock: Mock) -> None: + """Test parsing empty page data.""" + parser = TidalPageParser(provider_mock) + parser.parse_page_structure({}, "pages/empty") + + assert len(parser._module_map) == 0 + assert parser._page_path == "pages/empty" + + +def test_page_with_no_modules(provider_mock: Mock) -> None: + """Test page with rows but no modules.""" + page_data: dict[str, Any] = { + "rows": [ + {}, + {"modules": []}, + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + assert len(parser._module_map) == 0 + + +def test_multiple_module_types_in_one_page(provider_mock: Mock) -> None: + """Test page with multiple different module types.""" + with ( + patch("music_assistant.providers.tidal.tidal_page_parser.parse_playlist") as mock_pl, + patch("music_assistant.providers.tidal.tidal_page_parser.parse_album") as mock_al, + patch("music_assistant.providers.tidal.tidal_page_parser.parse_track") as mock_tr, + ): + mock_pl.return_value = Mock(name="Playlist") + mock_al.return_value = Mock(name="Album") + mock_tr.return_value = Mock(name="Track") + + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Playlists", + "type": "PLAYLIST_LIST", + "pagedList": {"items": [{"uuid": "1"}]}, + }, + { + "title": "Albums", + "type": "ALBUM_LIST", + "pagedList": {"items": [{"id": 1}]}, + }, + ] + }, + { + "modules": [ + { + "title": "Tracks", + "type": "TRACK_LIST", + "pagedList": {"items": [{"id": 1}]}, + } + ] + }, + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + assert len(parser._module_map) == 3 + + # Verify each module + _, type1 = parser.get_module_items(parser._module_map[0]) + assert type1 == MediaType.PLAYLIST + + _, type2 = parser.get_module_items(parser._module_map[1]) + assert type2 == MediaType.ALBUM + + _, type3 = parser.get_module_items(parser._module_map[2]) + assert type3 == MediaType.TRACK + + +def test_module_info_structure(provider_mock: Mock) -> None: + """Test module_info contains correct metadata.""" + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Test Module", + "type": "PLAYLIST_LIST", + "pagedList": {"items": []}, + } + ] + } + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + module_info = parser._module_map[0] + assert module_info["title"] == "Test Module" + assert module_info["type"] == "PLAYLIST_LIST" + assert module_info["module_idx"] == 0 + assert module_info["row_idx"] == 0 + assert "raw_data" in module_info + + +@patch("music_assistant.providers.tidal.tidal_page_parser.parse_playlist") +def test_process_highlight_module(mock_parse_playlist: Mock, provider_mock: Mock) -> None: + """Test processing HIGHLIGHT_MODULE.""" + mock_playlist = Mock() + mock_playlist.name = "Highlight Playlist" + mock_parse_playlist.return_value = mock_playlist + + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Highlights", + "type": "HIGHLIGHT_MODULE", + "highlight": [ + {"type": "PLAYLIST", "item": {"uuid": "1", "title": "Highlight 1"}} + ], + } + ] + } + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + module_info = parser._module_map[0] + items, content_type = parser.get_module_items(module_info) + + assert content_type == MediaType.PLAYLIST + assert len(items) == 1 + mock_parse_playlist.assert_called_once() + + +def test_process_generic_items(provider_mock: Mock) -> None: + """Test processing generic items with type inference.""" + with ( + patch("music_assistant.providers.tidal.tidal_page_parser.parse_track") as mock_track, + patch("music_assistant.providers.tidal.tidal_page_parser.parse_album") as mock_album, + ): + mock_track.return_value = Mock(media_type=MediaType.TRACK) + mock_album.return_value = Mock(media_type=MediaType.ALBUM) + + page_data = { + "rows": [ + { + "modules": [ + { + "title": "Generic", + "type": "UNKNOWN_LIST", + "pagedList": { + "items": [ + { + "id": 1, + "title": "Track", + "duration": 100, + "album": {}, + }, # Inferred TRACK + { + "id": 2, + "title": "Album", + "numberOfTracks": 10, + "artists": [], + }, # Inferred ALBUM + ] + }, + } + ] + } + ] + } + + parser = TidalPageParser(provider_mock) + parser.parse_page_structure(page_data, "pages/test") + + module_info = parser._module_map[0] + items, _ = parser.get_module_items(module_info) + + assert len(items) == 2 + mock_track.assert_called_once() + mock_album.assert_called_once() + + +def test_content_stats(provider_mock: Mock) -> None: + """Test content_stats property.""" + parser = TidalPageParser(provider_mock) + parser._module_map = [{"title": "Test"}] + parser._parsed_at = 1234567890 + parser._content_map["PLAYLIST"] = {"1": {}} + + stats = parser.content_stats + + assert stats["modules"] == 1 + assert stats["playlist_count"] == 1 + assert stats["album_count"] == 0 + assert "cache_age_minutes" in stats diff --git a/tests/providers/tidal/test_parsers.py b/tests/providers/tidal/test_parsers.py new file mode 100644 index 00000000..31971360 --- /dev/null +++ b/tests/providers/tidal/test_parsers.py @@ -0,0 +1,93 @@ +"""Test we can parse Tidal models into Music Assistant models.""" + +import json +import pathlib +from unittest.mock import Mock + +import pytest +from music_assistant_models.enums import MediaType +from music_assistant_models.media_items import ItemMapping +from syrupy.assertion import SnapshotAssertion + +from music_assistant.providers.tidal.parsers import ( + parse_album, + parse_artist, + parse_playlist, + parse_track, +) + +FIXTURES_DIR = pathlib.Path(__file__).parent / "fixtures" +ARTIST_FIXTURES = list(FIXTURES_DIR.glob("artists/*.json")) +ALBUM_FIXTURES = list(FIXTURES_DIR.glob("albums/*.json")) +TRACK_FIXTURES = list(FIXTURES_DIR.glob("tracks/*.json")) +PLAYLIST_FIXTURES = list(FIXTURES_DIR.glob("playlists/*.json")) + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.lookup_key = "tidal" + provider.domain = "tidal" + provider.instance_id = "tidal_instance" + provider.auth.user_id = "12345" + provider.auth.user.profile_name = "Test User" + provider.auth.user.user_name = "Test User" + + def get_item_mapping(media_type: MediaType, key: str, name: str) -> ItemMapping: + return ItemMapping( + media_type=media_type, + item_id=key, + provider=provider.lookup_key, + name=name, + ) + + provider.get_item_mapping.side_effect = get_item_mapping + + return provider + + +@pytest.mark.parametrize("example", ARTIST_FIXTURES, ids=lambda val: str(val.stem)) +def test_parse_artist( + example: pathlib.Path, provider_mock: Mock, snapshot: SnapshotAssertion +) -> None: + """Test we can parse artists.""" + with open(example) as f: + data = json.load(f) + parsed = parse_artist(provider_mock, data).to_dict() + assert snapshot == parsed + + +@pytest.mark.parametrize("example", ALBUM_FIXTURES, ids=lambda val: str(val.stem)) +def test_parse_album( + example: pathlib.Path, provider_mock: Mock, snapshot: SnapshotAssertion +) -> None: + """Test we can parse albums.""" + with open(example) as f: + data = json.load(f) + parsed = parse_album(provider_mock, data).to_dict() + assert snapshot == parsed + + +@pytest.mark.parametrize("example", TRACK_FIXTURES, ids=lambda val: str(val.stem)) +def test_parse_track( + example: pathlib.Path, provider_mock: Mock, snapshot: SnapshotAssertion +) -> None: + """Test we can parse tracks.""" + with open(example) as f: + data = json.load(f) + parsed = parse_track(provider_mock, data).to_dict() + assert snapshot == parsed + + +@pytest.mark.parametrize("example", PLAYLIST_FIXTURES, ids=lambda val: str(val.stem)) +def test_parse_playlist( + example: pathlib.Path, provider_mock: Mock, snapshot: SnapshotAssertion +) -> None: + """Test we can parse playlists.""" + with open(example) as f: + data = json.load(f) + + is_mix = "mix" in example.name + parsed = parse_playlist(provider_mock, data, is_mix=is_mix).to_dict() + assert snapshot == parsed diff --git a/tests/providers/tidal/test_playlist.py b/tests/providers/tidal/test_playlist.py new file mode 100644 index 00000000..35b012ed --- /dev/null +++ b/tests/providers/tidal/test_playlist.py @@ -0,0 +1,82 @@ +"""Test Tidal Playlist Manager.""" + +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from music_assistant.providers.tidal.playlist import TidalPlaylistManager + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.auth.user_id = "12345" + provider.api = AsyncMock() + provider.logger = Mock() + return provider + + +@pytest.fixture +def playlist_manager(provider_mock: Mock) -> TidalPlaylistManager: + """Return a TidalPlaylistManager instance.""" + return TidalPlaylistManager(provider_mock) + + +@patch("music_assistant.providers.tidal.playlist.parse_playlist") +async def test_create_playlist( + mock_parse_playlist: Mock, playlist_manager: TidalPlaylistManager, provider_mock: Mock +) -> None: + """Test create_playlist.""" + provider_mock.api.post.return_value = {"uuid": "1", "title": "Test Playlist"} + mock_parse_playlist.return_value = Mock(item_id="1") + + playlist = await playlist_manager.create("Test Playlist") + + assert playlist.item_id == "1" + provider_mock.api.post.assert_called_with( + "users/12345/playlists", + data={"title": "Test Playlist", "description": ""}, + as_form=True, + ) + mock_parse_playlist.assert_called_once() + + +async def test_add_playlist_tracks( + playlist_manager: TidalPlaylistManager, provider_mock: Mock +) -> None: + """Test add_playlist_tracks.""" + # Mock get response with ETag + provider_mock.api.get.return_value = ({"numberOfTracks": 5}, "etag_123") + + await playlist_manager.add_tracks("1", ["track_1", "track_2"]) + + provider_mock.api.get.assert_called_with("playlists/1", return_etag=True) + provider_mock.api.post.assert_called_with( + "playlists/1/items", + data={ + "onArtifactNotFound": "SKIP", + "trackIds": "track_1,track_2", + "toIndex": 5, + "onDupes": "SKIP", + }, + as_form=True, + headers={"If-None-Match": "etag_123"}, + ) + + +async def test_remove_playlist_tracks( + playlist_manager: TidalPlaylistManager, provider_mock: Mock +) -> None: + """Test remove_playlist_tracks.""" + # Mock get response with ETag + provider_mock.api.get.return_value = ({}, "etag_123") + + # Positions are 1-based in MA, converted to 0-based for Tidal + await playlist_manager.remove_tracks("1", (1, 3)) + + provider_mock.api.get.assert_called_with("playlists/1", return_etag=True) + provider_mock.api.delete.assert_called_with( + "playlists/1/items/0,2", + headers={"If-None-Match": "etag_123"}, + ) diff --git a/tests/providers/tidal/test_provider.py b/tests/providers/tidal/test_provider.py new file mode 100644 index 00000000..a3d6a9f5 --- /dev/null +++ b/tests/providers/tidal/test_provider.py @@ -0,0 +1,379 @@ +"""Test Tidal Provider integration.""" + +from collections.abc import AsyncGenerator +from typing import Any +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from music_assistant_models.enums import MediaType +from music_assistant_models.errors import LoginFailed +from music_assistant_models.media_items import Album, Artist, Playlist, Track + +from music_assistant.providers.tidal.provider import TidalProvider + + +@pytest.fixture +def mass_mock() -> Mock: + """Return a mock MusicAssistant instance.""" + mass = Mock() + mass.http_session = AsyncMock() + mass.metadata.locale = "en_US" + mass.cache.get = AsyncMock(return_value=None) + mass.cache.set = AsyncMock() + mass.cache.delete = AsyncMock() + return mass + + +@pytest.fixture +def manifest_mock() -> Mock: + """Return a mock provider manifest.""" + manifest = Mock() + manifest.domain = "tidal" + return manifest + + +@pytest.fixture +def config_mock() -> Mock: + """Return a mock provider config.""" + config = Mock() + config.name = "Tidal Test" + config.instance_id = "tidal_test" + config.enabled = True + config.get_value.side_effect = lambda key: { + "auth_token": "mock_access_token", + "refresh_token": "mock_refresh_token", + "expiry_time": 1234567890, + "user_id": "12345", + "log_level": "INFO", + }.get(key, "INFO" if "log" in key else None) + return config + + +@pytest.fixture +def provider(mass_mock: Mock, manifest_mock: Mock, config_mock: Mock) -> TidalProvider: + """Return a TidalProvider instance.""" + return TidalProvider(mass_mock, manifest_mock, config_mock) + + +async def test_provider_initialization( + mass_mock: Mock, manifest_mock: Mock, config_mock: Mock +) -> None: + """Test provider initialization creates all managers.""" + provider = TidalProvider(mass_mock, manifest_mock, config_mock) + + assert provider.auth is not None + assert provider.api is not None + assert provider.library is not None + assert provider.media is not None + assert provider.playlists is not None + assert provider.recommendations_manager is not None + assert provider.streaming is not None + + +async def test_handle_async_init_success(provider: TidalProvider) -> None: + """Test successful async initialization.""" + with ( + patch.object(provider.auth, "initialize", new_callable=AsyncMock) as mock_init, + patch.object(provider.api, "get", new_callable=AsyncMock) as mock_get, + patch.object(provider, "get_user", new_callable=AsyncMock) as mock_get_user, + patch.object(provider.auth, "update_user_info", new_callable=AsyncMock), + ): + mock_init.return_value = True + mock_get.return_value = ({"userId": "12345", "sessionId": "session_123"}, None) + mock_get_user.return_value = {"id": "12345", "username": "testuser"} + + await provider.handle_async_init() + + mock_init.assert_called_once() + mock_get.assert_called_with("sessions") + + +async def test_handle_async_init_missing_auth() -> None: + """Test async initialization fails with missing auth.""" + mass = Mock() + mass.http_session = AsyncMock() + mass.metadata.locale = "en_US" + + manifest = Mock() + manifest.domain = "tidal" + + config = Mock() + config.name = "Tidal Test" + config.instance_id = "tidal_test" + config.enabled = True + config.get_value.side_effect = lambda key: "INFO" if "log" in key else None # Missing auth data + + provider = TidalProvider(mass, manifest, config) + + with pytest.raises(LoginFailed, match="Missing authentication data"): + await provider.handle_async_init() + + +async def test_handle_async_init_auth_failed(provider: TidalProvider) -> None: + """Test async initialization fails when auth initialize fails.""" + with patch.object(provider.auth, "initialize", new_callable=AsyncMock) as mock_init: + mock_init.return_value = False + + with pytest.raises(LoginFailed, match="Failed to authenticate with Tidal"): + await provider.handle_async_init() + + +async def test_search_delegates_to_media(provider: TidalProvider) -> None: + """Test search delegates to media manager.""" + with patch.object(provider.media, "search", new_callable=AsyncMock) as mock_search: + mock_search.return_value = Mock() + + await provider.search("test query", [MediaType.ARTIST], limit=10) + + mock_search.assert_called_with("test query", [MediaType.ARTIST], 10) + + +async def test_get_similar_tracks_delegates_to_media(provider: TidalProvider) -> None: + """Test get_similar_tracks delegates to media manager.""" + with patch.object(provider.media, "get_similar_tracks", new_callable=AsyncMock) as mock_get: + mock_get.return_value = [] + + result = await provider.get_similar_tracks("123", limit=30) + + mock_get.assert_called_with("123", 30) + assert result == [] + + +async def test_get_artist_delegates_to_media(provider: TidalProvider) -> None: + """Test get_artist delegates to media manager.""" + with patch.object(provider.media, "get_artist", new_callable=AsyncMock) as mock_get: + mock_get.return_value = Mock(spec=Artist) + + result = await provider.get_artist("123") + + mock_get.assert_called_with("123") + assert result is not None + + +async def test_get_album_delegates_to_media(provider: TidalProvider) -> None: + """Test get_album delegates to media manager.""" + with patch.object(provider.media, "get_album", new_callable=AsyncMock) as mock_get: + mock_get.return_value = Mock(spec=Album) + + result = await provider.get_album("123") + + mock_get.assert_called_with("123") + assert result is not None + + +async def test_get_track_delegates_to_media(provider: TidalProvider) -> None: + """Test get_track delegates to media manager.""" + with patch.object(provider.media, "get_track", new_callable=AsyncMock) as mock_get: + mock_get.return_value = Mock(spec=Track) + + result = await provider.get_track("123") + + mock_get.assert_called_with("123") + assert result is not None + + +async def test_get_playlist_delegates_to_media(provider: TidalProvider) -> None: + """Test get_playlist delegates to media manager.""" + with patch.object(provider.media, "get_playlist", new_callable=AsyncMock) as mock_get: + mock_get.return_value = Mock(spec=Playlist) + + result = await provider.get_playlist("123") + + mock_get.assert_called_with("123") + assert result is not None + + +async def test_get_album_tracks_delegates_to_media(provider: TidalProvider) -> None: + """Test get_album_tracks delegates to media manager.""" + with patch.object(provider.media, "get_album_tracks", new_callable=AsyncMock) as mock_get: + mock_get.return_value = [] + + result = await provider.get_album_tracks("123") + + mock_get.assert_called_with("123") + assert result == [] + + +async def test_get_artist_albums_delegates_to_media(provider: TidalProvider) -> None: + """Test get_artist_albums delegates to media manager.""" + with patch.object(provider.media, "get_artist_albums", new_callable=AsyncMock) as mock_get: + mock_get.return_value = [] + + result = await provider.get_artist_albums("123") + + mock_get.assert_called_with("123") + assert result == [] + + +async def test_get_artist_toptracks_delegates_to_media(provider: TidalProvider) -> None: + """Test get_artist_toptracks delegates to media manager.""" + with patch.object(provider.media, "get_artist_toptracks", new_callable=AsyncMock) as mock_get: + mock_get.return_value = [] + + await provider.get_artist_toptracks("123") + + mock_get.assert_called_with("123") + + +async def test_get_playlist_tracks_delegates_to_media(provider: TidalProvider) -> None: + """Test get_playlist_tracks delegates to media manager.""" + with patch.object(provider.media, "get_playlist_tracks", new_callable=AsyncMock) as mock_get: + mock_get.return_value = [] + + await provider.get_playlist_tracks("123", page=2) + + mock_get.assert_called_with("123", 2) + + +async def test_get_stream_details_delegates_to_streaming(provider: TidalProvider) -> None: + """Test get_stream_details delegates to streaming manager.""" + with patch.object(provider.streaming, "get_stream_details", new_callable=AsyncMock) as mock_get: + mock_get.return_value = Mock() + + result = await provider.get_stream_details("123") + + mock_get.assert_called_with("123") + assert result is not None + + +async def test_get_item_mapping(provider: TidalProvider) -> None: + """Test get_item_mapping creates correct ItemMapping.""" + mapping = provider.get_item_mapping(MediaType.ARTIST, "123", "Test Artist") + + assert mapping.media_type == MediaType.ARTIST + assert mapping.item_id == "123" + assert mapping.provider == provider.lookup_key + assert mapping.name == "Test Artist" + + +async def test_get_library_artists_delegates_to_library(provider: TidalProvider) -> None: + """Test get_library_artists delegates to library manager.""" + + async def mock_generator() -> AsyncGenerator[Any, None]: + yield Mock(spec=Artist) + yield Mock(spec=Artist) + + with patch.object(provider.library, "get_artists", return_value=mock_generator()): + artists = [] + async for artist in provider.get_library_artists(): + artists.append(artist) + + assert len(artists) == 2 + + +async def test_get_library_albums_delegates_to_library(provider: TidalProvider) -> None: + """Test get_library_albums delegates to library manager.""" + + async def mock_generator() -> AsyncGenerator[Any, None]: + yield Mock(spec=Album) + + with patch.object(provider.library, "get_albums", return_value=mock_generator()): + albums = [] + async for album in provider.get_library_albums(): + albums.append(album) + + assert len(albums) == 1 + + +async def test_get_library_tracks_delegates_to_library(provider: TidalProvider) -> None: + """Test get_library_tracks delegates to library manager.""" + + async def mock_generator() -> AsyncGenerator[Any, None]: + yield Mock(spec=Track) + yield Mock(spec=Track) + yield Mock(spec=Track) + + with patch.object(provider.library, "get_tracks", return_value=mock_generator()): + tracks = [] + async for track in provider.get_library_tracks(): + tracks.append(track) + + assert len(tracks) == 3 + + +async def test_get_library_playlists_delegates_to_library(provider: TidalProvider) -> None: + """Test get_library_playlists delegates to library manager.""" + + async def mock_generator() -> AsyncGenerator[Any, None]: + yield Mock(spec=Playlist) + + with patch.object(provider.library, "get_playlists", return_value=mock_generator()): + playlists = [] + async for playlist in provider.get_library_playlists(): + playlists.append(playlist) + + assert len(playlists) == 1 + + +async def test_library_add_delegates_to_library(provider: TidalProvider) -> None: + """Test library_add delegates to library manager.""" + with patch.object(provider.library, "add_item", new_callable=AsyncMock) as mock_add: + mock_add.return_value = True + item = Mock() + + result = await provider.library_add(item) + + assert result is True + mock_add.assert_called_with(item) + + +async def test_library_remove_delegates_to_library(provider: TidalProvider) -> None: + """Test library_remove delegates to library manager.""" + with patch.object(provider.library, "remove_item", new_callable=AsyncMock) as mock_remove: + mock_remove.return_value = True + + result = await provider.library_remove("123", MediaType.TRACK) + + assert result is True + mock_remove.assert_called_with("123", MediaType.TRACK) + + +async def test_create_playlist_delegates_to_playlists(provider: TidalProvider) -> None: + """Test create_playlist delegates to playlist manager.""" + with patch.object(provider.playlists, "create", new_callable=AsyncMock) as mock_create: + mock_create.return_value = Mock(spec=Playlist) + + await provider.create_playlist("New Playlist") + + mock_create.assert_called_with("New Playlist") + + +async def test_add_playlist_tracks_delegates_to_playlists(provider: TidalProvider) -> None: + """Test add_playlist_tracks delegates to playlist manager.""" + with patch.object(provider.playlists, "add_tracks", new_callable=AsyncMock) as mock_add: + await provider.add_playlist_tracks("123", ["track1", "track2"]) + + mock_add.assert_called_with("123", ["track1", "track2"]) + + +async def test_remove_playlist_tracks_delegates_to_playlists(provider: TidalProvider) -> None: + """Test remove_playlist_tracks delegates to playlist manager.""" + with patch.object(provider.playlists, "remove_tracks", new_callable=AsyncMock) as mock_remove: + await provider.remove_playlist_tracks("123", (1, 2, 3)) + + mock_remove.assert_called_with("123", (1, 2, 3)) + + +async def test_recommendations_delegates_to_recommendations_manager( + provider: TidalProvider, +) -> None: + """Test recommendations delegates to recommendations manager.""" + with patch.object( + provider.recommendations_manager, "get_recommendations", new_callable=AsyncMock + ) as mock_get: + mock_get.return_value = [] + + await provider.recommendations() + + mock_get.assert_called_once() + + +async def test_get_user(provider: TidalProvider) -> None: + """Test get_user fetches user data.""" + with patch.object(provider.api, "get_data", new_callable=AsyncMock) as mock_get: + mock_get.return_value = {"id": "123", "username": "testuser"} + + user = await provider.get_user("123") + + assert user["id"] == "123" + mock_get.assert_called_with("users/123") diff --git a/tests/providers/tidal/test_recommendations.py b/tests/providers/tidal/test_recommendations.py new file mode 100644 index 00000000..95b431de --- /dev/null +++ b/tests/providers/tidal/test_recommendations.py @@ -0,0 +1,99 @@ +"""Test Tidal Recommendation Manager.""" + +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from music_assistant_models.enums import MediaType + +from music_assistant.providers.tidal.recommendations import TidalRecommendationManager + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.domain = "tidal" + provider.instance_id = "tidal_instance" + provider.auth.user_id = "12345" + provider.auth.country_code = "US" + provider.api = AsyncMock() + provider.logger = Mock() + + # Mock mass + provider.mass = Mock() + provider.mass.config.get_provider_configs = AsyncMock(return_value=[]) + provider.mass.metadata.locale = "en_US" + provider.mass.cache.set = AsyncMock() + + return provider + + +@pytest.fixture +def recommendation_manager(provider_mock: Mock) -> TidalRecommendationManager: + """Return a TidalRecommendationManager instance.""" + return TidalRecommendationManager(provider_mock) + + +@pytest.mark.usefixtures("provider_mock") +async def test_get_recommendations( + recommendation_manager: TidalRecommendationManager, +) -> None: + """Test get_recommendations.""" + # Mock get_page_content to return a mock parser + mock_parser = Mock() + mock_parser._module_map = [{"title": "Test Module"}] + mock_parser.get_module_items.return_value = ( + [Mock(item_id="rec_1", name="Recommendation 1")], + MediaType.PLAYLIST, + ) + + with patch.object( + recommendation_manager, "get_page_content", new_callable=AsyncMock + ) as mock_get_page: + mock_get_page.return_value = mock_parser + + recommendations = await recommendation_manager.get_recommendations() + + assert len(recommendations) == 1 + assert recommendations[0].name == "Test Module" + assert len(recommendations[0].items) == 1 + + # Should fetch pages + assert mock_get_page.call_count >= 1 + + +async def test_get_page_content( + recommendation_manager: TidalRecommendationManager, provider_mock: Mock +) -> None: + """Test get_page_content.""" + with patch( + "music_assistant.providers.tidal.recommendations.TidalPageParser" + ) as mock_parser_cls: + # Configure from_cache to be async and return None + mock_parser_cls.from_cache = AsyncMock(return_value=None) + + # Configure parser instance + mock_parser_instance = mock_parser_cls.return_value + mock_parser_instance._module_map = [] + mock_parser_instance._content_map = {} + mock_parser_instance._parsed_at = 1234567890 + mock_parser_instance.parse_page_structure = Mock() # Ensure it's a synchronous mock + + # Mock API response + provider_mock.api.get.return_value = ({"rows": []}, "etag") + + parser = await recommendation_manager.get_page_content("pages/home") + + assert parser == mock_parser_instance + + # Should check cache + mock_parser_cls.from_cache.assert_called_with(provider_mock, "pages/home") + + # Should fetch from API + provider_mock.api.get.assert_called() + + # Should parse structure + mock_parser_instance.parse_page_structure.assert_called() + + # Should cache result + provider_mock.mass.cache.set.assert_called() diff --git a/tests/providers/tidal/test_streaming.py b/tests/providers/tidal/test_streaming.py new file mode 100644 index 00000000..72f2c11d --- /dev/null +++ b/tests/providers/tidal/test_streaming.py @@ -0,0 +1,363 @@ +"""Test Tidal Streaming Manager.""" + +from unittest.mock import AsyncMock, MagicMock, Mock + +import pytest +from music_assistant_models.enums import ContentType, ExternalID, StreamType +from music_assistant_models.errors import MediaNotFoundError +from music_assistant_models.media_items import Track + +from music_assistant.providers.tidal.streaming import TidalStreamingManager + + +@pytest.fixture +def provider_mock() -> Mock: + """Return a mock provider.""" + provider = Mock() + provider.lookup_key = "tidal" + provider.instance_id = "tidal_instance" + provider.config.get_value.return_value = "HIGH" + provider.api = AsyncMock() + provider.api.OPEN_API_URL = "https://openapi.tidal.com/v2" + + # Mock throttler bypass as async context manager using MagicMock + bypass_ctx = MagicMock() + bypass_ctx.__aenter__ = AsyncMock(return_value=None) + bypass_ctx.__aexit__ = AsyncMock(return_value=None) + provider.api.throttler = Mock() + provider.api.throttler.bypass = Mock(return_value=bypass_ctx) + + provider.get_track = AsyncMock() + + # Mock mass + provider.mass = Mock() + provider.mass.cache.get = AsyncMock(return_value=None) + provider.mass.cache.set = AsyncMock() + provider.mass.cache.delete = AsyncMock() + provider.mass.music.tracks.get_library_item_by_prov_id = AsyncMock(return_value=None) + + return provider + + +@pytest.fixture +def streaming_manager(provider_mock: Mock) -> TidalStreamingManager: + """Return a TidalStreamingManager instance.""" + return TidalStreamingManager(provider_mock) + + +@pytest.fixture +def mock_track() -> Mock: + """Return a mock track.""" + track = Mock(spec=Track) + track.item_id = "123" + track.duration = 180 + return track + + +async def test_get_stream_details_lossless( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Test get_stream_details with LOSSLESS quality.""" + provider_mock.get_track.return_value = mock_track + provider_mock.api.get.return_value = ( + { + "manifestMimeType": "application/vnd.tidal.bts", + "urls": ["https://example.com/stream.flac"], + "audioQuality": "LOSSLESS", + "sampleRate": 44100, + "bitDepth": 16, + }, + None, + ) + + stream_details = await streaming_manager.get_stream_details("123") + + assert stream_details.item_id == "123" + assert stream_details.provider == "tidal" + assert stream_details.audio_format.content_type == ContentType.FLAC + assert stream_details.audio_format.sample_rate == 44100 + assert stream_details.audio_format.bit_depth == 16 + assert stream_details.stream_type == StreamType.HTTP + assert stream_details.path == "https://example.com/stream.flac" + assert stream_details.can_seek is True + + provider_mock.get_track.assert_called_with("123") + provider_mock.api.get.assert_called_with( + "tracks/123/playbackinfopostpaywall", + params={ + "playbackmode": "STREAM", + "assetpresentation": "FULL", + "audioquality": "HIGH", + }, + ) + + +async def test_get_stream_details_hires( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Test get_stream_details with HIRES_LOSSLESS quality.""" + provider_mock.get_track.return_value = mock_track + provider_mock.api.get.return_value = { + "urls": ["https://example.com/stream.flac"], + "audioQuality": "HIRES_LOSSLESS", + "sampleRate": 96000, + "bitDepth": 24, + } + + stream_details = await streaming_manager.get_stream_details("123") + + assert stream_details.audio_format.content_type == ContentType.FLAC + assert stream_details.audio_format.sample_rate == 96000 + assert stream_details.audio_format.bit_depth == 24 + + +async def test_get_stream_details_with_dash_manifest( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Test get_stream_details with DASH manifest.""" + provider_mock.get_track.return_value = mock_track + provider_mock.api.get.return_value = { + "manifestMimeType": "application/dash+xml", + "manifest": "base64encodedmanifestdata", + "audioQuality": "HIGH", + "sampleRate": 44100, + "bitDepth": 16, + } + + stream_details = await streaming_manager.get_stream_details("123") + + assert isinstance(stream_details.path, str) + assert stream_details.path.startswith("data:application/dash+xml;base64,") + assert "base64encodedmanifestdata" in stream_details.path + + +async def test_get_stream_details_with_codec( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Test get_stream_details with codec specified.""" + provider_mock.get_track.return_value = mock_track + provider_mock.api.get.return_value = { + "urls": ["https://example.com/stream.aac"], + "audioQuality": "HIGH", + "codec": "AAC", + "sampleRate": 44100, + "bitDepth": 16, + } + + stream_details = await streaming_manager.get_stream_details("123") + + assert stream_details.audio_format.content_type == ContentType.AAC + + +async def test_get_stream_details_defaults_to_mp4( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Test get_stream_details defaults to MP4 when no quality/codec.""" + provider_mock.get_track.return_value = mock_track + provider_mock.api.get.return_value = { + "urls": ["https://example.com/stream.m4a"], + "sampleRate": 44100, + "bitDepth": 16, + } + + stream_details = await streaming_manager.get_stream_details("123") + + assert stream_details.audio_format.content_type == ContentType.MP4 + + +async def test_get_stream_details_no_urls_raises_error( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Test get_stream_details raises error when no URLs.""" + provider_mock.get_track.return_value = mock_track + provider_mock.api.get.return_value = { + "audioQuality": "HIGH", + "sampleRate": 44100, + "bitDepth": 16, + } + + with pytest.raises(MediaNotFoundError, match="No stream URL found"): + await streaming_manager.get_stream_details("123") + + +async def test_get_stream_details_track_not_found_no_isrc( + streaming_manager: TidalStreamingManager, provider_mock: Mock +) -> None: + """Test get_stream_details when track not found and no ISRC fallback.""" + provider_mock.get_track.side_effect = MediaNotFoundError("Track not found") + provider_mock.mass.music.tracks.get_library_item_by_prov_id.return_value = None + + with pytest.raises(MediaNotFoundError, match="Track 123 not found"): + await streaming_manager.get_stream_details("123") + + +async def test_get_track_by_isrc_from_cache( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Test _get_track_by_isrc returns cached result.""" + provider_mock.mass.cache.get.return_value = "cached_track_456" + provider_mock.get_track.return_value = mock_track + + result = await streaming_manager._get_track_by_isrc("123") + + assert result == mock_track + provider_mock.mass.cache.get.assert_called_with( + "123", + provider="tidal_instance", + category=2, # CACHE_CATEGORY_ISRC_MAP + ) + provider_mock.get_track.assert_called_with("cached_track_456") + + +async def test_get_track_by_isrc_cache_miss_lookup_success( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Test _get_track_by_isrc performs ISRC lookup on cache miss.""" + # Cache miss + provider_mock.mass.cache.get.return_value = None + + # Library item with ISRC + lib_track = Mock() + lib_track.external_ids = [(ExternalID.ISRC, "US1234567890")] + provider_mock.mass.music.tracks.get_library_item_by_prov_id.return_value = lib_track + + # API lookup + provider_mock.api.get.return_value = {"data": [{"id": 456}]} + + # Final track fetch + provider_mock.get_track.return_value = mock_track + + result = await streaming_manager._get_track_by_isrc("123") + + assert result == mock_track + + # Verify API call + provider_mock.api.get.assert_called_with( + "/tracks", + params={"filter[isrc]": "US1234567890"}, + base_url=provider_mock.api.OPEN_API_URL, + ) + + # Verify cache set + provider_mock.mass.cache.set.assert_called_with( + key="123", + data="456", + provider="tidal_instance", + category=2, # CACHE_CATEGORY_ISRC_MAP + persistent=True, + expiration=86400 * 90, + ) + + # Verify final track fetch + provider_mock.get_track.assert_called_with("456") + + +async def test_get_track_by_isrc_no_library_item( + streaming_manager: TidalStreamingManager, provider_mock: Mock +) -> None: + """Test _get_track_by_isrc returns None when no library item.""" + provider_mock.mass.cache.get.return_value = None + provider_mock.mass.music.tracks.get_library_item_by_prov_id.return_value = None + + result = await streaming_manager._get_track_by_isrc("123") + + assert result is None + + +async def test_get_track_by_isrc_no_isrc_external_id( + streaming_manager: TidalStreamingManager, provider_mock: Mock +) -> None: + """Test _get_track_by_isrc returns None when library item has no ISRC.""" + provider_mock.mass.cache.get.return_value = None + + lib_track = Mock() + lib_track.external_ids = [(ExternalID.BARCODE, "some-id")] + provider_mock.mass.music.tracks.get_library_item_by_prov_id.return_value = lib_track + + result = await streaming_manager._get_track_by_isrc("123") + + assert result is None + + +async def test_get_track_by_isrc_api_returns_empty( + streaming_manager: TidalStreamingManager, provider_mock: Mock +) -> None: + """Test _get_track_by_isrc returns None when API returns no data.""" + provider_mock.mass.cache.get.return_value = None + + lib_track = Mock() + lib_track.external_ids = [(ExternalID.ISRC, "US1234567890")] + provider_mock.mass.music.tracks.get_library_item_by_prov_id.return_value = lib_track + + provider_mock.api.get.return_value = {"data": []} + + result = await streaming_manager._get_track_by_isrc("123") + + assert result is None + + +async def test_get_track_by_isrc_cached_track_not_found( + streaming_manager: TidalStreamingManager, provider_mock: Mock +) -> None: + """Test _get_track_by_isrc deletes cache when cached track not found.""" + provider_mock.mass.cache.get.return_value = "cached_track_999" + provider_mock.get_track.side_effect = MediaNotFoundError("Track not found") + + # Should continue with ISRC lookup + lib_track = Mock() + lib_track.external_ids = [(ExternalID.ISRC, "US1234567890")] + provider_mock.mass.music.tracks.get_library_item_by_prov_id.return_value = lib_track + + provider_mock.api.get.return_value = {"data": []} + + result = await streaming_manager._get_track_by_isrc("123") + + # Should delete invalid cache entry + provider_mock.mass.cache.delete.assert_called_with( + "123", + provider="tidal_instance", + category=2, # CACHE_CATEGORY_ISRC_MAP + ) + + assert result is None + + +async def test_get_stream_details_with_isrc_fallback( + streaming_manager: TidalStreamingManager, provider_mock: Mock, mock_track: Mock +) -> None: + """Test get_stream_details uses ISRC fallback when direct lookup fails.""" + # Direct lookup fails + provider_mock.get_track.side_effect = [ + MediaNotFoundError("Track not found"), # First call + mock_track, # Second call from ISRC lookup + mock_track, # Third call for stream details + ] + + # ISRC lookup succeeds + lib_track = Mock() + lib_track.external_ids = [(ExternalID.ISRC, "US1234567890")] + provider_mock.mass.music.tracks.get_library_item_by_prov_id.return_value = lib_track + + provider_mock.api.get.return_value = ( + {"data": [{"id": 456}]}, # ISRC lookup response + None, + ) + + # Stream details + provider_mock.api.get.side_effect = [ + ({"data": [{"id": 456}]}, None), # ISRC lookup + ( + { # Stream details + "urls": ["https://example.com/stream.flac"], + "audioQuality": "LOSSLESS", + "sampleRate": 44100, + "bitDepth": 16, + }, + None, + ), + ] + + stream_details = await streaming_manager.get_stream_details("123") + + assert stream_details.item_id == "123" + assert stream_details.path == "https://example.com/stream.flac" -- 2.34.1