from __future__ import annotations
import asyncio
-import base64
-import pickle
-from collections.abc import Callable
+import functools
+import json
+from collections.abc import Awaitable, Callable, Coroutine
from contextlib import suppress
-from datetime import datetime, timedelta
from enum import StrEnum
-from typing import TYPE_CHECKING, ParamSpec, TypeVar, cast
+from typing import TYPE_CHECKING, Any, TypeVar, cast
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption, ConfigValueType
+from aiohttp import ClientResponse
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+)
from music_assistant_models.enums import (
AlbumType,
ConfigEntryType,
ProviderFeature,
StreamType,
)
-from music_assistant_models.errors import LoginFailed, MediaNotFoundError
+from music_assistant_models.errors import (
+ LoginFailed,
+ MediaNotFoundError,
+ ResourceTemporarilyUnavailable,
+)
from music_assistant_models.media_items import (
Album,
Artist,
UniqueList,
)
from music_assistant_models.streamdetails import StreamDetails
-from tidalapi import Album as TidalAlbum
-from tidalapi import Artist as TidalArtist
-from tidalapi import Config as TidalConfig
-from tidalapi import Playlist as TidalPlaylist
-from tidalapi import Session as TidalSession
-from tidalapi import Track as TidalTrack
-from tidalapi import exceptions as tidal_exceptions
-
-from music_assistant.constants import CACHE_CATEGORY_DEFAULT, CACHE_CATEGORY_MEDIA_INFO
-from music_assistant.helpers.auth import AuthenticationHelper
-from music_assistant.helpers.tags import AudioTags, async_parse_tags
-from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
-from music_assistant.models.music_provider import MusicProvider
-from .helpers import (
- DEFAULT_LIMIT,
- add_playlist_tracks,
- create_playlist,
- get_album,
- get_album_tracks,
- get_artist,
- get_artist_albums,
- get_artist_toptracks,
- get_library_albums,
- get_library_artists,
- get_library_playlists,
- get_library_tracks,
- get_playlist,
- get_playlist_tracks,
- get_similar_tracks,
- get_stream,
- get_track,
- get_track_lyrics,
- get_tracks_by_isrc,
- library_items_add_remove,
- remove_playlist_tracks,
- search,
+from music_assistant.constants import CACHE_CATEGORY_DEFAULT
+from music_assistant.helpers.throttle_retry import (
+ ThrottlerManager,
+ throttle_with_retries,
)
+from music_assistant.models.music_provider import MusicProvider
+
+from .auth_manager import ManualAuthenticationHelper, TidalAuthManager
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator, Awaitable
+ from collections.abc import AsyncGenerator
+ from aiohttp import ClientResponse
from music_assistant_models.config_entries import ProviderConfig
from music_assistant_models.provider import ProviderManifest
- from tidalapi.media import Lyrics as TidalLyrics
- from tidalapi.media import Stream as TidalStream
from music_assistant.mass import MusicAssistant
from music_assistant.models import ProviderInstanceType
CONF_REFRESH_TOKEN = "refresh_token"
CONF_USER_ID = "user_id"
CONF_EXPIRY_TIME = "expiry_time"
+CONF_COUNTRY_CODE = "country_code"
+CONF_SESSION_ID = "session_id"
CONF_QUALITY = "quality"
# Labels
BROWSE_URL = "https://tidal.com/browse"
RESOURCES_URL = "https://resources.tidal.com/images"
-_R = TypeVar("_R")
-_P = ParamSpec("_P")
+DEFAULT_LIMIT = 50
+
+T = TypeVar("T")
class TidalQualityEnum(StrEnum):
return TidalProvider(mass, manifest, config)
-async def tidal_auth_url(auth_helper: AuthenticationHelper, quality: str) -> str:
- """Generate the Tidal authentication URL."""
-
- def inner() -> str:
- config = TidalConfig(quality=quality, item_limit=10000, alac=False)
- session = TidalSession(config=config)
- url = session.pkce_login_url()
- # Schedule auth_helper.send_url to run in event loop
- auth_helper.mass.loop.call_soon_threadsafe(auth_helper.send_url, url)
- session_bytes = pickle.dumps(session)
- base64_bytes = base64.b64encode(session_bytes)
- return base64_bytes.decode("utf-8")
-
- return await asyncio.to_thread(inner)
-
-
-async def tidal_pkce_login(base64_session: str, url: str) -> TidalSession:
- """Async wrapper around the tidalapi Session function."""
-
- def inner() -> TidalSession:
- base64_bytes = base64_session.encode("utf-8")
- message_bytes = base64.b64decode(base64_bytes)
- session = pickle.loads(message_bytes) # noqa: S301
- token = session.pkce_get_auth_token(url_redirect=url)
- session.process_auth_token(token)
- return session
-
- return await asyncio.to_thread(inner)
-
-
async def get_config_entries(
mass: MusicAssistant,
instance_id: str | None = None, # noqa: ARG001
assert values is not None
if action == CONF_ACTION_START_PKCE_LOGIN:
- async with AuthenticationHelper(mass, cast(str, values["session_id"])) as auth_helper:
+ async with ManualAuthenticationHelper(mass, cast(str, values["session_id"])) as auth_helper:
quality = str(values.get(CONF_QUALITY))
- base64_session = await tidal_auth_url(auth_helper, quality)
+ base64_session = await TidalAuthManager.generate_auth_url(auth_helper, quality)
values[CONF_TEMP_SESSION] = base64_session
- # Tidal is (ab)using the AuthenticationHelper just to send the user to an URL
+ # Tidal is using the ManualAuthenticationHelper just to send the user to an URL
# there is no actual oauth callback happening, instead the user is redirected
# to a non-existent page and needs to copy the URL from the browser and paste it
# we simply wait here to allow the user to start the auth
quality = str(values.get(CONF_QUALITY))
pkce_url = str(values.get(CONF_OOPS_URL))
base64_session = str(values.get(CONF_TEMP_SESSION))
- tidal_session = await tidal_pkce_login(base64_session, pkce_url)
- if not tidal_session.check_login():
- msg = "Authentication to Tidal failed"
- raise LoginFailed(msg)
- # set the retrieved token on the values object to pass along
- values[CONF_AUTH_TOKEN] = tidal_session.access_token
- values[CONF_REFRESH_TOKEN] = tidal_session.refresh_token
- values[CONF_EXPIRY_TIME] = tidal_session.expiry_time.isoformat()
- values[CONF_USER_ID] = str(tidal_session.user.id)
+ auth_data = await TidalAuthManager.process_pkce_login(
+ mass.http_session, base64_session, pkce_url
+ )
+ values[CONF_AUTH_TOKEN] = auth_data["access_token"]
+ values[CONF_REFRESH_TOKEN] = auth_data["refresh_token"]
+ values[CONF_EXPIRY_TIME] = auth_data["expires_at"]
+ values[CONF_USER_ID] = auth_data["userId"]
values[CONF_TEMP_SESSION] = ""
if action == CONF_ACTION_CLEAR_AUTH:
values[CONF_AUTH_TOKEN] = None
+ values[CONF_REFRESH_TOKEN] = None
+ values[CONF_EXPIRY_TIME] = None
+ values[CONF_USER_ID] = None
if values.get(CONF_AUTH_TOKEN):
auth_entries: tuple[ConfigEntry, ...] = (
),
)
- # return the collected config entries
+ # return the auth_data config entry
return (
*auth_entries,
ConfigEntry(
class TidalProvider(MusicProvider):
"""Implementation of a Tidal MusicProvider."""
- _tidal_session: TidalSession | None = None
- _tidal_user_id: str
- # rate limiter needs to be specified on provider-level,
- # so make it an class attribute
+ BASE_URL: str = "https://api.tidal.com/v1"
+ OPEN_API_URL: str = "https://openapi.tidal.com/v2/"
+
throttler = ThrottlerManager(rate_limit=1, period=2)
+ #
+ # INITIALIZATION & SETUP
+ #
+
+ def __init__(self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig):
+ """Initialize Tidal provider."""
+ super().__init__(mass, manifest, config)
+ self.auth = TidalAuthManager(
+ http_session=mass.http_session,
+ config_updater=self._update_auth_config,
+ logger=self.logger,
+ )
+
+ def _update_auth_config(self, auth_info: dict[str, Any]) -> None:
+ """Update auth config with new auth info."""
+ self.update_config_value(CONF_AUTH_TOKEN, auth_info["access_token"], encrypted=True)
+ self.update_config_value(CONF_REFRESH_TOKEN, auth_info["refresh_token"], encrypted=True)
+ self.update_config_value(CONF_EXPIRY_TIME, auth_info["expires_at"])
+ self.update_config_value(CONF_USER_ID, auth_info["userId"])
+ # Also update country/session for backward compatibility
+ # if "countryCode" in auth_info:
+ # self.update_config_value(CONF_COUNTRY_CODE, auth_info["countryCode"])
+ # if "sessionId" in auth_info:
+ # self.update_config_value(CONF_SESSION_ID, auth_info["sessionId"])
+
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- self._tidal_user_id = str(self.config.get_value(CONF_USER_ID))
- await self._get_tidal_session()
+ # Load auth info from individual config values
+ access_token = self.config.get_value(CONF_AUTH_TOKEN)
+ refresh_token = self.config.get_value(CONF_REFRESH_TOKEN)
+ expires_at = self.config.get_value(CONF_EXPIRY_TIME)
+ user_id = self.config.get_value(CONF_USER_ID)
+
+ if not access_token or not refresh_token:
+ raise LoginFailed("Missing authentication data")
+
+ # Handle conversion from ISO format to timestamp if needed
+ if isinstance(expires_at, str) and "T" in expires_at:
+ # This looks like an ISO format date
+ import datetime
+
+ try:
+ dt = datetime.datetime.fromisoformat(expires_at)
+ # Convert to timestamp
+ expires_at = dt.timestamp()
+ # Update the config with the numeric value
+ self.update_config_value(CONF_EXPIRY_TIME, expires_at)
+ except ValueError:
+ self.logger.warning(
+ "Could not parse expiry time %s, setting to expired", expires_at
+ )
+ expires_at = 0
+
+ # Create auth data dictionary from individual config values
+ auth_data = {
+ "access_token": access_token,
+ "refresh_token": refresh_token,
+ "expires_at": expires_at,
+ "userId": user_id,
+ }
+
+ # Initialize auth manager
+ if not await self.auth.initialize(json.dumps(auth_data)):
+ raise LoginFailed("Failed to authenticate with Tidal")
+
+ # Get user information from sessions API
+ api_result = await self._get_data("sessions")
+ user_info = self._extract_data(api_result)
+ await self.auth.update_user_info(user_info)
@property
def supported_features(self) -> set[ProviderFeature]:
ProviderFeature.PLAYLIST_TRACKS_EDIT,
}
+ #
+ # API REQUEST HELPERS & DECORATORS
+ #
+
+ @staticmethod
+ def prepare_api_request(method: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
+ """Prepare API requests with authentication and common parameters."""
+
+ @functools.wraps(method)
+ async def wrapper(self: TidalProvider, endpoint: str, **kwargs: Any) -> T:
+ # Ensure we have a valid token through auth manager
+ if not await self.auth.ensure_valid_token():
+ raise LoginFailed("Failed to authenticate with Tidal")
+
+ # Add required parameters to every request
+ params = kwargs.pop("params", {}) or {}
+
+ # Add session ID and country code if available
+ if self.auth.session_id:
+ params["sessionId"] = self.auth.session_id
+
+ if self.auth.country_code:
+ params["countryCode"] = self.auth.country_code
+
+ kwargs["params"] = params
+
+ # Prepare headers
+ headers = kwargs.pop("headers", {}) or {}
+ headers["Authorization"] = f"Bearer {self.auth.access_token}"
+
+ # Add locale headers
+ locale = self.mass.metadata.locale.replace("_", "-")
+ language = locale.split("-")[0]
+ headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
+ kwargs["headers"] = headers
+
+ return await method(self, endpoint, **kwargs)
+
+ return wrapper
+
+ @staticmethod
+ def handle_item_errors(
+ item_type: str,
+ ) -> Callable[[Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]]:
+ """Handle standard error patterns in item getters."""
+
+ def decorator(
+ method: Callable[..., Coroutine[Any, Any, T]],
+ ) -> Callable[..., Coroutine[Any, Any, T]]:
+ @functools.wraps(method)
+ async def wrapper(self: TidalProvider, item_id: str, *args: Any, **kwargs: Any) -> T:
+ try:
+ return await method(self, item_id, *args, **kwargs)
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"{item_type} {item_id} not found") from err
+
+ return wrapper
+
+ return decorator
+
+ #
+ # CORE API METHODS
+ #
+
+ @throttle_with_retries
+ @prepare_api_request
+ async def _get_data(
+ self, endpoint: str, **kwargs: Any
+ ) -> dict[str, Any] | tuple[dict[str, Any], str]:
+ """Get data from Tidal API using mass.http_session."""
+ # Check if we want to return the ETag
+ return_etag = kwargs.pop("return_etag", False)
+
+ base_url = kwargs.pop("base_url", self.BASE_URL)
+ url = f"{base_url}/{endpoint}"
+
+ self.logger.debug("Making request to Tidal API: %s", endpoint)
+
+ async with self.mass.http_session.get(url, **kwargs) as response:
+ return await self._handle_response(response, return_etag)
+
+ @prepare_api_request
+ async def _post_data(
+ self,
+ endpoint: str,
+ data: dict[str, Any] | None = None,
+ as_form: bool = False,
+ **kwargs: Any,
+ ) -> dict[str, Any]:
+ """Post data to Tidal API using mass.http_session."""
+ url = f"{self.BASE_URL}/{endpoint}"
+
+ if as_form:
+ # Set content type for form data
+ headers = kwargs.get("headers", {})
+ headers["Content-Type"] = "application/x-www-form-urlencoded"
+ kwargs["headers"] = headers
+ # Use data parameter for form-encoded data
+ async with self.mass.http_session.post(url, data=data, **kwargs) as response:
+ return cast(
+ dict[str, Any],
+ await self._handle_response(response, return_etag=False),
+ )
+ else:
+ # Use json parameter for JSON data (default)
+ async with self.mass.http_session.post(url, json=data, **kwargs) as response:
+ return cast(
+ dict[str, Any],
+ await self._handle_response(response, return_etag=False),
+ )
+
+ @prepare_api_request
+ async def _delete_data(
+ self, endpoint: str, data: dict[str, Any] | None = None, **kwargs: Any
+ ) -> dict[str, Any]:
+ """Delete data from Tidal API using mass.http_session."""
+ url = f"{self.BASE_URL}/{endpoint}"
+ self.logger.debug("Making DELETE request to Tidal API: %s", endpoint)
+
+ # For DELETE requests with a body, we need to use json parameter
+ async with self.mass.http_session.delete(url, json=data, **kwargs) as response:
+ return cast(dict[str, Any], await self._handle_response(response, return_etag=False))
+
+ async def _handle_response(
+ self, response: ClientResponse, return_etag: bool = False
+ ) -> dict[str, Any] | tuple[dict[str, Any], str]:
+ """Handle API response and common error conditions."""
+ # Handle error responses
+ if response.status == 401:
+ # Authentication error is handled by the calling method (which will retry)
+ raise LoginFailed("Authentication failed")
+
+ if response.status == 404:
+ raise MediaNotFoundError(f"Item not found: {response.url}")
+
+ if response.status == 429:
+ retry_after = int(response.headers.get("Retry-After", 30))
+ raise ResourceTemporarilyUnavailable(
+ "Tidal Rate limit reached", backoff_time=retry_after
+ )
+
+ if response.status == 412:
+ text = await response.text()
+ self.logger.error("Precondition failed: %s", text)
+ raise ResourceTemporarilyUnavailable(
+ "Resource changed while updating, please try again"
+ )
+
+ if response.status >= 400:
+ text = await response.text()
+ self.logger.error("API error: %s - %s", response.status, text)
+ raise ResourceTemporarilyUnavailable("API error")
+
+ # Parse successful response
+ try:
+ # Check if there's content to parse
+ if (
+ response.content_length == 0
+ or not response.content_type
+ or response.content_type == ""
+ ):
+ # Empty response, return success indicator
+ data = {"success": True}
+ else:
+ data = await response.json()
+
+ # Return with etag if requested
+ if return_etag:
+ etag = response.headers.get("ETag", "")
+ return data, etag
+ return data
+ except json.JSONDecodeError as err:
+ self.logger.error("Failed to parse JSON response: %s", err)
+ raise ResourceTemporarilyUnavailable("Failed to parse response") from err
+ except (TypeError, ValueError, KeyError) as err:
+ self.logger.error("Invalid response format: %s", err)
+ raise ResourceTemporarilyUnavailable("Invalid response format") from err
+
+ async def _paginate_api(
+ self,
+ endpoint: str,
+ item_key: str = "items",
+ nested_key: str | None = None,
+ limit: int = DEFAULT_LIMIT,
+ **kwargs: Any,
+ ) -> AsyncGenerator[Any, None]:
+ """Paginate through all items from a Tidal API endpoint."""
+ offset = 0
+ while True:
+ # Get a batch of items
+ params = {"limit": limit, "offset": offset}
+ if "params" in kwargs:
+ params.update(kwargs.pop("params"))
+
+ api_result = await self._get_data(endpoint, params=params, **kwargs)
+ response = self._extract_data(api_result)
+
+ # Extract items from response
+ items = response.get(item_key, [])
+ if not items:
+ break
+
+ # Process each item in the batch
+ for item in items:
+ if nested_key and nested_key in item and item[nested_key]:
+ yield item[nested_key]
+ else:
+ yield item
+
+ # Update offset for next batch
+ offset += len(items)
+
+ # Stop if we've received fewer items than the limit
+ if len(items) < limit:
+ break
+
+ def _extract_data(
+ self, api_result: dict[str, Any] | tuple[dict[str, Any], str]
+ ) -> dict[str, Any]:
+ """Extract data from API result that might be tuple of (data, etag)."""
+ return api_result[0] if isinstance(api_result, tuple) else api_result
+
+ def _extract_data_and_etag(
+ self, api_result: dict[str, Any] | tuple[dict[str, Any], str]
+ ) -> tuple[dict[str, Any], str | None]:
+ """Extract both data and etag from API result."""
+ if isinstance(api_result, tuple):
+ return api_result
+ return api_result, None
+
+ #
+ # SEARCH & DISCOVERY
+ #
+
async def search(
self,
search_query: str,
if not media_types:
return parsed_results
- tidal_session = await self._get_tidal_session()
- search_query = search_query.replace("'", "")
- results = await search(tidal_session, search_query, media_types, limit)
+ api_result = await self._get_data(
+ "search",
+ params={
+ "query": search_query.replace("'", ""),
+ "limit": limit,
+ "types": ",".join(media_types),
+ },
+ )
+
+ # Handle potential tuple return (data, etag)
+ results = self._extract_data(api_result)
if results["artists"]:
- parsed_results.artists = [self._parse_artist(artist) for artist in results["artists"]]
+ parsed_results.artists = [
+ self._parse_artist(artist) for artist in results["artists"]["items"]
+ ]
if results["albums"]:
- parsed_results.albums = [self._parse_album(album) for album in results["albums"]]
+ parsed_results.albums = [
+ self._parse_album(album) for album in results["albums"]["items"]
+ ]
if results["playlists"]:
parsed_results.playlists = [
- self._parse_playlist(playlist) for playlist in results["playlists"]
+ self._parse_playlist(playlist) for playlist in results["playlists"]["items"]
]
if results["tracks"]:
- parsed_results.tracks = [self._parse_track(track) for track in results["tracks"]]
+ parsed_results.tracks = [
+ self._parse_track(track) for track in results["tracks"]["items"]
+ ]
return parsed_results
- async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
- """Retrieve all library artists from Tidal."""
- tidal_session = await self._get_tidal_session()
- artist: TidalArtist # satisfy the type checker
- async for artist in self._iter_items(
- get_library_artists, tidal_session, self._tidal_user_id, limit=DEFAULT_LIMIT
- ):
- yield self._parse_artist(artist)
+ @handle_item_errors("Track")
+ async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
+ """Get similar tracks for given track id."""
+ api_result = await self._get_data(f"tracks/{prov_track_id}/radio", params={"limit": limit})
+ similar_tracks = self._extract_data(api_result)
+ return [self._parse_track(track_obj) for track_obj in similar_tracks.get("items", [])]
- async def get_library_albums(self) -> AsyncGenerator[Album, None]:
- """Retrieve all library albums from Tidal."""
- tidal_session = await self._get_tidal_session()
- album: TidalAlbum # satisfy the type checker
- async for album in self._iter_items(
- get_library_albums, tidal_session, self._tidal_user_id, limit=DEFAULT_LIMIT
- ):
- yield self._parse_album(album)
+ #
+ # ITEM RETRIEVAL METHODS
+ #
- async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
- """Retrieve library tracks from Tidal."""
- tidal_session = await self._get_tidal_session()
- track: TidalTrack # satisfy the type checker
- async for track in self._iter_items(
- get_library_tracks, tidal_session, self._tidal_user_id, limit=DEFAULT_LIMIT
- ):
- yield self._parse_track(track)
+ @handle_item_errors("Artist")
+ async def get_artist(self, prov_artist_id: str) -> Artist:
+ """Get artist details for given artist id."""
+ api_result = await self._get_data(f"artists/{prov_artist_id}")
+ artist_obj = self._extract_data(api_result)
+ return self._parse_artist(artist_obj)
- async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
- """Retrieve all library playlists from the provider."""
- tidal_session = await self._get_tidal_session()
- playlist: TidalPlaylist # satisfy the type checker
- async for playlist in self._iter_items(
- get_library_playlists, tidal_session, self._tidal_user_id
- ):
- yield self._parse_playlist(playlist)
+ @handle_item_errors("Album")
+ async def get_album(self, prov_album_id: str) -> Album:
+ """Get album details for given album id."""
+ api_result = await self._get_data(f"albums/{prov_album_id}")
+ album_obj = self._extract_data(api_result)
+ return self._parse_album(album_obj)
- @throttle_with_retries
+ @handle_item_errors("Track")
+ async def get_track(self, prov_track_id: str) -> Track:
+ """Get track details for given track id."""
+ api_result = await self._get_data(f"tracks/{prov_track_id}")
+ track_obj = self._extract_data(api_result)
+ track = self._parse_track(track_obj)
+ # Get additional details like lyrics if needed
+ with suppress(MediaNotFoundError):
+ api_result = await self._get_data(f"tracks/{prov_track_id}/lyrics")
+ lyrics_data = self._extract_data(api_result)
+
+ if lyrics_data and "text" in lyrics_data:
+ track.metadata.lyrics = lyrics_data["text"]
+
+ return track
+
+ @handle_item_errors("Playlist")
+ async def get_playlist(self, prov_playlist_id: str) -> Playlist:
+ """Get playlist details for given playlist id."""
+ api_result = await self._get_data(f"playlists/{prov_playlist_id}")
+ playlist_obj = self._extract_data(api_result)
+ return self._parse_playlist(playlist_obj)
+
+ @handle_item_errors("Track")
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
- tidal_session = await self._get_tidal_session()
- tracks_obj = await get_album_tracks(tidal_session, prov_album_id)
- return [self._parse_track(track_obj=track_obj) for track_obj in tracks_obj]
+ api_result = await self._get_data(f"albums/{prov_album_id}/tracks")
+ album_tracks = self._extract_data(api_result)
+ return [self._parse_track(track_obj) for track_obj in album_tracks.get("items", [])]
- @throttle_with_retries
+ @handle_item_errors("Album")
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get a list of all albums for the given artist."""
- tidal_session = await self._get_tidal_session()
- artist_albums_obj = await get_artist_albums(tidal_session, prov_artist_id)
- return [self._parse_album(album) for album in artist_albums_obj]
+ api_result = await self._get_data(f"artists/{prov_artist_id}/albums")
+ artist_albums = self._extract_data(api_result)
+ return [self._parse_album(album_obj) for album_obj in artist_albums.get("items", [])]
- @throttle_with_retries
+ @handle_item_errors("Track")
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get a list of 10 most popular tracks for the given artist."""
- tidal_session = await self._get_tidal_session()
- try:
- artist_toptracks_obj = await get_artist_toptracks(tidal_session, prov_artist_id)
- return [self._parse_track(track) for track in artist_toptracks_obj]
- except tidal_exceptions.ObjectNotFound as err:
- self.logger.warning(f"Failed to get toptracks for artist {prov_artist_id}: {err}")
- return []
+ api_result = await self._get_data(
+ f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0}
+ )
+ artist_top_tracks = self._extract_data(api_result)
+ return [self._parse_track(track_obj) for track_obj in artist_top_tracks.get("items", [])]
+ @handle_item_errors("Playlist")
async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
- tidal_session = await self._get_tidal_session()
result: list[Track] = []
page_size = 200
offset = page * page_size
- track_obj: TidalTrack # satisfy the type checker
- tidal_tracks = await get_playlist_tracks(
- tidal_session, prov_playlist_id, limit=page_size, offset=offset
+ api_result = await self._get_data(
+ f"playlists/{prov_playlist_id}/tracks",
+ params={"limit": page_size, "offset": offset},
)
- for index, track_obj in enumerate(tidal_tracks, 1):
+ tidal_tracks = self._extract_data(api_result)
+ for index, track_obj in enumerate(tidal_tracks.get("items", []), 1):
track = self._parse_track(track_obj=track_obj)
track.position = offset + index
result.append(track)
return result
- @throttle_with_retries
- async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
- """Get similar tracks for given track id."""
- tidal_session = await self._get_tidal_session()
- similar_tracks_obj = await get_similar_tracks(tidal_session, prov_track_id, limit)
- return [self._parse_track(track) for track in similar_tracks_obj]
-
- async def library_add(self, item: MediaItemType) -> bool:
- """Add item to library."""
- tidal_session = await self._get_tidal_session()
- return await library_items_add_remove(
- tidal_session,
- str(self._tidal_user_id),
- item.item_id,
- item.media_type,
- add=True,
- )
-
- async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
- """Remove item from library."""
- tidal_session = await self._get_tidal_session()
- return await library_items_add_remove(
- tidal_session,
- str(self._tidal_user_id),
- prov_item_id,
- media_type,
- add=False,
- )
-
- async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
- """Add track(s) to playlist."""
- tidal_session = await self._get_tidal_session()
- await add_playlist_tracks(tidal_session, prov_playlist_id, prov_track_ids)
-
- async def remove_playlist_tracks(
- self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
- ) -> None:
- """Remove track(s) from playlist."""
- tidal_session = await self._get_tidal_session()
- prov_track_ids: list[str] = []
- # Get tracks by position
- for pos in positions_to_remove:
- tracks = await get_playlist_tracks(
- tidal_session, prov_playlist_id, limit=1, offset=pos - 1
- )
- if tracks and len(tracks) > 0:
- prov_track_ids.append(str(tracks[0].id))
-
- if prov_track_ids:
- await remove_playlist_tracks(tidal_session, prov_playlist_id, prov_track_ids)
-
- async def create_playlist(self, name: str) -> Playlist:
- """Create a new playlist on provider with given name."""
- tidal_session = await self._get_tidal_session()
- playlist_obj = await create_playlist(
- session=tidal_session,
- user_id=str(self._tidal_user_id),
- title=name,
- description="",
- )
- return self._parse_playlist(playlist_obj)
-
- async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
"""Return the content details for the given track when it will be streamed."""
- tidal_session = await self._get_tidal_session()
- # make sure a valid track is requested.
# Try direct track lookup first with exception handling
try:
- track = await get_track(tidal_session, item_id)
+ track = await self.get_track(item_id)
except MediaNotFoundError:
- # Fallback to ISRC lookup
self.logger.info(
- """Track %s not found, attempting fallback by ISRC.
- It's likely that this track has a new ID upstream in Tidal's WebApp.""",
+ "Track %s not found, attempting fallback by ISRC lookup",
item_id,
)
- track = await self._get_track_by_isrc(item_id, tidal_session)
- if not track:
+ track_result = await self._get_track_by_isrc(item_id)
+ if not track_result:
raise MediaNotFoundError(f"Track {item_id} not found")
+ track = track_result
+
+ quality = self.config.get_value(CONF_QUALITY)
+
+ # Request stream manifest
+ async with self.throttler.bypass():
+ api_result = await self._get_data(
+ f"tracks/{item_id}/playbackinfopostpaywall",
+ params={
+ "playbackmode": "STREAM",
+ "audioquality": quality,
+ "assetpresentation": "FULL",
+ },
+ )
+ stream_data = self._extract_data(api_result)
- stream: TidalStream = await get_stream(track)
- manifest = stream.get_stream_manifest()
+ # Extract streaming information
+ manifest_type = stream_data.get("manifestMimeType", "")
+ is_mpd = "dash+xml" in manifest_type
- url = (
- # for mpeg-dash streams we just pass the complete base64 manifest
- f"data:application/dash+xml;base64,{manifest.manifest}"
- if stream.is_mpd
- # as far as I can oversee a BTS stream is just a single URL
- else manifest.urls[0]
- )
+ if is_mpd:
+ url = f"data:application/dash+xml;base64,{stream_data['manifest']}"
+ else:
+ # For non-MPD streams, use the direct URL
+ url = stream_data.get("urls", [None])[0]
+ if not url:
+ raise MediaNotFoundError(f"No stream URL for track {item_id}")
+
+ # Determine audio format info
+ codec = stream_data.get("codec", "")
+ content_type = ContentType.try_parse(codec)
+ bit_depth = 24 if "HI_RES_LOSSLESS" in stream_data.get("audioMode", "") else 16
+ sample_rate = stream_data.get("sampleRate", 44100)
return StreamDetails(
- item_id=track.id,
+ item_id=track.item_id,
provider=self.lookup_key,
audio_format=AudioFormat(
- content_type=ContentType.try_parse(manifest.codecs),
- sample_rate=manifest.sample_rate,
- bit_depth=stream.bit_depth,
+ content_type=content_type,
+ sample_rate=sample_rate,
+ bit_depth=bit_depth,
channels=2,
),
stream_type=StreamType.HTTP,
allow_seek=True,
)
- @throttle_with_retries
- async def get_artist(self, prov_artist_id: str) -> Artist:
- """Get artist details for given artist id."""
- tidal_session = await self._get_tidal_session()
- try:
- artist_obj = await get_artist(tidal_session, prov_artist_id)
- return self._parse_artist(artist_obj)
- except tidal_exceptions.ObjectNotFound as err:
- raise MediaNotFoundError from err
-
- @throttle_with_retries
- async def get_album(self, prov_album_id: str) -> Album:
- """Get album details for given album id."""
- tidal_session = await self._get_tidal_session()
- try:
- album_obj = await get_album(tidal_session, prov_album_id)
- return self._parse_album(album_obj)
- except tidal_exceptions.ObjectNotFound as err:
- raise MediaNotFoundError from err
-
- @throttle_with_retries
- async def get_track(self, prov_track_id: str) -> Track:
- """Get track details for given track id."""
- tidal_session = await self._get_tidal_session()
- track_obj = await get_track(tidal_session, prov_track_id)
- try:
- track = self._parse_track(track_obj)
- # get some extra details for the full track info
- with suppress(tidal_exceptions.MetadataNotAvailable, AttributeError):
- lyrics: TidalLyrics = await get_track_lyrics(tidal_session, prov_track_id)
- if lyrics and hasattr(lyrics, "text"):
- track.metadata.lyrics = lyrics.text
- return track
- except tidal_exceptions.ObjectNotFound as err:
- raise MediaNotFoundError from err
-
- @throttle_with_retries
- async def get_playlist(self, prov_playlist_id: str) -> Playlist:
- """Get playlist details for given playlist id."""
- tidal_session = await self._get_tidal_session()
- playlist_obj = await get_playlist(tidal_session, prov_playlist_id)
- return self._parse_playlist(playlist_obj)
-
- def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
- """Create a generic item mapping."""
- return ItemMapping(
- media_type=media_type,
- item_id=key,
- provider=self.lookup_key,
- name=name,
- )
-
- async def _get_tidal_session(self) -> TidalSession:
- """Ensure the current token is valid and return a tidal session."""
- if (
- self._tidal_session
- and self._tidal_session.access_token
- and datetime.fromisoformat(str(self.config.get_value(CONF_EXPIRY_TIME)))
- > (datetime.now() + timedelta(days=1))
- ):
- return self._tidal_session
-
- try:
- self._tidal_session = await self._load_tidal_session(
- token_type="Bearer",
- quality=str(self.config.get_value(CONF_QUALITY)),
- access_token=str(self.config.get_value(CONF_AUTH_TOKEN)),
- refresh_token=str(self.config.get_value(CONF_REFRESH_TOKEN)),
- expiry_time=datetime.fromisoformat(str(self.config.get_value(CONF_EXPIRY_TIME))),
- )
- except Exception as err:
- if "401 Client Error: Unauthorized" in str(err):
- err_msg = "Credentials expired, you need to re-setup"
- # clear stored creds
- self.update_config_value(CONF_AUTH_TOKEN, None)
- self.update_config_value(CONF_REFRESH_TOKEN, None)
- # if we're already loaded and the login got invalid, we need to unload
- if self.available:
- self.unload_with_error(err_msg)
- raise LoginFailed(err_msg)
- raise
-
- self.update_config_value(
- CONF_AUTH_TOKEN,
- self._tidal_session.access_token,
- encrypted=True,
- )
- self.update_config_value(
- CONF_REFRESH_TOKEN,
- self._tidal_session.refresh_token,
- encrypted=True,
- )
- self.update_config_value(
- CONF_EXPIRY_TIME,
- self._tidal_session.expiry_time.isoformat(),
- )
- return self._tidal_session
-
- async def _load_tidal_session(
- self,
- token_type: str,
- quality: str,
- access_token: str,
- refresh_token: str,
- expiry_time: datetime | None = None,
- ) -> TidalSession:
- """Load the tidalapi Session."""
-
- def inner() -> TidalSession:
- config = TidalConfig(quality=quality, item_limit=10000, alac=False)
- session = TidalSession(config=config)
- session.load_oauth_session(
- token_type=token_type,
- access_token=access_token,
- refresh_token=refresh_token,
- expiry_time=expiry_time,
- is_pkce=True,
- )
- return session
-
- return await asyncio.to_thread(inner)
-
- async def _get_track_by_isrc(
- self, item_id: str, tidal_session: TidalSession
- ) -> TidalTrack | None:
+ async def _get_track_by_isrc(self, item_id: str) -> Track | None:
"""Get track by ISRC from library item, with caching."""
# Try to get from cache first
cache_key = f"isrc_map_{item_id}"
)
if cached_track_id:
- self.logger.debug(
- "Using cached track id",
- )
+ self.logger.debug("Using cached track id")
try:
- return await get_track(tidal_session, str(cached_track_id))
+ api_result = await self._get_data(f"tracks/{cached_track_id}")
+ track_data = self._extract_data(api_result)
+ return self._parse_track(track_data)
except MediaNotFoundError:
# Track no longer exists, invalidate cache
await self.mass.cache.delete(
return None
self.logger.debug("Attempting track lookup by ISRC: %s", isrc)
- tracks: list[TidalTrack] = await get_tracks_by_isrc(tidal_session, isrc)
- if not tracks:
+
+ # Get tracks by ISRC using direct API
+ api_result = await self._get_data(
+ "tracks",
+ params={
+ "filter[isrc]": isrc,
+ },
+ base_url=self.OPEN_API_URL,
+ )
+ tracks_data = self._extract_data(api_result)
+
+ if not tracks_data and not tracks_data.get("data"):
return None
+ track_data = tracks_data["data"][0]
+ track_id = str(track_data["id"])
+
# Cache the mapping for future use
await self.mass.cache.set(
- cache_key, tracks[0].id, category=CACHE_CATEGORY_DEFAULT, base_key=self.lookup_key
+ cache_key,
+ track_id,
+ category=CACHE_CATEGORY_DEFAULT,
+ base_key=self.lookup_key,
+ )
+
+ return await self.get_track(track_id)
+
+ def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
+ """Create a generic item mapping."""
+ return ItemMapping(
+ media_type=media_type,
+ item_id=key,
+ provider=self.lookup_key,
+ name=name,
)
- return tracks[0]
+ #
+ # LIBRARY MANAGEMENT
+ #
+
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+ """Retrieve all library artists from Tidal."""
+ user_id = self.auth.user_id
+ path = f"users/{user_id}/favorites/artists"
+
+ async for artist_item in self._paginate_api(path, nested_key="item"):
+ if artist_item and artist_item.get("id"):
+ yield self._parse_artist(artist_item)
+
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
+ """Retrieve all library albums from Tidal."""
+ user_id = self.auth.user_id
+ path = f"users/{user_id}/favorites/albums"
+
+ async for album_item in self._paginate_api(path, nested_key="item"):
+ if album_item and album_item.get("id"):
+ yield self._parse_album(album_item)
+
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+ """Retrieve library tracks from Tidal."""
+ user_id = self.auth.user_id
+ path = f"users/{user_id}/favorites/tracks"
+
+ async for track_item in self._paginate_api(path, nested_key="item"):
+ if track_item and track_item.get("id"):
+ yield self._parse_track(track_item)
+
+ async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]:
+ """Retrieve all library playlists from the provider."""
+ user_id = self.auth.user_id
+ path = f"users/{user_id}/playlistsAndFavoritePlaylists"
+
+ async for playlist_item in self._paginate_api(path, nested_key="playlist"):
+ if playlist_item and playlist_item.get("uuid"):
+ yield self._parse_playlist(playlist_item)
- # Parsers
+ async def library_add(self, item: MediaItemType) -> bool:
+ """Add item to library."""
+ endpoint = None
+ data = {}
+
+ if item.media_type == MediaType.ARTIST:
+ endpoint = "favorites/artists"
+ data = {"artistId": item.item_id}
+ elif item.media_type == MediaType.ALBUM:
+ endpoint = "favorites/albums"
+ data = {"albumId": item.item_id}
+ elif item.media_type == MediaType.TRACK:
+ endpoint = "favorites/tracks"
+ data = {"trackId": item.item_id}
+ elif item.media_type == MediaType.PLAYLIST:
+ endpoint = "favorites/playlists"
+ data = {"playlistId": item.item_id}
+ else:
+ return False
+
+ endpoint = f"users/{self.auth.user_id}/{endpoint}"
+
+ await self._post_data(endpoint, data=data, as_form=True)
+ return True
+
+ async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
+ """Remove item from library."""
+ endpoint = None
+
+ if media_type == MediaType.ARTIST:
+ endpoint = f"favorites/artists/{prov_item_id}"
+ elif media_type == MediaType.ALBUM:
+ endpoint = f"favorites/albums/{prov_item_id}"
+ elif media_type == MediaType.TRACK:
+ endpoint = f"favorites/tracks/{prov_item_id}"
+ elif media_type == MediaType.PLAYLIST:
+ endpoint = f"favorites/playlists/{prov_item_id}"
+ else:
+ return False
+
+ endpoint = f"users/{self.auth.user_id}/{endpoint}"
+
+ try:
+ await self._delete_data(endpoint)
+ return True
+ except Exception:
+ # Log but don't raise - just return False to indicate failure
+ self.logger.warning("Failed to remove %s:%s library", media_type, prov_item_id)
+ return False
+
+ #
+ # PLAYLIST MANAGEMENT
+ #
+
+ async def create_playlist(self, name: str) -> Playlist:
+ """Create a new playlist on provider with given name."""
+ # Create playlist using form-encoded data
+ data = {"title": name, "description": ""}
- def _parse_artist(self, artist_obj: TidalArtist) -> Artist:
+ try:
+ playlist_obj = await self._post_data(
+ f"users/{self.auth.user_id}/playlists", data=data, as_form=True
+ )
+
+ return self._parse_playlist(playlist_obj)
+ except Exception as err:
+ self.logger.error("Failed to create playlist: %s", err)
+ raise ResourceTemporarilyUnavailable("Failed to create playlist") from err
+
+ async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
+ """Add track(s) to playlist."""
+ try:
+ # Get playlist details first with ETag
+ api_result = await self._get_data(f"playlists/{prov_playlist_id}", return_etag=True)
+ playlist_obj, etag = self._extract_data_and_etag(api_result)
+
+ # Send using form-encoded data like the synchronous library
+ data = {
+ "onArtifactNotFound": "SKIP",
+ "trackIds": ",".join(map(str, prov_track_ids)),
+ "toIndex": playlist_obj["numberOfTracks"],
+ "onDupes": "SKIP",
+ }
+
+ # Force using form data instead of JSON and include ETag
+ headers = {"If-None-Match": etag} if etag else {}
+ await self._post_data(
+ f"playlists/{prov_playlist_id}/items",
+ data=data,
+ as_form=True,
+ headers=headers,
+ )
+
+ except MediaNotFoundError as err:
+ raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
+
+ async def remove_playlist_tracks(
+ self, prov_playlist_id: str, positions_to_remove: tuple[int, ...]
+ ) -> None:
+ """Remove track(s) from playlist."""
+ # Get playlist with ETag first
+ api_result = await self._get_data(f"playlists/{prov_playlist_id}", return_etag=True)
+ _, etag = self._extract_data_and_etag(api_result)
+
+ # Format positions as string in URL path
+ # Tidal can use directly indices in path, not track IDs in the body
+ position_string = ",".join([str(pos - 1) for pos in positions_to_remove])
+
+ # Use DELETE with If-None-Match header
+ # Tidal uses this incorrectly, but it's required
+ headers = {"If-None-Match": etag} if etag else {}
+
+ # Make a direct DELETE request to the endpoint with positions in the URL path
+ await self._delete_data(
+ f"playlists/{prov_playlist_id}/items/{position_string}", headers=headers
+ )
+
+ #
+ # ITEM PARSERS
+ #
+
+ def _parse_artist(self, artist_obj: dict[str, Any]) -> Artist:
"""Parse tidal artist object to generic layout."""
- artist_id = artist_obj.id
+ artist_id = str(artist_obj["id"])
artist = Artist(
- item_id=str(artist_id),
+ item_id=artist_id,
provider=self.lookup_key,
- name=artist_obj.name,
+ name=artist_obj["name"],
provider_mappings={
ProviderMapping(
- item_id=str(artist_id),
+ item_id=artist_id,
provider_domain=self.domain,
provider_instance=self.instance_id,
# NOTE: don't use the /browse endpoint as it's
},
)
# metadata
- if artist_obj.picture:
- picture_id = artist_obj.picture.replace("-", "/")
+ if artist_obj["picture"]:
+ picture_id = artist_obj["picture"].replace("-", "/")
image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg"
artist.metadata.images = UniqueList(
[
return artist
- def _parse_album(self, album_obj: TidalAlbum) -> Album:
+ def _parse_album(self, album_obj: dict[str, Any]) -> Album:
"""Parse tidal album object to generic layout."""
- name = album_obj.name
- version = album_obj.version or ""
- album_id = album_obj.id
+ name = album_obj["title"]
+ version = album_obj["version"] or ""
+ album_id = str(album_obj["id"])
album = Album(
- item_id=str(album_id),
+ item_id=album_id,
provider=self.lookup_key,
name=name,
version=version,
provider_mappings={
ProviderMapping(
- item_id=str(album_id),
+ item_id=album_id,
provider_domain=self.domain,
provider_instance=self.instance_id,
audio_format=AudioFormat(
content_type=ContentType.FLAC,
),
url=f"https://tidal.com/album/{album_id}",
- available=album_obj.available,
+ available=album_obj["streamReady"],
)
},
)
various_artist_album: bool = False
- for artist_obj in album_obj.artists:
- if artist_obj.name == "Various Artists":
+ for artist_obj in album_obj["artists"]:
+ if artist_obj["name"] == "Various Artists":
various_artist_album = True
album.artists.append(self._parse_artist(artist_obj))
- if album_obj.type == "COMPILATION" or various_artist_album:
+ if album_obj["type"] == "COMPILATION" or various_artist_album:
album.album_type = AlbumType.COMPILATION
- elif album_obj.type == "ALBUM":
+ elif album_obj["type"] == "ALBUM":
album.album_type = AlbumType.ALBUM
- elif album_obj.type == "EP":
+ elif album_obj["type"] == "EP":
album.album_type = AlbumType.EP
- elif album_obj.type == "SINGLE":
+ elif album_obj["type"] == "SINGLE":
album.album_type = AlbumType.SINGLE
- album.year = int(album_obj.year)
+ album.year = int(album_obj["releaseDate"].split("-")[0])
# metadata
- if album_obj.universal_product_number:
- album.external_ids.add((ExternalID.BARCODE, album_obj.universal_product_number))
- album.metadata.copyright = album_obj.copyright
- album.metadata.explicit = album_obj.explicit
- album.metadata.popularity = album_obj.popularity
- if album_obj.cover:
- picture_id = album_obj.cover.replace("-", "/")
+ if album_obj["upc"]:
+ album.external_ids.add((ExternalID.BARCODE, album_obj["upc"]))
+ album.metadata.copyright = album_obj["copyright"]
+ album.metadata.explicit = album_obj["explicit"]
+ album.metadata.popularity = album_obj["popularity"]
+ if album_obj["cover"]:
+ picture_id = album_obj["cover"].replace("-", "/")
image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg"
album.metadata.images = UniqueList(
[
def _parse_track(
self,
- track_obj: TidalTrack,
+ track_obj: dict[str, Any],
) -> Track:
"""Parse tidal track object to generic layout."""
- version = track_obj.version or ""
- track_id = str(track_obj.id)
+ version = track_obj["version"] or ""
+ track_id = str(track_obj["id"])
+ hi_res_lossless = "HI_RES_LOSSLESS" in track_obj["mediaMetadata"]["tags"]
track = Track(
- item_id=str(track_id),
+ item_id=track_id,
provider=self.lookup_key,
- name=track_obj.name,
+ name=track_obj["title"],
version=version,
- duration=track_obj.duration,
+ duration=track_obj["duration"],
provider_mappings={
ProviderMapping(
item_id=str(track_id),
provider_instance=self.instance_id,
audio_format=AudioFormat(
content_type=ContentType.FLAC,
- bit_depth=24 if track_obj.is_hi_res_lossless else 16,
+ bit_depth=24 if hi_res_lossless else 16,
),
url=f"https://tidal.com/track/{track_id}",
- available=track_obj.available,
+ available=track_obj["streamReady"],
)
},
- disc_number=track_obj.volume_num or 0,
- track_number=track_obj.track_num or 0,
+ disc_number=track_obj["volumeNumber"] or 0,
+ track_number=track_obj["trackNumber"] or 0,
)
- if track_obj.isrc:
- track.external_ids.add((ExternalID.ISRC, track_obj.isrc))
+ if track_obj["isrc"]:
+ track.external_ids.add((ExternalID.ISRC, track_obj["isrc"]))
track.artists = UniqueList()
- for track_artist in track_obj.artists:
+ for track_artist in track_obj["artists"]:
artist = self._parse_artist(track_artist)
track.artists.append(artist)
# metadata
- track.metadata.explicit = track_obj.explicit
- track.metadata.popularity = track_obj.popularity
- track.metadata.copyright = track_obj.copyright
- if track_obj.album:
+ track.metadata.explicit = track_obj["explicit"]
+ track.metadata.popularity = track_obj["popularity"]
+ track.metadata.copyright = track_obj["copyright"]
+ if track_obj["album"]:
# Here we use an ItemMapping as Tidal returns
# minimal data when getting an Album from a Track
track.album = self.get_item_mapping(
media_type=MediaType.ALBUM,
- key=str(track_obj.album.id),
- name=track_obj.album.name,
+ key=str(track_obj["album"]["id"]),
+ name=track_obj["album"]["title"],
)
- if track_obj.album.cover:
- picture_id = track_obj.album.cover.replace("-", "/")
+ if track_obj["album"]["cover"]:
+ picture_id = track_obj["album"]["cover"].replace("-", "/")
image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg"
track.metadata.images = UniqueList(
[
)
return track
- def _parse_playlist(self, playlist_obj: TidalPlaylist) -> Playlist:
+ def _parse_playlist(self, playlist_obj: dict[str, Any]) -> Playlist:
"""Parse tidal playlist object to generic layout."""
- playlist_id = playlist_obj.id
- creator_id = playlist_obj.creator.id if playlist_obj.creator else None
- creator_name = playlist_obj.creator.name if playlist_obj.creator else "Tidal"
- is_editable = bool(creator_id and str(creator_id) == self._tidal_user_id)
+ playlist_id = str(playlist_obj["uuid"])
+ creator_id = None
+ if playlist_obj["creator"]:
+ creator_id = playlist_obj["creator"]["id"]
+ is_editable = bool(creator_id and str(creator_id) == str(self.auth.user_id))
playlist = Playlist(
- item_id=str(playlist_id),
+ item_id=playlist_id,
provider=self.instance_id if is_editable else self.lookup_key,
- name=playlist_obj.name,
- owner=creator_name,
+ name=playlist_obj["title"],
+ owner=creator_id or "Tidal",
provider_mappings={
ProviderMapping(
- item_id=str(playlist_id),
+ item_id=playlist_id,
provider_domain=self.domain,
provider_instance=self.instance_id,
url=f"{BROWSE_URL}/playlist/{playlist_id}",
is_editable=is_editable,
)
# metadata
- playlist.cache_checksum = str(playlist_obj.last_updated)
- playlist.metadata.popularity = playlist_obj.popularity
- if picture := (playlist_obj.square_picture or playlist_obj.picture):
+ playlist.cache_checksum = str(playlist_obj["lastUpdated"])
+ playlist.metadata.popularity = playlist_obj["popularity"]
+ if picture := (playlist_obj["squareImage"] or playlist_obj["image"]):
picture_id = picture.replace("-", "/")
image_url = f"{RESOURCES_URL}/{picture_id}/750x750.jpg"
playlist.metadata.images = UniqueList(
)
return playlist
-
- async def _iter_items(
- self,
- func: Callable[_P, list[_R]] | Callable[_P, Awaitable[list[_R]]],
- *args: _P.args,
- **kwargs: _P.kwargs,
- ) -> AsyncGenerator[_R, None]:
- """Yield all items from a larger listing."""
- offset = 0
- while True:
- if asyncio.iscoroutinefunction(func):
- chunk = await func(*args, **kwargs, offset=offset) # type: ignore[arg-type]
- else:
- chunk = await asyncio.to_thread(func, *args, **kwargs, offset=offset) # type: ignore[arg-type]
- offset += len(chunk)
- for item in chunk:
- yield item
- if len(chunk) < DEFAULT_LIMIT:
- break
-
- async def _get_media_info(
- self, item_id: str, url: str, force_refresh: bool = False
- ) -> AudioTags:
- """Retrieve (cached) mediainfo for track."""
- cache_category = CACHE_CATEGORY_MEDIA_INFO
- cache_base_key = self.lookup_key
- # do we have some cached info for this url ?
- cached_info = await self.mass.cache.get(
- item_id, category=cache_category, base_key=cache_base_key
- )
- if cached_info and not force_refresh:
- media_info = AudioTags.parse(cached_info)
- else:
- # parse info with ffprobe (and store in cache)
- media_info = await async_parse_tags(url)
- await self.mass.cache.set(
- item_id,
- media_info.raw,
- category=cache_category,
- base_key=cache_base_key,
- )
- return media_info
+++ /dev/null
-"""Helper module for parsing the Tidal API.
-
-This helpers file is an async wrapper around the excellent tidalapi package.
-While the tidalapi package does an excellent job at parsing the Tidal results,
-it is unfortunately not async, which is required for Music Assistant to run smoothly.
-This also nicely separates the parsing logic from the Tidal provider logic.
-
-CREDITS:
-tidalapi: https://github.com/tamland/python-tidal
-"""
-
-import asyncio
-import logging
-
-from music_assistant_models.enums import MediaType
-from music_assistant_models.errors import (
- MediaNotFoundError,
- ResourceTemporarilyUnavailable,
-)
-from tidalapi import Album as TidalAlbum
-from tidalapi import Artist as TidalArtist
-from tidalapi import Favorites as TidalFavorites
-from tidalapi import LoggedInUser
-from tidalapi import Playlist as TidalPlaylist
-from tidalapi import Session as TidalSession
-from tidalapi import Track as TidalTrack
-from tidalapi import UserPlaylist as TidalUserPlaylist
-from tidalapi.exceptions import (
- InvalidISRC,
- MetadataNotAvailable,
- ObjectNotFound,
- TooManyRequests,
-)
-from tidalapi.media import Lyrics as TidalLyrics
-from tidalapi.media import Stream as TidalStream
-
-DEFAULT_LIMIT = 50
-LOGGER = logging.getLogger(__name__)
-
-
-async def get_library_artists(
- session: TidalSession, user_id: str, limit: int = DEFAULT_LIMIT, offset: int = 0
-) -> list[TidalArtist]:
- """Async wrapper around the tidalapi Favorites.artists function."""
-
- def inner() -> list[TidalArtist]:
- artists: list[TidalArtist] = TidalFavorites(session, user_id).artists(
- limit=limit, offset=offset
- )
- return artists
-
- return await asyncio.to_thread(inner)
-
-
-async def library_items_add_remove(
- session: TidalSession,
- user_id: str,
- item_id: str,
- media_type: MediaType,
- add: bool = True,
-) -> bool:
- """Async wrapper around the tidalapi Favorites.items add/remove function."""
-
- def inner() -> bool:
- tidal_favorites = TidalFavorites(session, user_id)
- if media_type == MediaType.UNKNOWN:
- return False
- response: bool = False
- if add:
- match media_type:
- case MediaType.ARTIST:
- response = tidal_favorites.add_artist(item_id)
- case MediaType.ALBUM:
- response = tidal_favorites.add_album(item_id)
- case MediaType.TRACK:
- response = tidal_favorites.add_track(item_id)
- case MediaType.PLAYLIST:
- response = tidal_favorites.add_playlist(item_id)
- else:
- match media_type:
- case MediaType.ARTIST:
- response = tidal_favorites.remove_artist(item_id)
- case MediaType.ALBUM:
- response = tidal_favorites.remove_album(item_id)
- case MediaType.TRACK:
- response = tidal_favorites.remove_track(item_id)
- case MediaType.PLAYLIST:
- response = tidal_favorites.remove_playlist(item_id)
- return response
-
- return await asyncio.to_thread(inner)
-
-
-async def get_artist(session: TidalSession, prov_artist_id: str) -> TidalArtist:
- """Async wrapper around the tidalapi Artist function."""
-
- def inner() -> TidalArtist:
- try:
- return TidalArtist(session, prov_artist_id)
- except ObjectNotFound as err:
- msg = f"Artist {prov_artist_id} not found"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
-
- return await asyncio.to_thread(inner)
-
-
-async def get_artist_albums(session: TidalSession, prov_artist_id: str) -> list[TidalAlbum]:
- """Async wrapper around 3 tidalapi album functions."""
-
- def inner() -> list[TidalAlbum]:
- try:
- artist_obj = TidalArtist(session, prov_artist_id)
- except ObjectNotFound as err:
- msg = f"Artist {prov_artist_id} not found"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
- else:
- all_albums: list[TidalAlbum] = artist_obj.get_albums(limit=DEFAULT_LIMIT)
- # extend with EPs and singles
- all_albums.extend(artist_obj.get_ep_singles(limit=DEFAULT_LIMIT))
- # extend with compilations
- # note that the Tidal API gives back really strange results here so
- # filter on either various artists or the artist id
- for album in artist_obj.get_other(limit=DEFAULT_LIMIT):
- if album.artist.id == artist_obj.id or album.artist.name == "Various Artists":
- all_albums.append(album)
- return all_albums
-
- return await asyncio.to_thread(inner)
-
-
-async def get_artist_toptracks(
- session: TidalSession, prov_artist_id: str, limit: int = 10, offset: int = 0
-) -> list[TidalTrack]:
- """Async wrapper around the tidalapi Artist.get_top_tracks function."""
-
- def inner() -> list[TidalTrack]:
- top_tracks: list[TidalTrack] = TidalArtist(session, prov_artist_id).get_top_tracks(
- limit=limit, offset=offset
- )
- return top_tracks
-
- return await asyncio.to_thread(inner)
-
-
-async def get_library_albums(
- session: TidalSession, user_id: str, limit: int = DEFAULT_LIMIT, offset: int = 0
-) -> list[TidalAlbum]:
- """Async wrapper around the tidalapi Favorites.albums function."""
-
- def inner() -> list[TidalAlbum]:
- albums: list[TidalAlbum] = TidalFavorites(session, user_id).albums(
- limit=limit, offset=offset
- )
- return albums
-
- return await asyncio.to_thread(inner)
-
-
-async def get_album(session: TidalSession, prov_album_id: str) -> TidalAlbum:
- """Async wrapper around the tidalapi Album function."""
-
- def inner() -> TidalAlbum:
- try:
- return TidalAlbum(session, prov_album_id)
- except ObjectNotFound as err:
- msg = f"Album {prov_album_id} not found"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
-
- return await asyncio.to_thread(inner)
-
-
-async def get_track(session: TidalSession, prov_track_id: str) -> TidalTrack:
- """Async wrapper around the tidalapi Track function."""
-
- def inner() -> TidalTrack:
- try:
- return TidalTrack(session, prov_track_id)
- except ObjectNotFound as err:
- msg = f"Track {prov_track_id} not found"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
-
- return await asyncio.to_thread(inner)
-
-
-async def get_track_lyrics(session: TidalSession, prov_track_id: str) -> TidalLyrics | None:
- """Async wrapper around the tidalapi Track lyrics function."""
-
- def inner() -> TidalLyrics | None:
- try:
- track: TidalTrack = TidalTrack(session, prov_track_id)
- lyrics = track.lyrics()
- if lyrics and hasattr(lyrics, "text"):
- return lyrics
- except ObjectNotFound as err:
- msg = f"Track {prov_track_id} not found"
- raise MediaNotFoundError(msg) from err
- except MetadataNotAvailable as err:
- msg = f"Lyrics not available for track {prov_track_id}"
- LOGGER.debug(msg)
- raise MetadataNotAvailable(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
- return None
-
- return await asyncio.to_thread(inner)
-
-
-async def get_tracks_by_isrc(session: TidalSession, isrc: str) -> list[TidalTrack]:
- """Async wrapper around the tidalapi Track function."""
-
- def inner() -> list[TidalTrack]:
- try:
- tracks: list[TidalTrack] = session.get_tracks_by_isrc(isrc)
- return tracks
- except InvalidISRC as err:
- msg = f"ISRC {isrc} invalid or not found"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
-
- return await asyncio.to_thread(inner)
-
-
-async def get_stream(track: TidalTrack) -> TidalStream:
- """Async wrapper around the tidalapi Track.get_stream_url function."""
-
- def inner() -> TidalStream:
- try:
- return track.get_stream()
- except ObjectNotFound as err:
- msg = f"Track {track.id} has no available stream"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
-
- return await asyncio.to_thread(inner)
-
-
-async def get_album_tracks(session: TidalSession, prov_album_id: str) -> list[TidalTrack]:
- """Async wrapper around the tidalapi Album.tracks function."""
-
- def inner() -> list[TidalTrack]:
- try:
- tracks: list[TidalTrack] = TidalAlbum(session, prov_album_id).tracks(
- limit=DEFAULT_LIMIT
- )
- return tracks
- except ObjectNotFound as err:
- msg = f"Album {prov_album_id} not found"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
-
- return await asyncio.to_thread(inner)
-
-
-async def get_library_tracks(
- session: TidalSession, user_id: str, limit: int = DEFAULT_LIMIT, offset: int = 0
-) -> list[TidalTrack]:
- """Async wrapper around the tidalapi Favorites.tracks function."""
-
- def inner() -> list[TidalTrack]:
- tracks: list[TidalTrack] = TidalFavorites(session, user_id).tracks(
- limit=limit, offset=offset
- )
- return tracks
-
- return await asyncio.to_thread(inner)
-
-
-async def get_library_playlists(
- session: TidalSession, user_id: str, offset: int = 0
-) -> list[TidalPlaylist]:
- """Async wrapper around the tidalapi LoggedInUser.playlist_and_favorite_playlists function."""
-
- def inner() -> list[TidalPlaylist]:
- playlists: list[TidalPlaylist] = LoggedInUser(
- session, user_id
- ).playlist_and_favorite_playlists(offset=offset)
- return playlists
-
- return await asyncio.to_thread(inner)
-
-
-async def get_playlist(session: TidalSession, prov_playlist_id: str) -> TidalPlaylist:
- """Async wrapper around the tidal Playlist function."""
-
- def inner() -> TidalPlaylist:
- try:
- return TidalPlaylist(session, prov_playlist_id)
- except ObjectNotFound as err:
- msg = f"Playlist {prov_playlist_id} not found"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
-
- return await asyncio.to_thread(inner)
-
-
-async def get_playlist_tracks(
- session: TidalSession,
- prov_playlist_id: str,
- limit: int = DEFAULT_LIMIT,
- offset: int = 0,
-) -> list[TidalTrack]:
- """Async wrapper around the tidal Playlist.tracks function."""
-
- def inner() -> list[TidalTrack]:
- try:
- tracks: list[TidalTrack] = TidalPlaylist(session, prov_playlist_id).tracks(
- limit=limit, offset=offset
- )
- return tracks
- except ObjectNotFound as err:
- msg = f"Playlist {prov_playlist_id} not found"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
-
- return await asyncio.to_thread(inner)
-
-
-async def add_playlist_tracks(
- session: TidalSession, prov_playlist_id: str, track_ids: list[str]
-) -> None:
- """Async wrapper around the tidal Playlist.add function."""
-
- def inner() -> None:
- TidalUserPlaylist(session, prov_playlist_id).add(track_ids)
-
- return await asyncio.to_thread(inner)
-
-
-async def remove_playlist_tracks(
- session: TidalSession, prov_playlist_id: str, track_ids: list[str]
-) -> None:
- """Async wrapper around the tidal Playlist.remove function."""
-
- def inner() -> None:
- for item in track_ids:
- TidalUserPlaylist(session, prov_playlist_id).remove_by_id(int(item))
-
- return await asyncio.to_thread(inner)
-
-
-async def create_playlist(
- session: TidalSession, user_id: str, title: str, description: str | None = None
-) -> TidalPlaylist:
- """Async wrapper around the tidal LoggedInUser.create_playlist function."""
-
- def inner() -> TidalPlaylist:
- playlist: TidalPlaylist = LoggedInUser(session, user_id).create_playlist(title, description)
- return playlist
-
- return await asyncio.to_thread(inner)
-
-
-async def get_similar_tracks(
- session: TidalSession, prov_track_id: str, limit: int = 25
-) -> list[TidalTrack]:
- """Async wrapper around the tidal Track.get_similar_tracks function."""
-
- def inner() -> list[TidalTrack]:
- try:
- tracks: list[TidalTrack] = TidalTrack(session, prov_track_id).get_track_radio(
- limit=limit
- )
- return tracks
- except ObjectNotFound as err:
- msg = f"Source track {prov_track_id} not found"
- raise MediaNotFoundError(msg) from err
- except MetadataNotAvailable as err:
- msg = f"No similar tracks available for {prov_track_id}"
- raise MediaNotFoundError(msg) from err
- except TooManyRequests:
- msg = "Tidal API rate limit reached"
- raise ResourceTemporarilyUnavailable(msg)
-
- return await asyncio.to_thread(inner)
-
-
-async def search(
- session: TidalSession,
- query: str,
- media_types: list[MediaType],
- limit: int = 50,
- offset: int = 0,
-) -> dict[str, str]:
- """Async wrapper around the tidalapi Search function."""
-
- def inner() -> dict[str, str]:
- search_types = []
- if MediaType.ARTIST in media_types:
- search_types.append(TidalArtist)
- if MediaType.ALBUM in media_types:
- search_types.append(TidalAlbum)
- if MediaType.TRACK in media_types:
- search_types.append(TidalTrack)
- if MediaType.PLAYLIST in media_types:
- search_types.append(TidalPlaylist)
-
- models = search_types
- results: dict[str, str] = session.search(query, models, limit, offset)
- return results
-
- return await asyncio.to_thread(inner)
-
-
-async def token_refresh(session: TidalSession) -> None:
- """Async wrapper around the tidalapi Session.refresh function."""
-
- def inner() -> None:
- session.token_refresh(session.refresh_token)
-
- return await asyncio.to_thread(inner)