from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any
+from asyncio_throttle import Throttler
from tidalapi import Album as TidalAlbum
from tidalapi import Artist as TidalArtist
from tidalapi import Config as TidalConfig
)
-async def iter_items(func: Awaitable | Callable, *args, **kwargs) -> AsyncGenerator[Any, None]:
- """Yield all items from a larger listing."""
- offset = 0
- while True:
- if asyncio.iscoroutinefunction(func):
- chunk = await func(*args, **kwargs, offset=offset)
- else:
- chunk = await asyncio.to_thread(func, *args, **kwargs, offset=offset)
- offset += len(chunk)
- for item in chunk:
- yield item
- if len(chunk) < DEFAULT_LIMIT:
- break
-
-
class TidalProvider(MusicProvider):
"""Implementation of a Tidal MusicProvider."""
"""Handle async initialization of the provider."""
self._tidal_user_id = self.config.get_value(CONF_USER_ID)
self._tidal_session = await self._get_tidal_session()
+ self._throttler = Throttler(rate_limit=1, period=0.1)
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
"""Retrieve all library artists from Tidal."""
tidal_session = await self._get_tidal_session()
artist: TidalArtist # satisfy the type checker
- async for artist in iter_items(
+ async for artist in self._iter_items(
get_library_artists, tidal_session, self._tidal_user_id, limit=DEFAULT_LIMIT
):
yield await self._parse_artist(artist_obj=artist)
"""Retrieve all library albums from Tidal."""
tidal_session = await self._get_tidal_session()
album: TidalAlbum # satisfy the type checker
- async for album in iter_items(
+ async for album in self._iter_items(
get_library_albums, tidal_session, self._tidal_user_id, limit=DEFAULT_LIMIT
):
yield await self._parse_album(album_obj=album)
"""Retrieve library tracks from Tidal."""
tidal_session = await self._get_tidal_session()
track: TidalTrack # satisfy the type checker
- async for track in iter_items(
+ async for track in self._iter_items(
get_library_tracks, tidal_session, self._tidal_user_id, limit=DEFAULT_LIMIT
):
yield await self._parse_track(track_obj=track)
"""Retrieve all library playlists from the provider."""
tidal_session = await self._get_tidal_session()
playlist: TidalPlaylist # satisfy the type checker
- async for playlist in iter_items(get_library_playlists, tidal_session, self._tidal_user_id):
+ async for playlist in self._iter_items(
+ get_library_playlists, tidal_session, self._tidal_user_id
+ ):
yield await self._parse_playlist(playlist_obj=playlist)
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
tidal_session = await self._get_tidal_session()
- return [
- await self._parse_track(track_obj=track)
- for track in await get_album_tracks(tidal_session, prov_album_id)
- ]
+ async with self._throttler:
+ return [
+ await self._parse_track(track_obj=track)
+ for track in await get_album_tracks(tidal_session, prov_album_id)
+ ]
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get a list of all albums for the given artist."""
tidal_session = await self._get_tidal_session()
- return [
- await self._parse_album(album_obj=album)
- for album in await get_artist_albums(tidal_session, prov_artist_id)
- ]
+ async with self._throttler:
+ return [
+ await self._parse_album(album_obj=album)
+ for album in await get_artist_albums(tidal_session, prov_artist_id)
+ ]
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get a list of 10 most popular tracks for the given artist."""
tidal_session = await self._get_tidal_session()
- return [
- await self._parse_track(track_obj=track)
- for track in await get_artist_toptracks(tidal_session, prov_artist_id)
- ]
+ async with self._throttler:
+ return [
+ await self._parse_track(track_obj=track)
+ for track in await get_artist_toptracks(tidal_session, prov_artist_id)
+ ]
async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
tidal_session = await self._get_tidal_session()
total_playlist_tracks = 0
track: TidalTrack # satisfy the type checker
- async for track_obj in iter_items(
+ async for track_obj in self._iter_items(
get_playlist_tracks, tidal_session, prov_playlist_id, limit=DEFAULT_LIMIT
):
total_playlist_tracks += 1
async def get_similar_tracks(self, prov_track_id: str, limit=25) -> list[Track]:
"""Get similar tracks for given track id."""
tidal_session = await self._get_tidal_session()
- return [
- await self._parse_track(track_obj=track)
- for track in await get_similar_tracks(tidal_session, prov_track_id, limit)
- ]
+ async with self._throttler:
+ return [
+ await self._parse_track(track_obj=track)
+ for track in await get_similar_tracks(tidal_session, prov_track_id, limit)
+ ]
async def library_add(self, prov_item_id: str, media_type: MediaType):
"""Add item to library."""
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get artist details for given artist id."""
tidal_session = await self._get_tidal_session()
- try:
- artist = await self._parse_artist(
- artist_obj=await get_artist(tidal_session, prov_artist_id), full_details=True
+ async with self._throttler:
+ return await self._parse_artist(
+ artist_obj=await get_artist(tidal_session, prov_artist_id),
+ full_details=True,
)
- except MediaNotFoundError as err:
- raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
- return artist
async def get_album(self, prov_album_id: str) -> Album:
"""Get album details for given album id."""
tidal_session = await self._get_tidal_session()
- try:
- album = await self._parse_album(
+ async with self._throttler:
+ return await self._parse_album(
album_obj=await get_album(tidal_session, prov_album_id), full_details=True
)
- except MediaNotFoundError as err:
- raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
- return album
async def get_track(self, prov_track_id: str) -> Track:
"""Get track details for given track id."""
tidal_session = await self._get_tidal_session()
- try:
- track = await self._parse_track(
+ async with self._throttler:
+ return await self._parse_track(
track_obj=await get_track(tidal_session, prov_track_id), full_details=True
)
- except MediaNotFoundError as err:
- raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
- return track
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get playlist details for given playlist id."""
tidal_session = await self._get_tidal_session()
- try:
- playlist = await self._parse_playlist(
+ async with self._throttler:
+ return await self._parse_playlist(
playlist_obj=await get_playlist(tidal_session, prov_playlist_id), full_details=True
)
- except MediaNotFoundError as err:
- raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
- return playlist
def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
"""Create a generic item mapping."""
album.upc = album_obj.universal_product_number
album.year = int(album_obj.year)
+ available = album_obj.available
album.add_provider_mapping(
ProviderMapping(
item_id=album_id,
provider_instance=self.instance_id,
content_type=ContentType.FLAC,
url=f"http://www.tidal.com/album/{album_id}",
+ available=available,
)
)
# metadata
return item.lyrics
return await asyncio.to_thread(inner)
+
+ async def _iter_items(
+ self, func: Awaitable | Callable, *args, **kwargs
+ ) -> AsyncGenerator[Any, None]:
+ """Yield all items from a larger listing."""
+ offset = 0
+ async with self._throttler:
+ while True:
+ if asyncio.iscoroutinefunction(func):
+ chunk = await func(*args, **kwargs, offset=offset)
+ else:
+ chunk = await asyncio.to_thread(func, *args, **kwargs, offset=offset)
+ offset += len(chunk)
+ for item in chunk:
+ yield item
+ if len(chunk) < DEFAULT_LIMIT:
+ break
"""
import asyncio
+import logging
from requests import HTTPError
from tidalapi import Album as TidalAlbum
from music_assistant.common.models.enums import MediaType
from music_assistant.common.models.errors import MediaNotFoundError
+from music_assistant.constants import ROOT_LOGGER_NAME
DEFAULT_LIMIT = 50
+LOGGER = logging.getLogger(f"{ROOT_LOGGER_NAME}.tidal.helpers")
async def get_library_artists(
def inner() -> TidalArtist:
try:
- artist_obj = TidalArtist(session, prov_artist_id)
+ return TidalArtist(session, prov_artist_id)
except HTTPError as err:
- raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
- return artist_obj
+ if err.response.status_code == 404:
+ raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
+ raise err
return await asyncio.to_thread(inner)
"""Async wrapper around 3 tidalapi album functions."""
def inner() -> list[TidalAlbum]:
- all_albums = []
- albums = TidalArtist(session, prov_artist_id).get_albums(limit=DEFAULT_LIMIT)
- eps_singles = TidalArtist(session, prov_artist_id).get_albums_ep_singles(
- limit=DEFAULT_LIMIT
- )
- compilations = TidalArtist(session, prov_artist_id).get_albums_other(limit=DEFAULT_LIMIT)
- all_albums.extend(albums)
- all_albums.extend(eps_singles)
- all_albums.extend(compilations)
- return all_albums
+ try:
+ artist_obj = TidalArtist(session, prov_artist_id)
+ all_albums = []
+ albums = artist_obj.get_albums(limit=DEFAULT_LIMIT)
+ eps_singles = artist_obj.get_albums_ep_singles(limit=DEFAULT_LIMIT)
+ compilations = artist_obj.get_albums_other(limit=DEFAULT_LIMIT)
+ all_albums.extend(albums)
+ all_albums.extend(eps_singles)
+ all_albums.extend(compilations)
+ return all_albums
+ except HTTPError as err:
+ if err.response.status_code == 404:
+ raise MediaNotFoundError(f"Artist {prov_artist_id} not found") from err
+ raise err
return await asyncio.to_thread(inner)
def inner() -> TidalAlbum:
try:
- album_obj = TidalAlbum(session, prov_album_id)
+ return TidalAlbum(session, prov_album_id)
except HTTPError as err:
- raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
- return album_obj
+ if err.response.status_code == 404:
+ raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
+ raise err
return await asyncio.to_thread(inner)
def inner() -> TidalTrack:
try:
- track_obj = TidalTrack(session, prov_track_id)
+ return TidalTrack(session, prov_track_id)
except HTTPError as err:
- raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
- return track_obj
+ if err.response.status_code == 404:
+ raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
+ raise err
return await asyncio.to_thread(inner)
"""Async wrapper around the tidalapi Track.get_url function."""
def inner() -> dict[str, str]:
- return TidalTrack(session, prov_track_id).get_url()
+ try:
+ return TidalTrack(session, prov_track_id).get_url()
+ except HTTPError as err:
+ if err.response.status_code == 404:
+ raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
+ raise err
return await asyncio.to_thread(inner)
"""Async wrapper around the tidalapi Album.tracks function."""
def inner() -> list[TidalTrack]:
- return TidalAlbum(session, prov_album_id).tracks(limit=DEFAULT_LIMIT)
+ try:
+ return TidalAlbum(session, prov_album_id).tracks(limit=DEFAULT_LIMIT)
+ except HTTPError as err:
+ if err.response.status_code == 404:
+ raise MediaNotFoundError(f"Album {prov_album_id} not found") from err
+ raise err
return await asyncio.to_thread(inner)
def inner() -> TidalPlaylist:
try:
- playlist_obj = TidalPlaylist(session, prov_playlist_id)
+ return TidalPlaylist(session, prov_playlist_id)
except HTTPError as err:
- raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
- return playlist_obj
+ if err.response.status_code == 404:
+ raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
+ raise err
return await asyncio.to_thread(inner)
"""Async wrapper around the tidal Playlist.tracks function."""
def inner() -> list[TidalTrack]:
- return TidalPlaylist(session, prov_playlist_id).tracks(limit=limit, offset=offset)
+ try:
+ return TidalPlaylist(session, prov_playlist_id).tracks(limit=limit, offset=offset)
+ except HTTPError as err:
+ if err.response.status_code == 404:
+ raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") from err
+ raise err
return await asyncio.to_thread(inner)
"""Async wrapper around the tidal Track.get_similar_tracks function."""
def inner() -> list[TidalTrack]:
- return TidalTrack(session, media_id=prov_track_id).get_track_radio(limit)
+ try:
+ return TidalTrack(session, prov_track_id).get_track_radio(limit)
+ except HTTPError as err:
+ if err.response.status_code == 404:
+ raise MediaNotFoundError(f"Track {prov_track_id} not found") from err
+ raise err
return await asyncio.to_thread(inner)