import asyncio
import functools
import json
-from collections.abc import Awaitable, Callable, Coroutine
+from collections.abc import Awaitable, Callable
from contextlib import suppress
from enum import StrEnum
from typing import TYPE_CHECKING, Any, TypeVar, cast
return wrapper
- @staticmethod
- def handle_item_errors(
- item_type: str,
- ) -> Callable[[Callable[..., Coroutine[Any, Any, T]]], Callable[..., Coroutine[Any, Any, T]]]:
- """Handle standard error patterns in item getters."""
-
- def decorator(
- method: Callable[..., Coroutine[Any, Any, T]],
- ) -> Callable[..., Coroutine[Any, Any, T]]:
- @functools.wraps(method)
- async def wrapper(self: TidalProvider, item_id: str, *args: Any, **kwargs: Any) -> T:
- try:
- return await method(self, item_id, *args, **kwargs)
- except ResourceTemporarilyUnavailable:
- raise
- except Exception as err:
- raise MediaNotFoundError(f"{item_type} {item_id} not found") from err
-
- return wrapper
-
- return decorator
-
#
# CORE API METHODS
#
return parsed_results
- @handle_item_errors("Track")
async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
"""Get similar tracks for given track id."""
- api_result = await self._get_data(f"tracks/{prov_track_id}/radio", params={"limit": limit})
- similar_tracks = self._extract_data(api_result)
- return [self._parse_track(track_obj) for track_obj in similar_tracks.get("items", [])]
+ try:
+ api_result = await self._get_data(
+ f"tracks/{prov_track_id}/radio", params={"limit": limit}
+ )
+ similar_tracks = self._extract_data(api_result)
+ return [self._parse_track(track_obj) for track_obj in similar_tracks.get("items", [])]
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
#
# ITEM RETRIEVAL METHODS
#
- @handle_item_errors("Artist")
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get artist details for given artist id."""
- api_result = await self._get_data(f"artists/{prov_artist_id}")
- artist_obj = self._extract_data(api_result)
- return self._parse_artist(artist_obj)
+ try:
+ api_result = await self._get_data(f"artists/{prov_artist_id}")
+ artist_obj = self._extract_data(api_result)
+ return self._parse_artist(artist_obj)
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
- @handle_item_errors("Album")
async def get_album(self, prov_album_id: str) -> Album:
"""Get album details for given album id."""
- api_result = await self._get_data(f"albums/{prov_album_id}")
- album_obj = self._extract_data(api_result)
- return self._parse_album(album_obj)
+ try:
+ api_result = await self._get_data(f"albums/{prov_album_id}")
+ album_obj = self._extract_data(api_result)
+ return self._parse_album(album_obj)
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
- @handle_item_errors("Track")
async def get_track(self, prov_track_id: str) -> Track:
"""Get track details for given track id."""
- api_result = await self._get_data(f"tracks/{prov_track_id}")
- track_obj = self._extract_data(api_result)
- track = self._parse_track(track_obj)
- # Get additional details like lyrics if needed
- with suppress(MediaNotFoundError):
- api_result = await self._get_data(f"tracks/{prov_track_id}/lyrics")
- lyrics_data = self._extract_data(api_result)
-
- if lyrics_data and "text" in lyrics_data:
- track.metadata.lyrics = lyrics_data["text"]
-
- return track
+ try:
+ api_result = await self._get_data(f"tracks/{prov_track_id}")
+ track_obj = self._extract_data(api_result)
+ track = self._parse_track(track_obj)
+ # Get additional details like lyrics if needed
+ with suppress(MediaNotFoundError):
+ api_result = await self._get_data(f"tracks/{prov_track_id}/lyrics")
+ lyrics_data = self._extract_data(api_result)
+
+ if lyrics_data and "text" in lyrics_data:
+ track.metadata.lyrics = lyrics_data["text"]
+
+ return track
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
- @handle_item_errors("Playlist")
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get playlist details for given playlist id."""
- api_result = await self._get_data(f"playlists/{prov_playlist_id}")
- playlist_obj = self._extract_data(api_result)
- return self._parse_playlist(playlist_obj)
+ try:
+ api_result = await self._get_data(f"playlists/{prov_playlist_id}")
+ playlist_obj = self._extract_data(api_result)
+ return self._parse_playlist(playlist_obj)
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
- @handle_item_errors("Track")
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
- api_result = await self._get_data(f"albums/{prov_album_id}/tracks")
- album_tracks = self._extract_data(api_result)
- return [self._parse_track(track_obj) for track_obj in album_tracks.get("items", [])]
+ try:
+ api_result = await self._get_data(f"albums/{prov_album_id}/tracks")
+ album_tracks = self._extract_data(api_result)
+ return [self._parse_track(track_obj) for track_obj in album_tracks.get("items", [])]
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
- @handle_item_errors("Album")
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get a list of all albums for the given artist."""
- api_result = await self._get_data(f"artists/{prov_artist_id}/albums")
- artist_albums = self._extract_data(api_result)
- return [self._parse_album(album_obj) for album_obj in artist_albums.get("items", [])]
+ try:
+ api_result = await self._get_data(f"artists/{prov_artist_id}/albums")
+ artist_albums = self._extract_data(api_result)
+ return [self._parse_album(album_obj) for album_obj in artist_albums.get("items", [])]
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
- @handle_item_errors("Track")
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get a list of 10 most popular tracks for the given artist."""
- api_result = await self._get_data(
- f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0}
- )
- artist_top_tracks = self._extract_data(api_result)
- return [self._parse_track(track_obj) for track_obj in artist_top_tracks.get("items", [])]
+ try:
+ api_result = await self._get_data(
+ f"artists/{prov_artist_id}/toptracks", params={"limit": 10, "offset": 0}
+ )
+ artist_top_tracks = self._extract_data(api_result)
+ return [
+ self._parse_track(track_obj) for track_obj in artist_top_tracks.get("items", [])
+ ]
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
- @handle_item_errors("Playlist")
async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]:
"""Get playlist tracks."""
result: list[Track] = []
page_size = 200
offset = page * page_size
- api_result = await self._get_data(
- f"playlists/{prov_playlist_id}/tracks",
- params={"limit": page_size, "offset": offset},
- )
- tidal_tracks = self._extract_data(api_result)
- for index, track_obj in enumerate(tidal_tracks.get("items", []), 1):
- track = self._parse_track(track_obj=track_obj)
- track.position = offset + index
- result.append(track)
- return result
+ try:
+ api_result = await self._get_data(
+ f"playlists/{prov_playlist_id}/tracks",
+ params={"limit": page_size, "offset": offset},
+ )
+ tidal_tracks = self._extract_data(api_result)
+ for index, track_obj in enumerate(tidal_tracks.get("items", []), 1):
+ track = self._parse_track(track_obj=track_obj)
+ track.position = offset + index
+ result.append(track)
+ return result
+ except ResourceTemporarilyUnavailable:
+ raise
+ except Exception as err:
+ raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
async def get_stream_details(
self, item_id: str, media_type: MediaType = MediaType.TRACK