From: Jozef Kruszynski <60214390+jozefKruszynski@users.noreply.github.com> Date: Sun, 21 Apr 2024 09:35:29 +0000 (+0200) Subject: 429 backoff implementation (#1230) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=510b3213c5f14c0a7b64b94821ba64bf4ba85d16;p=music-assistant-server.git 429 backoff implementation (#1230) --- diff --git a/music_assistant/common/models/errors.py b/music_assistant/common/models/errors.py index ca8f3bfe..a641a98a 100644 --- a/music_assistant/common/models/errors.py +++ b/music_assistant/common/models/errors.py @@ -104,3 +104,15 @@ class InvalidProviderID(MusicAssistantError): """Error thrown when a provider media item identifier does not match a known format.""" error_code = 15 + + +class RetriesExhausted(MusicAssistantError): + """Error thrown when a retries to a given provider URI have been exhausted.""" + + error_code = 16 + + +class ResourceTemporarilyUnavailable(MusicAssistantError): + """Error thrown when a resource is temporarily unavailable.""" + + error_code = 17 diff --git a/music_assistant/server/helpers/throttle_retry.py b/music_assistant/server/helpers/throttle_retry.py new file mode 100644 index 00000000..54dcc255 --- /dev/null +++ b/music_assistant/server/helpers/throttle_retry.py @@ -0,0 +1,53 @@ +"""Context manager using asyncio_throttle that catches and re-raises RetriesExhausted.""" + +import asyncio +import logging + +from asyncio_throttle import Throttler + +from music_assistant.common.models.errors import ( + ResourceTemporarilyUnavailable, + RetriesExhausted, +) +from music_assistant.constants import MASS_LOGGER_NAME + +LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.throttle_retry") + + +class AsyncThrottleWithRetryContextManager: + """Context manager using asyncio_throttle that catches and re-raises RetriesExhausted.""" + + def __init__(self, rate_limit, period, retry_attempts=5, initial_backoff=5): + """Initialize the AsyncThrottledContextManager.""" + self.rate_limit = rate_limit + self.period = period + self.retry_attempts = retry_attempts + self.initial_backoff = initial_backoff + self.throttler = Throttler(rate_limit=rate_limit, period=period) + + async def __aenter__(self): + """Acquire the throttle when entering the async context.""" + await self.throttler.acquire() + return self + + async def __aexit__(self, exc_type, exc, tb): + """Release the throttle. If a RetriesExhausted occurs, re-raise it.""" + self.throttler.flush() + if isinstance(exc, RetriesExhausted): + raise exc + + async def wrapped_function_with_retry(self, func, *args, **kwargs): + """Async function wrapper with retry logic.""" + backoff_time = self.initial_backoff + for attempt in range(self.retry_attempts): + try: + return await func(*args, **kwargs) + except ResourceTemporarilyUnavailable as e: + LOGGER.warning(f"Attempt {attempt + 1}/{self.retry_attempts} failed: {e}") + if attempt < self.retry_attempts - 1: + LOGGER.warning(f"Retrying in {backoff_time} seconds...") + await asyncio.sleep(backoff_time) + backoff_time *= 2 + else: # noqa: PLW0120 + msg = f"Retries exhausted, failed after {self.retry_attempts} attempts" + raise RetriesExhausted(msg) diff --git a/music_assistant/server/providers/tidal/__init__.py b/music_assistant/server/providers/tidal/__init__.py index 428d181f..59cb8dfb 100644 --- a/music_assistant/server/providers/tidal/__init__.py +++ b/music_assistant/server/providers/tidal/__init__.py @@ -7,7 +7,6 @@ from contextlib import suppress from datetime import datetime, timedelta from typing import TYPE_CHECKING, Any, cast -from asyncio_throttle import Throttler from tidalapi import Album as TidalAlbum from tidalapi import Artist as TidalArtist from tidalapi import Config as TidalConfig @@ -31,7 +30,10 @@ from music_assistant.common.models.enums import ( ProviderFeature, StreamType, ) -from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError +from music_assistant.common.models.errors import ( + LoginFailed, + MediaNotFoundError, +) from music_assistant.common.models.media_items import ( Album, AlbumTrack, @@ -40,7 +42,6 @@ from music_assistant.common.models.media_items import ( ContentType, ItemMapping, MediaItemImage, - MediaItemType, Playlist, PlaylistTrack, ProviderMapping, @@ -50,6 +51,9 @@ from music_assistant.common.models.media_items import ( from music_assistant.common.models.streamdetails import StreamDetails from music_assistant.server.helpers.auth import AuthenticationHelper from music_assistant.server.helpers.tags import AudioTags, parse_tags +from music_assistant.server.helpers.throttle_retry import ( + AsyncThrottleWithRetryContextManager, +) from music_assistant.server.models.music_provider import MusicProvider from .helpers import ( @@ -218,7 +222,9 @@ class TidalProvider(MusicProvider): """Handle async initialization of the provider.""" self._tidal_user_id: str = self.config.get_value(CONF_USER_ID) self._tidal_session = await self._get_tidal_session() - self._throttler = Throttler(rate_limit=1, period=0.1) + self._throttle_retry = AsyncThrottleWithRetryContextManager( + rate_limit=1, period=0.5, retry_attempts=5, initial_backoff=5 + ) @property def supported_features(self) -> tuple[ProviderFeature, ...]: @@ -310,38 +316,42 @@ class TidalProvider(MusicProvider): async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]: """Get album tracks for given album id.""" tidal_session = await self._get_tidal_session() - async with self._throttler: - return cast( - list[AlbumTrack], - [ - self._parse_track( - track_obj=track_obj, - extra_init_kwargs={ - "disc_number": track_obj.volume_num, - "track_number": track_obj.track_num, - }, - ) - for track_obj in await get_album_tracks(tidal_session, prov_album_id) - ], + async with self._throttle_retry as manager: + album_obj = await manager.wrapped_function_with_retry( + get_album, tidal_session, prov_album_id ) + async with self._throttle_retry as manager: + tracks_obj = await manager.wrapped_function_with_retry( + get_album_tracks, tidal_session, prov_album_id + ) + return [ + AlbumTrack.from_track( + track=self._parse_track(track_obj=track_obj), + album=self._parse_album(album_obj=album_obj), + disc_number=track_obj.volume_num, + track_number=track_obj.track_num, + ) + for track_obj in tracks_obj + ] + async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: """Get a list of all albums for the given artist.""" tidal_session = await self._get_tidal_session() - async with self._throttler: - return [ - self._parse_album(album) - for album in await get_artist_albums(tidal_session, prov_artist_id) - ] + async with self._throttle_retry as manager: + artist_albums_obj = await manager.wrapped_function_with_retry( + get_artist_albums, tidal_session, prov_artist_id + ) + return [self._parse_album(album) for album in artist_albums_obj] async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]: """Get a list of 10 most popular tracks for the given artist.""" tidal_session = await self._get_tidal_session() - async with self._throttler: - return [ - self._parse_track(track) - for track in await get_artist_toptracks(tidal_session, prov_artist_id) - ] + async with self._throttle_retry as manager: + artist_toptracks_obj = await manager.wrapped_function_with_retry( + get_artist_toptracks, tidal_session, prov_artist_id + ) + return [self._parse_track(track) for track in artist_toptracks_obj] async def get_playlist_tracks( self, prov_playlist_id: str @@ -354,29 +364,28 @@ class TidalProvider(MusicProvider): get_playlist_tracks, tidal_session, prov_playlist_id, limit=DEFAULT_LIMIT ): total_playlist_tracks += 1 - track = self._parse_track( - track_obj=track_obj, - extra_init_kwargs={"position": total_playlist_tracks}, + track = PlaylistTrack.from_track( + self._parse_track(track_obj=track_obj), total_playlist_tracks ) yield track async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]: """Get similar tracks for given track id.""" tidal_session = await self._get_tidal_session() - async with self._throttler: - return [ - self._parse_track(track) - for track in await get_similar_tracks(tidal_session, prov_track_id, limit) - ] + async with self._throttle_retry as manager: + similar_tracks_obj = await manager.wrapped_function_with_retry( + get_similar_tracks, tidal_session, prov_track_id, limit + ) + return [self._parse_track(track) for track in similar_tracks_obj] - async def library_add(self, item: MediaItemType) -> bool: + async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool: """Add item to library.""" tidal_session = await self._get_tidal_session() return await library_items_add_remove( tidal_session, str(self._tidal_user_id), - item.item_id, - item.media_type, + prov_item_id, + media_type, add=True, ) @@ -451,38 +460,43 @@ class TidalProvider(MusicProvider): async def get_artist(self, prov_artist_id: str) -> Artist: """Get artist details for given artist id.""" tidal_session = await self._get_tidal_session() - async with self._throttler: - return self._parse_artist( - artist_obj=await get_artist(tidal_session, prov_artist_id), + async with self._throttle_retry as manager: + artist_obj = await manager.wrapped_function_with_retry( + get_artist, tidal_session, prov_artist_id ) + return self._parse_artist(artist_obj) async def get_album(self, prov_album_id: str) -> Album: """Get album details for given album id.""" tidal_session = await self._get_tidal_session() - async with self._throttler: - return self._parse_album( - album_obj=await get_album(tidal_session, prov_album_id), + async with self._throttle_retry as manager: + album_obj = await manager.wrapped_function_with_retry( + get_album, tidal_session, prov_album_id ) + return self._parse_album(album_obj) async def get_track(self, prov_track_id: str) -> Track: """Get track details for given track id.""" tidal_session = await self._get_tidal_session() - async with self._throttler: - track_obj = await get_track(tidal_session, prov_track_id) + async with self._throttle_retry as manager: + track_obj = await manager.wrapped_function_with_retry( + get_track, tidal_session, prov_track_id + ) track = self._parse_track(track_obj) # get some extra details for the full track info with suppress(tidal_exceptions.MetadataNotAvailable): - lyrics: TidalLyrics = await asyncio.to_thread(track_obj.lyrics) + lyrics: TidalLyrics = await asyncio.to_thread(track.lyrics) track.metadata.lyrics = lyrics.text return track async def get_playlist(self, prov_playlist_id: str) -> Playlist: """Get playlist details for given playlist id.""" tidal_session = await self._get_tidal_session() - async with self._throttler: - return self._parse_playlist( - await get_playlist(tidal_session, prov_playlist_id), + async with self._throttle_retry as manager: + playlist_obj = await manager.wrapped_function_with_retry( + get_playlist, tidal_session, prov_playlist_id ) + return self._parse_playlist(playlist_obj) def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping: """Create a generic item mapping.""" @@ -570,8 +584,7 @@ class TidalProvider(MusicProvider): MediaItemImage( type=ImageType.THUMB, path=image_url, - provider=self.instance_id, - remotely_accessible=True, + provider=self.domain, ) ] @@ -625,8 +638,7 @@ class TidalProvider(MusicProvider): MediaItemImage( type=ImageType.THUMB, path=image_url, - provider=self.instance_id, - remotely_accessible=True, + provider=self.domain, ) ] @@ -694,8 +706,7 @@ class TidalProvider(MusicProvider): MediaItemImage( type=ImageType.THUMB, path=image_url, - provider=self.instance_id, - remotely_accessible=True, + provider=self.domain, ) ] return track @@ -722,7 +733,7 @@ class TidalProvider(MusicProvider): is_editable = bool(creator_id and str(creator_id) == self._tidal_user_id) playlist.is_editable = is_editable # metadata - playlist.metadata.cache_checksum = str(playlist_obj.last_updated) + playlist.metadata.checksum = str(playlist_obj.last_updated) playlist.metadata.popularity = playlist_obj.popularity if picture := (playlist_obj.square_picture or playlist_obj.picture): picture_id = picture.replace("-", "/") @@ -731,8 +742,7 @@ class TidalProvider(MusicProvider): MediaItemImage( type=ImageType.THUMB, path=image_url, - provider=self.instance_id, - remotely_accessible=True, + provider=self.domain, ) ] @@ -743,7 +753,7 @@ class TidalProvider(MusicProvider): ) -> AsyncGenerator[Any, None]: """Yield all items from a larger listing.""" offset = 0 - async with self._throttler: + async with self._throttle_retry: while True: if asyncio.iscoroutinefunction(func): chunk = await func(*args, **kwargs, offset=offset) diff --git a/music_assistant/server/providers/tidal/helpers.py b/music_assistant/server/providers/tidal/helpers.py index 663d7d6b..0fcb64eb 100644 --- a/music_assistant/server/providers/tidal/helpers.py +++ b/music_assistant/server/providers/tidal/helpers.py @@ -24,11 +24,13 @@ from tidalapi.exceptions import ( MetadataNotAvailable, ObjectNotFound, TooManyRequests, - URLNotAvailable, ) from music_assistant.common.models.enums import MediaType -from music_assistant.common.models.errors import MediaNotFoundError +from music_assistant.common.models.errors import ( + MediaNotFoundError, + ResourceTemporarilyUnavailable, +) DEFAULT_LIMIT = 50 LOGGER = logging.getLogger(__name__) @@ -93,9 +95,12 @@ async def get_artist(session: TidalSession, prov_artist_id: str) -> TidalArtist: def inner() -> TidalArtist: try: return TidalArtist(session, prov_artist_id) - except (ObjectNotFound, TooManyRequests) as err: + except ObjectNotFound as err: msg = f"Artist {prov_artist_id} not found" raise MediaNotFoundError(msg) from err + except TooManyRequests: + msg = "Tidal API rate limit reached" + raise ResourceTemporarilyUnavailable(msg) return await asyncio.to_thread(inner) @@ -106,9 +111,12 @@ async def get_artist_albums(session: TidalSession, prov_artist_id: str) -> list[ def inner() -> list[TidalAlbum]: try: artist_obj = TidalArtist(session, prov_artist_id) - except (ObjectNotFound, TooManyRequests) as err: + except ObjectNotFound as err: msg = f"Artist {prov_artist_id} not found" raise MediaNotFoundError(msg) from err + except TooManyRequests: + msg = "Tidal API rate limit reached" + raise ResourceTemporarilyUnavailable(msg) else: all_albums = [] albums = artist_obj.get_albums(limit=DEFAULT_LIMIT) @@ -156,9 +164,12 @@ async def get_album(session: TidalSession, prov_album_id: str) -> TidalAlbum: def inner() -> TidalAlbum: try: return TidalAlbum(session, prov_album_id) - except (ObjectNotFound, TooManyRequests) as err: + except ObjectNotFound as err: msg = f"Album {prov_album_id} not found" raise MediaNotFoundError(msg) from err + except TooManyRequests: + msg = "Tidal API rate limit reached" + raise ResourceTemporarilyUnavailable(msg) return await asyncio.to_thread(inner) @@ -169,9 +180,12 @@ async def get_track(session: TidalSession, prov_track_id: str) -> TidalTrack: def inner() -> TidalTrack: try: return TidalTrack(session, prov_track_id) - except (ObjectNotFound, TooManyRequests) as err: + except ObjectNotFound as err: msg = f"Track {prov_track_id} not found" raise MediaNotFoundError(msg) from err + except TooManyRequests: + msg = "Tidal API rate limit reached" + raise ResourceTemporarilyUnavailable(msg) return await asyncio.to_thread(inner) @@ -183,9 +197,12 @@ async def get_track_url(session: TidalSession, prov_track_id: str) -> str: try: track_url: str = TidalTrack(session, prov_track_id).get_url() return track_url - except (ObjectNotFound, TooManyRequests, URLNotAvailable) as err: + except ObjectNotFound as err: msg = f"Track {prov_track_id} not found" raise MediaNotFoundError(msg) from err + except TooManyRequests: + msg = "Tidal API rate limit reached" + raise ResourceTemporarilyUnavailable(msg) return await asyncio.to_thread(inner) @@ -199,9 +216,12 @@ async def get_album_tracks(session: TidalSession, prov_album_id: str) -> list[Ti limit=DEFAULT_LIMIT ) return tracks - except (ObjectNotFound, TooManyRequests) as err: + except ObjectNotFound as err: msg = f"Album {prov_album_id} not found" raise MediaNotFoundError(msg) from err + except TooManyRequests: + msg = "Tidal API rate limit reached" + raise ResourceTemporarilyUnavailable(msg) return await asyncio.to_thread(inner) @@ -240,9 +260,12 @@ async def get_playlist(session: TidalSession, prov_playlist_id: str) -> TidalPla def inner() -> TidalPlaylist: try: return TidalPlaylist(session, prov_playlist_id) - except (ObjectNotFound, TooManyRequests) as err: + except ObjectNotFound as err: msg = f"Playlist {prov_playlist_id} not found" raise MediaNotFoundError(msg) from err + except TooManyRequests: + msg = "Tidal API rate limit reached" + raise ResourceTemporarilyUnavailable(msg) return await asyncio.to_thread(inner) @@ -261,9 +284,12 @@ async def get_playlist_tracks( limit=limit, offset=offset ) return tracks - except (ObjectNotFound, TooManyRequests) as err: + except ObjectNotFound as err: msg = f"Playlist {prov_playlist_id} not found" raise MediaNotFoundError(msg) from err + except TooManyRequests: + msg = "Tidal API rate limit reached" + raise ResourceTemporarilyUnavailable(msg) return await asyncio.to_thread(inner) @@ -305,9 +331,12 @@ async def get_similar_tracks( limit=limit ) return tracks - except (MetadataNotAvailable, ObjectNotFound, TooManyRequests) as err: + except (MetadataNotAvailable, ObjectNotFound) as err: msg = f"Track {prov_track_id} not found" raise MediaNotFoundError(msg) from err + except TooManyRequests: + msg = "Tidal API rate limit reached" + raise ResourceTemporarilyUnavailable(msg) return await asyncio.to_thread(inner)