--- /dev/null
+"""Context manager using asyncio_throttle that catches and re-raises RetriesExhausted."""
+
+import asyncio
+import logging
+
+from asyncio_throttle import Throttler
+
+from music_assistant.common.models.errors import (
+ ResourceTemporarilyUnavailable,
+ RetriesExhausted,
+)
+from music_assistant.constants import MASS_LOGGER_NAME
+
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.throttle_retry")
+
+
+class AsyncThrottleWithRetryContextManager:
+ """Context manager using asyncio_throttle that catches and re-raises RetriesExhausted."""
+
+ def __init__(self, rate_limit, period, retry_attempts=5, initial_backoff=5):
+ """Initialize the AsyncThrottledContextManager."""
+ self.rate_limit = rate_limit
+ self.period = period
+ self.retry_attempts = retry_attempts
+ self.initial_backoff = initial_backoff
+ self.throttler = Throttler(rate_limit=rate_limit, period=period)
+
+ async def __aenter__(self):
+ """Acquire the throttle when entering the async context."""
+ await self.throttler.acquire()
+ return self
+
+ async def __aexit__(self, exc_type, exc, tb):
+ """Release the throttle. If a RetriesExhausted occurs, re-raise it."""
+ self.throttler.flush()
+ if isinstance(exc, RetriesExhausted):
+ raise exc
+
+ async def wrapped_function_with_retry(self, func, *args, **kwargs):
+ """Async function wrapper with retry logic."""
+ backoff_time = self.initial_backoff
+ for attempt in range(self.retry_attempts):
+ try:
+ return await func(*args, **kwargs)
+ except ResourceTemporarilyUnavailable as e:
+ LOGGER.warning(f"Attempt {attempt + 1}/{self.retry_attempts} failed: {e}")
+ if attempt < self.retry_attempts - 1:
+ LOGGER.warning(f"Retrying in {backoff_time} seconds...")
+ await asyncio.sleep(backoff_time)
+ backoff_time *= 2
+ else: # noqa: PLW0120
+ msg = f"Retries exhausted, failed after {self.retry_attempts} attempts"
+ raise RetriesExhausted(msg)
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Any, cast
-from asyncio_throttle import Throttler
from tidalapi import Album as TidalAlbum
from tidalapi import Artist as TidalArtist
from tidalapi import Config as TidalConfig
ProviderFeature,
StreamType,
)
-from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
+from music_assistant.common.models.errors import (
+ LoginFailed,
+ MediaNotFoundError,
+)
from music_assistant.common.models.media_items import (
Album,
AlbumTrack,
ContentType,
ItemMapping,
MediaItemImage,
- MediaItemType,
Playlist,
PlaylistTrack,
ProviderMapping,
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.tags import AudioTags, parse_tags
+from music_assistant.server.helpers.throttle_retry import (
+ AsyncThrottleWithRetryContextManager,
+)
from music_assistant.server.models.music_provider import MusicProvider
from .helpers import (
"""Handle async initialization of the provider."""
self._tidal_user_id: str = self.config.get_value(CONF_USER_ID)
self._tidal_session = await self._get_tidal_session()
- self._throttler = Throttler(rate_limit=1, period=0.1)
+ self._throttle_retry = AsyncThrottleWithRetryContextManager(
+ rate_limit=1, period=0.5, retry_attempts=5, initial_backoff=5
+ )
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
"""Get album tracks for given album id."""
tidal_session = await self._get_tidal_session()
- async with self._throttler:
- return cast(
- list[AlbumTrack],
- [
- self._parse_track(
- track_obj=track_obj,
- extra_init_kwargs={
- "disc_number": track_obj.volume_num,
- "track_number": track_obj.track_num,
- },
- )
- for track_obj in await get_album_tracks(tidal_session, prov_album_id)
- ],
+ async with self._throttle_retry as manager:
+ album_obj = await manager.wrapped_function_with_retry(
+ get_album, tidal_session, prov_album_id
)
+ async with self._throttle_retry as manager:
+ tracks_obj = await manager.wrapped_function_with_retry(
+ get_album_tracks, tidal_session, prov_album_id
+ )
+ return [
+ AlbumTrack.from_track(
+ track=self._parse_track(track_obj=track_obj),
+ album=self._parse_album(album_obj=album_obj),
+ disc_number=track_obj.volume_num,
+ track_number=track_obj.track_num,
+ )
+ for track_obj in tracks_obj
+ ]
+
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get a list of all albums for the given artist."""
tidal_session = await self._get_tidal_session()
- async with self._throttler:
- return [
- self._parse_album(album)
- for album in await get_artist_albums(tidal_session, prov_artist_id)
- ]
+ async with self._throttle_retry as manager:
+ artist_albums_obj = await manager.wrapped_function_with_retry(
+ get_artist_albums, tidal_session, prov_artist_id
+ )
+ return [self._parse_album(album) for album in artist_albums_obj]
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get a list of 10 most popular tracks for the given artist."""
tidal_session = await self._get_tidal_session()
- async with self._throttler:
- return [
- self._parse_track(track)
- for track in await get_artist_toptracks(tidal_session, prov_artist_id)
- ]
+ async with self._throttle_retry as manager:
+ artist_toptracks_obj = await manager.wrapped_function_with_retry(
+ get_artist_toptracks, tidal_session, prov_artist_id
+ )
+ return [self._parse_track(track) for track in artist_toptracks_obj]
async def get_playlist_tracks(
self, prov_playlist_id: str
get_playlist_tracks, tidal_session, prov_playlist_id, limit=DEFAULT_LIMIT
):
total_playlist_tracks += 1
- track = self._parse_track(
- track_obj=track_obj,
- extra_init_kwargs={"position": total_playlist_tracks},
+ track = PlaylistTrack.from_track(
+ self._parse_track(track_obj=track_obj), total_playlist_tracks
)
yield track
async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
"""Get similar tracks for given track id."""
tidal_session = await self._get_tidal_session()
- async with self._throttler:
- return [
- self._parse_track(track)
- for track in await get_similar_tracks(tidal_session, prov_track_id, limit)
- ]
+ async with self._throttle_retry as manager:
+ similar_tracks_obj = await manager.wrapped_function_with_retry(
+ get_similar_tracks, tidal_session, prov_track_id, limit
+ )
+ return [self._parse_track(track) for track in similar_tracks_obj]
- async def library_add(self, item: MediaItemType) -> bool:
+ async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool:
"""Add item to library."""
tidal_session = await self._get_tidal_session()
return await library_items_add_remove(
tidal_session,
str(self._tidal_user_id),
- item.item_id,
- item.media_type,
+ prov_item_id,
+ media_type,
add=True,
)
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get artist details for given artist id."""
tidal_session = await self._get_tidal_session()
- async with self._throttler:
- return self._parse_artist(
- artist_obj=await get_artist(tidal_session, prov_artist_id),
+ async with self._throttle_retry as manager:
+ artist_obj = await manager.wrapped_function_with_retry(
+ get_artist, tidal_session, prov_artist_id
)
+ return self._parse_artist(artist_obj)
async def get_album(self, prov_album_id: str) -> Album:
"""Get album details for given album id."""
tidal_session = await self._get_tidal_session()
- async with self._throttler:
- return self._parse_album(
- album_obj=await get_album(tidal_session, prov_album_id),
+ async with self._throttle_retry as manager:
+ album_obj = await manager.wrapped_function_with_retry(
+ get_album, tidal_session, prov_album_id
)
+ return self._parse_album(album_obj)
async def get_track(self, prov_track_id: str) -> Track:
"""Get track details for given track id."""
tidal_session = await self._get_tidal_session()
- async with self._throttler:
- track_obj = await get_track(tidal_session, prov_track_id)
+ async with self._throttle_retry as manager:
+ track_obj = await manager.wrapped_function_with_retry(
+ get_track, tidal_session, prov_track_id
+ )
track = self._parse_track(track_obj)
# get some extra details for the full track info
with suppress(tidal_exceptions.MetadataNotAvailable):
- lyrics: TidalLyrics = await asyncio.to_thread(track_obj.lyrics)
+ lyrics: TidalLyrics = await asyncio.to_thread(track.lyrics)
track.metadata.lyrics = lyrics.text
return track
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get playlist details for given playlist id."""
tidal_session = await self._get_tidal_session()
- async with self._throttler:
- return self._parse_playlist(
- await get_playlist(tidal_session, prov_playlist_id),
+ async with self._throttle_retry as manager:
+ playlist_obj = await manager.wrapped_function_with_retry(
+ get_playlist, tidal_session, prov_playlist_id
)
+ return self._parse_playlist(playlist_obj)
def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
"""Create a generic item mapping."""
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=self.instance_id,
- remotely_accessible=True,
+ provider=self.domain,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=self.instance_id,
- remotely_accessible=True,
+ provider=self.domain,
)
]
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=self.instance_id,
- remotely_accessible=True,
+ provider=self.domain,
)
]
return track
is_editable = bool(creator_id and str(creator_id) == self._tidal_user_id)
playlist.is_editable = is_editable
# metadata
- playlist.metadata.cache_checksum = str(playlist_obj.last_updated)
+ playlist.metadata.checksum = str(playlist_obj.last_updated)
playlist.metadata.popularity = playlist_obj.popularity
if picture := (playlist_obj.square_picture or playlist_obj.picture):
picture_id = picture.replace("-", "/")
MediaItemImage(
type=ImageType.THUMB,
path=image_url,
- provider=self.instance_id,
- remotely_accessible=True,
+ provider=self.domain,
)
]
) -> AsyncGenerator[Any, None]:
"""Yield all items from a larger listing."""
offset = 0
- async with self._throttler:
+ async with self._throttle_retry:
while True:
if asyncio.iscoroutinefunction(func):
chunk = await func(*args, **kwargs, offset=offset)
MetadataNotAvailable,
ObjectNotFound,
TooManyRequests,
- URLNotAvailable,
)
from music_assistant.common.models.enums import MediaType
-from music_assistant.common.models.errors import MediaNotFoundError
+from music_assistant.common.models.errors import (
+ MediaNotFoundError,
+ ResourceTemporarilyUnavailable,
+)
DEFAULT_LIMIT = 50
LOGGER = logging.getLogger(__name__)
def inner() -> TidalArtist:
try:
return TidalArtist(session, prov_artist_id)
- except (ObjectNotFound, TooManyRequests) as err:
+ except ObjectNotFound as err:
msg = f"Artist {prov_artist_id} not found"
raise MediaNotFoundError(msg) from err
+ except TooManyRequests:
+ msg = "Tidal API rate limit reached"
+ raise ResourceTemporarilyUnavailable(msg)
return await asyncio.to_thread(inner)
def inner() -> list[TidalAlbum]:
try:
artist_obj = TidalArtist(session, prov_artist_id)
- except (ObjectNotFound, TooManyRequests) as err:
+ except ObjectNotFound as err:
msg = f"Artist {prov_artist_id} not found"
raise MediaNotFoundError(msg) from err
+ except TooManyRequests:
+ msg = "Tidal API rate limit reached"
+ raise ResourceTemporarilyUnavailable(msg)
else:
all_albums = []
albums = artist_obj.get_albums(limit=DEFAULT_LIMIT)
def inner() -> TidalAlbum:
try:
return TidalAlbum(session, prov_album_id)
- except (ObjectNotFound, TooManyRequests) as err:
+ except ObjectNotFound as err:
msg = f"Album {prov_album_id} not found"
raise MediaNotFoundError(msg) from err
+ except TooManyRequests:
+ msg = "Tidal API rate limit reached"
+ raise ResourceTemporarilyUnavailable(msg)
return await asyncio.to_thread(inner)
def inner() -> TidalTrack:
try:
return TidalTrack(session, prov_track_id)
- except (ObjectNotFound, TooManyRequests) as err:
+ except ObjectNotFound as err:
msg = f"Track {prov_track_id} not found"
raise MediaNotFoundError(msg) from err
+ except TooManyRequests:
+ msg = "Tidal API rate limit reached"
+ raise ResourceTemporarilyUnavailable(msg)
return await asyncio.to_thread(inner)
try:
track_url: str = TidalTrack(session, prov_track_id).get_url()
return track_url
- except (ObjectNotFound, TooManyRequests, URLNotAvailable) as err:
+ except ObjectNotFound as err:
msg = f"Track {prov_track_id} not found"
raise MediaNotFoundError(msg) from err
+ except TooManyRequests:
+ msg = "Tidal API rate limit reached"
+ raise ResourceTemporarilyUnavailable(msg)
return await asyncio.to_thread(inner)
limit=DEFAULT_LIMIT
)
return tracks
- except (ObjectNotFound, TooManyRequests) as err:
+ except ObjectNotFound as err:
msg = f"Album {prov_album_id} not found"
raise MediaNotFoundError(msg) from err
+ except TooManyRequests:
+ msg = "Tidal API rate limit reached"
+ raise ResourceTemporarilyUnavailable(msg)
return await asyncio.to_thread(inner)
def inner() -> TidalPlaylist:
try:
return TidalPlaylist(session, prov_playlist_id)
- except (ObjectNotFound, TooManyRequests) as err:
+ except ObjectNotFound as err:
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from err
+ except TooManyRequests:
+ msg = "Tidal API rate limit reached"
+ raise ResourceTemporarilyUnavailable(msg)
return await asyncio.to_thread(inner)
limit=limit, offset=offset
)
return tracks
- except (ObjectNotFound, TooManyRequests) as err:
+ except ObjectNotFound as err:
msg = f"Playlist {prov_playlist_id} not found"
raise MediaNotFoundError(msg) from err
+ except TooManyRequests:
+ msg = "Tidal API rate limit reached"
+ raise ResourceTemporarilyUnavailable(msg)
return await asyncio.to_thread(inner)
limit=limit
)
return tracks
- except (MetadataNotAvailable, ObjectNotFound, TooManyRequests) as err:
+ except (MetadataNotAvailable, ObjectNotFound) as err:
msg = f"Track {prov_track_id} not found"
raise MediaNotFoundError(msg) from err
+ except TooManyRequests:
+ msg = "Tidal API rate limit reached"
+ raise ResourceTemporarilyUnavailable(msg)
return await asyncio.to_thread(inner)