429 backoff implementation (#1230)
authorJozef Kruszynski <60214390+jozefKruszynski@users.noreply.github.com>
Sun, 21 Apr 2024 09:35:29 +0000 (11:35 +0200)
committerGitHub <noreply@github.com>
Sun, 21 Apr 2024 09:35:29 +0000 (11:35 +0200)
music_assistant/common/models/errors.py
music_assistant/server/helpers/throttle_retry.py [new file with mode: 0644]
music_assistant/server/providers/tidal/__init__.py
music_assistant/server/providers/tidal/helpers.py

index ca8f3bfe753e4e634cdae4cfe31e748f6634d1eb..a641a98a291043e61f88606728706962fa7cd676 100644 (file)
@@ -104,3 +104,15 @@ class InvalidProviderID(MusicAssistantError):
     """Error thrown when a provider media item identifier does not match a known format."""
 
     error_code = 15
+
+
+class RetriesExhausted(MusicAssistantError):
+    """Error thrown when a retries to a given provider URI have been exhausted."""
+
+    error_code = 16
+
+
+class ResourceTemporarilyUnavailable(MusicAssistantError):
+    """Error thrown when a resource is temporarily unavailable."""
+
+    error_code = 17
diff --git a/music_assistant/server/helpers/throttle_retry.py b/music_assistant/server/helpers/throttle_retry.py
new file mode 100644 (file)
index 0000000..54dcc25
--- /dev/null
@@ -0,0 +1,53 @@
+"""Context manager using asyncio_throttle that catches and re-raises RetriesExhausted."""
+
+import asyncio
+import logging
+
+from asyncio_throttle import Throttler
+
+from music_assistant.common.models.errors import (
+    ResourceTemporarilyUnavailable,
+    RetriesExhausted,
+)
+from music_assistant.constants import MASS_LOGGER_NAME
+
+LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.throttle_retry")
+
+
+class AsyncThrottleWithRetryContextManager:
+    """Context manager using asyncio_throttle that catches and re-raises RetriesExhausted."""
+
+    def __init__(self, rate_limit, period, retry_attempts=5, initial_backoff=5):
+        """Initialize the AsyncThrottledContextManager."""
+        self.rate_limit = rate_limit
+        self.period = period
+        self.retry_attempts = retry_attempts
+        self.initial_backoff = initial_backoff
+        self.throttler = Throttler(rate_limit=rate_limit, period=period)
+
+    async def __aenter__(self):
+        """Acquire the throttle when entering the async context."""
+        await self.throttler.acquire()
+        return self
+
+    async def __aexit__(self, exc_type, exc, tb):
+        """Release the throttle. If a RetriesExhausted occurs, re-raise it."""
+        self.throttler.flush()
+        if isinstance(exc, RetriesExhausted):
+            raise exc
+
+    async def wrapped_function_with_retry(self, func, *args, **kwargs):
+        """Async function wrapper with retry logic."""
+        backoff_time = self.initial_backoff
+        for attempt in range(self.retry_attempts):
+            try:
+                return await func(*args, **kwargs)
+            except ResourceTemporarilyUnavailable as e:
+                LOGGER.warning(f"Attempt {attempt + 1}/{self.retry_attempts} failed: {e}")
+                if attempt < self.retry_attempts - 1:
+                    LOGGER.warning(f"Retrying in {backoff_time} seconds...")
+                    await asyncio.sleep(backoff_time)
+                    backoff_time *= 2
+        else:  # noqa: PLW0120
+            msg = f"Retries exhausted, failed after {self.retry_attempts} attempts"
+            raise RetriesExhausted(msg)
index 428d181ff165c161c99c348b543d11828840080a..59cb8dfb9be4cb93b7207658d1472f637251bd44 100644 (file)
@@ -7,7 +7,6 @@ from contextlib import suppress
 from datetime import datetime, timedelta
 from typing import TYPE_CHECKING, Any, cast
 
-from asyncio_throttle import Throttler
 from tidalapi import Album as TidalAlbum
 from tidalapi import Artist as TidalArtist
 from tidalapi import Config as TidalConfig
@@ -31,7 +30,10 @@ from music_assistant.common.models.enums import (
     ProviderFeature,
     StreamType,
 )
-from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
+from music_assistant.common.models.errors import (
+    LoginFailed,
+    MediaNotFoundError,
+)
 from music_assistant.common.models.media_items import (
     Album,
     AlbumTrack,
@@ -40,7 +42,6 @@ from music_assistant.common.models.media_items import (
     ContentType,
     ItemMapping,
     MediaItemImage,
-    MediaItemType,
     Playlist,
     PlaylistTrack,
     ProviderMapping,
@@ -50,6 +51,9 @@ from music_assistant.common.models.media_items import (
 from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.server.helpers.auth import AuthenticationHelper
 from music_assistant.server.helpers.tags import AudioTags, parse_tags
+from music_assistant.server.helpers.throttle_retry import (
+    AsyncThrottleWithRetryContextManager,
+)
 from music_assistant.server.models.music_provider import MusicProvider
 
 from .helpers import (
@@ -218,7 +222,9 @@ class TidalProvider(MusicProvider):
         """Handle async initialization of the provider."""
         self._tidal_user_id: str = self.config.get_value(CONF_USER_ID)
         self._tidal_session = await self._get_tidal_session()
-        self._throttler = Throttler(rate_limit=1, period=0.1)
+        self._throttle_retry = AsyncThrottleWithRetryContextManager(
+            rate_limit=1, period=0.5, retry_attempts=5, initial_backoff=5
+        )
 
     @property
     def supported_features(self) -> tuple[ProviderFeature, ...]:
@@ -310,38 +316,42 @@ class TidalProvider(MusicProvider):
     async def get_album_tracks(self, prov_album_id: str) -> list[AlbumTrack]:
         """Get album tracks for given album id."""
         tidal_session = await self._get_tidal_session()
-        async with self._throttler:
-            return cast(
-                list[AlbumTrack],
-                [
-                    self._parse_track(
-                        track_obj=track_obj,
-                        extra_init_kwargs={
-                            "disc_number": track_obj.volume_num,
-                            "track_number": track_obj.track_num,
-                        },
-                    )
-                    for track_obj in await get_album_tracks(tidal_session, prov_album_id)
-                ],
+        async with self._throttle_retry as manager:
+            album_obj = await manager.wrapped_function_with_retry(
+                get_album, tidal_session, prov_album_id
             )
 
+        async with self._throttle_retry as manager:
+            tracks_obj = await manager.wrapped_function_with_retry(
+                get_album_tracks, tidal_session, prov_album_id
+            )
+            return [
+                AlbumTrack.from_track(
+                    track=self._parse_track(track_obj=track_obj),
+                    album=self._parse_album(album_obj=album_obj),
+                    disc_number=track_obj.volume_num,
+                    track_number=track_obj.track_num,
+                )
+                for track_obj in tracks_obj
+            ]
+
     async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
         """Get a list of all albums for the given artist."""
         tidal_session = await self._get_tidal_session()
-        async with self._throttler:
-            return [
-                self._parse_album(album)
-                for album in await get_artist_albums(tidal_session, prov_artist_id)
-            ]
+        async with self._throttle_retry as manager:
+            artist_albums_obj = await manager.wrapped_function_with_retry(
+                get_artist_albums, tidal_session, prov_artist_id
+            )
+            return [self._parse_album(album) for album in artist_albums_obj]
 
     async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
         """Get a list of 10 most popular tracks for the given artist."""
         tidal_session = await self._get_tidal_session()
-        async with self._throttler:
-            return [
-                self._parse_track(track)
-                for track in await get_artist_toptracks(tidal_session, prov_artist_id)
-            ]
+        async with self._throttle_retry as manager:
+            artist_toptracks_obj = await manager.wrapped_function_with_retry(
+                get_artist_toptracks, tidal_session, prov_artist_id
+            )
+            return [self._parse_track(track) for track in artist_toptracks_obj]
 
     async def get_playlist_tracks(
         self, prov_playlist_id: str
@@ -354,29 +364,28 @@ class TidalProvider(MusicProvider):
             get_playlist_tracks, tidal_session, prov_playlist_id, limit=DEFAULT_LIMIT
         ):
             total_playlist_tracks += 1
-            track = self._parse_track(
-                track_obj=track_obj,
-                extra_init_kwargs={"position": total_playlist_tracks},
+            track = PlaylistTrack.from_track(
+                self._parse_track(track_obj=track_obj), total_playlist_tracks
             )
             yield track
 
     async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
         """Get similar tracks for given track id."""
         tidal_session = await self._get_tidal_session()
-        async with self._throttler:
-            return [
-                self._parse_track(track)
-                for track in await get_similar_tracks(tidal_session, prov_track_id, limit)
-            ]
+        async with self._throttle_retry as manager:
+            similar_tracks_obj = await manager.wrapped_function_with_retry(
+                get_similar_tracks, tidal_session, prov_track_id, limit
+            )
+            return [self._parse_track(track) for track in similar_tracks_obj]
 
-    async def library_add(self, item: MediaItemType) -> bool:
+    async def library_add(self, prov_item_id: str, media_type: MediaType) -> bool:
         """Add item to library."""
         tidal_session = await self._get_tidal_session()
         return await library_items_add_remove(
             tidal_session,
             str(self._tidal_user_id),
-            item.item_id,
-            item.media_type,
+            prov_item_id,
+            media_type,
             add=True,
         )
 
@@ -451,38 +460,43 @@ class TidalProvider(MusicProvider):
     async def get_artist(self, prov_artist_id: str) -> Artist:
         """Get artist details for given artist id."""
         tidal_session = await self._get_tidal_session()
-        async with self._throttler:
-            return self._parse_artist(
-                artist_obj=await get_artist(tidal_session, prov_artist_id),
+        async with self._throttle_retry as manager:
+            artist_obj = await manager.wrapped_function_with_retry(
+                get_artist, tidal_session, prov_artist_id
             )
+            return self._parse_artist(artist_obj)
 
     async def get_album(self, prov_album_id: str) -> Album:
         """Get album details for given album id."""
         tidal_session = await self._get_tidal_session()
-        async with self._throttler:
-            return self._parse_album(
-                album_obj=await get_album(tidal_session, prov_album_id),
+        async with self._throttle_retry as manager:
+            album_obj = await manager.wrapped_function_with_retry(
+                get_album, tidal_session, prov_album_id
             )
+            return self._parse_album(album_obj)
 
     async def get_track(self, prov_track_id: str) -> Track:
         """Get track details for given track id."""
         tidal_session = await self._get_tidal_session()
-        async with self._throttler:
-            track_obj = await get_track(tidal_session, prov_track_id)
+        async with self._throttle_retry as manager:
+            track_obj = await manager.wrapped_function_with_retry(
+                get_track, tidal_session, prov_track_id
+            )
             track = self._parse_track(track_obj)
             # get some extra details for the full track info
             with suppress(tidal_exceptions.MetadataNotAvailable):
-                lyrics: TidalLyrics = await asyncio.to_thread(track_obj.lyrics)
+                lyrics: TidalLyrics = await asyncio.to_thread(track.lyrics)
                 track.metadata.lyrics = lyrics.text
             return track
 
     async def get_playlist(self, prov_playlist_id: str) -> Playlist:
         """Get playlist details for given playlist id."""
         tidal_session = await self._get_tidal_session()
-        async with self._throttler:
-            return self._parse_playlist(
-                await get_playlist(tidal_session, prov_playlist_id),
+        async with self._throttle_retry as manager:
+            playlist_obj = await manager.wrapped_function_with_retry(
+                get_playlist, tidal_session, prov_playlist_id
             )
+            return self._parse_playlist(playlist_obj)
 
     def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
         """Create a generic item mapping."""
@@ -570,8 +584,7 @@ class TidalProvider(MusicProvider):
                 MediaItemImage(
                     type=ImageType.THUMB,
                     path=image_url,
-                    provider=self.instance_id,
-                    remotely_accessible=True,
+                    provider=self.domain,
                 )
             ]
 
@@ -625,8 +638,7 @@ class TidalProvider(MusicProvider):
                 MediaItemImage(
                     type=ImageType.THUMB,
                     path=image_url,
-                    provider=self.instance_id,
-                    remotely_accessible=True,
+                    provider=self.domain,
                 )
             ]
 
@@ -694,8 +706,7 @@ class TidalProvider(MusicProvider):
                     MediaItemImage(
                         type=ImageType.THUMB,
                         path=image_url,
-                        provider=self.instance_id,
-                        remotely_accessible=True,
+                        provider=self.domain,
                     )
                 ]
         return track
@@ -722,7 +733,7 @@ class TidalProvider(MusicProvider):
         is_editable = bool(creator_id and str(creator_id) == self._tidal_user_id)
         playlist.is_editable = is_editable
         # metadata
-        playlist.metadata.cache_checksum = str(playlist_obj.last_updated)
+        playlist.metadata.checksum = str(playlist_obj.last_updated)
         playlist.metadata.popularity = playlist_obj.popularity
         if picture := (playlist_obj.square_picture or playlist_obj.picture):
             picture_id = picture.replace("-", "/")
@@ -731,8 +742,7 @@ class TidalProvider(MusicProvider):
                 MediaItemImage(
                     type=ImageType.THUMB,
                     path=image_url,
-                    provider=self.instance_id,
-                    remotely_accessible=True,
+                    provider=self.domain,
                 )
             ]
 
@@ -743,7 +753,7 @@ class TidalProvider(MusicProvider):
     ) -> AsyncGenerator[Any, None]:
         """Yield all items from a larger listing."""
         offset = 0
-        async with self._throttler:
+        async with self._throttle_retry:
             while True:
                 if asyncio.iscoroutinefunction(func):
                     chunk = await func(*args, **kwargs, offset=offset)
index 663d7d6b2b42d6b4ace2bc5ac8860b7e2bf5ab15..0fcb64eb1612f7b1b523af0e9a12754bbe1dd281 100644 (file)
@@ -24,11 +24,13 @@ from tidalapi.exceptions import (
     MetadataNotAvailable,
     ObjectNotFound,
     TooManyRequests,
-    URLNotAvailable,
 )
 
 from music_assistant.common.models.enums import MediaType
-from music_assistant.common.models.errors import MediaNotFoundError
+from music_assistant.common.models.errors import (
+    MediaNotFoundError,
+    ResourceTemporarilyUnavailable,
+)
 
 DEFAULT_LIMIT = 50
 LOGGER = logging.getLogger(__name__)
@@ -93,9 +95,12 @@ async def get_artist(session: TidalSession, prov_artist_id: str) -> TidalArtist:
     def inner() -> TidalArtist:
         try:
             return TidalArtist(session, prov_artist_id)
-        except (ObjectNotFound, TooManyRequests) as err:
+        except ObjectNotFound as err:
             msg = f"Artist {prov_artist_id} not found"
             raise MediaNotFoundError(msg) from err
+        except TooManyRequests:
+            msg = "Tidal API rate limit reached"
+            raise ResourceTemporarilyUnavailable(msg)
 
     return await asyncio.to_thread(inner)
 
@@ -106,9 +111,12 @@ async def get_artist_albums(session: TidalSession, prov_artist_id: str) -> list[
     def inner() -> list[TidalAlbum]:
         try:
             artist_obj = TidalArtist(session, prov_artist_id)
-        except (ObjectNotFound, TooManyRequests) as err:
+        except ObjectNotFound as err:
             msg = f"Artist {prov_artist_id} not found"
             raise MediaNotFoundError(msg) from err
+        except TooManyRequests:
+            msg = "Tidal API rate limit reached"
+            raise ResourceTemporarilyUnavailable(msg)
         else:
             all_albums = []
             albums = artist_obj.get_albums(limit=DEFAULT_LIMIT)
@@ -156,9 +164,12 @@ async def get_album(session: TidalSession, prov_album_id: str) -> TidalAlbum:
     def inner() -> TidalAlbum:
         try:
             return TidalAlbum(session, prov_album_id)
-        except (ObjectNotFound, TooManyRequests) as err:
+        except ObjectNotFound as err:
             msg = f"Album {prov_album_id} not found"
             raise MediaNotFoundError(msg) from err
+        except TooManyRequests:
+            msg = "Tidal API rate limit reached"
+            raise ResourceTemporarilyUnavailable(msg)
 
     return await asyncio.to_thread(inner)
 
@@ -169,9 +180,12 @@ async def get_track(session: TidalSession, prov_track_id: str) -> TidalTrack:
     def inner() -> TidalTrack:
         try:
             return TidalTrack(session, prov_track_id)
-        except (ObjectNotFound, TooManyRequests) as err:
+        except ObjectNotFound as err:
             msg = f"Track {prov_track_id} not found"
             raise MediaNotFoundError(msg) from err
+        except TooManyRequests:
+            msg = "Tidal API rate limit reached"
+            raise ResourceTemporarilyUnavailable(msg)
 
     return await asyncio.to_thread(inner)
 
@@ -183,9 +197,12 @@ async def get_track_url(session: TidalSession, prov_track_id: str) -> str:
         try:
             track_url: str = TidalTrack(session, prov_track_id).get_url()
             return track_url
-        except (ObjectNotFound, TooManyRequests, URLNotAvailable) as err:
+        except ObjectNotFound as err:
             msg = f"Track {prov_track_id} not found"
             raise MediaNotFoundError(msg) from err
+        except TooManyRequests:
+            msg = "Tidal API rate limit reached"
+            raise ResourceTemporarilyUnavailable(msg)
 
     return await asyncio.to_thread(inner)
 
@@ -199,9 +216,12 @@ async def get_album_tracks(session: TidalSession, prov_album_id: str) -> list[Ti
                 limit=DEFAULT_LIMIT
             )
             return tracks
-        except (ObjectNotFound, TooManyRequests) as err:
+        except ObjectNotFound as err:
             msg = f"Album {prov_album_id} not found"
             raise MediaNotFoundError(msg) from err
+        except TooManyRequests:
+            msg = "Tidal API rate limit reached"
+            raise ResourceTemporarilyUnavailable(msg)
 
     return await asyncio.to_thread(inner)
 
@@ -240,9 +260,12 @@ async def get_playlist(session: TidalSession, prov_playlist_id: str) -> TidalPla
     def inner() -> TidalPlaylist:
         try:
             return TidalPlaylist(session, prov_playlist_id)
-        except (ObjectNotFound, TooManyRequests) as err:
+        except ObjectNotFound as err:
             msg = f"Playlist {prov_playlist_id} not found"
             raise MediaNotFoundError(msg) from err
+        except TooManyRequests:
+            msg = "Tidal API rate limit reached"
+            raise ResourceTemporarilyUnavailable(msg)
 
     return await asyncio.to_thread(inner)
 
@@ -261,9 +284,12 @@ async def get_playlist_tracks(
                 limit=limit, offset=offset
             )
             return tracks
-        except (ObjectNotFound, TooManyRequests) as err:
+        except ObjectNotFound as err:
             msg = f"Playlist {prov_playlist_id} not found"
             raise MediaNotFoundError(msg) from err
+        except TooManyRequests:
+            msg = "Tidal API rate limit reached"
+            raise ResourceTemporarilyUnavailable(msg)
 
     return await asyncio.to_thread(inner)
 
@@ -305,9 +331,12 @@ async def get_similar_tracks(
                 limit=limit
             )
             return tracks
-        except (MetadataNotAvailable, ObjectNotFound, TooManyRequests) as err:
+        except (MetadataNotAvailable, ObjectNotFound) as err:
             msg = f"Track {prov_track_id} not found"
             raise MediaNotFoundError(msg) from err
+        except TooManyRequests:
+            msg = "Tidal API rate limit reached"
+            raise ResourceTemporarilyUnavailable(msg)
 
     return await asyncio.to_thread(inner)