from typing import TYPE_CHECKING
import aiohttp
-from asyncio_throttle import Throttler
+from music_assistant.common.helpers.json import json_loads
from music_assistant.common.helpers.util import parse_title_and_version, try_parse_int
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import (
ProviderFeature,
StreamType,
)
-from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
+from music_assistant.common.models.errors import (
+ LoginFailed,
+ MediaNotFoundError,
+ ResourceTemporarilyUnavailable,
+)
from music_assistant.common.models.media_items import (
Album,
AlbumType,
from music_assistant.server.helpers.app_vars import app_var
# pylint: enable=no-name-in-module
+from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.server.models.music_provider import MusicProvider
if TYPE_CHECKING:
"""Provider for the Qobux music service."""
_user_auth_info: str | None = None
- _throttler: Throttler
+ # rate limiter needs to be specified on provider-level,
+ # so make it an instance attribute
+ throttler = ThrottlerManager(rate_limit=1, period=1)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
- self._throttler = Throttler(rate_limit=4, period=1)
-
if not self.config.get_value(CONF_USERNAME) or not self.config.get_value(CONF_PASSWORD):
msg = "Invalid login credentials"
raise LoginFailed(msg)
break
return all_items
+ @throttle_with_retries
async def _get_data(self, endpoint, sign_request=False, **kwargs):
"""Get data from api."""
# pylint: disable=too-many-branches
kwargs["app_id"] = app_var(0)
kwargs["user_auth_token"] = await self._auth_token()
async with (
- self._throttler,
- self.mass.http_session.get(url, headers=headers, params=kwargs, ssl=False) as response,
+ self.mass.http_session.get(url, headers=headers, params=kwargs) as response,
):
- try:
- result = await response.json()
- # check for error in json
- if error := result.get("error"):
- raise ValueError(error)
- if result.get("status") and "error" in result["status"]:
- raise ValueError(result["status"])
- except (
- aiohttp.ContentTypeError,
- JSONDecodeError,
- AssertionError,
- ValueError,
- ) as err:
- text = await response.text()
- self.logger.exception("Error while processing %s: %s", endpoint, text, exc_info=err)
- return None
- return result
-
+ # handle rate limiter
+ if response.status == 429:
+ backoff_time = int(response.headers.get("Retry-After", 0))
+ raise ResourceTemporarilyUnavailable("Rate Limiter", backoff_time=backoff_time)
+ # handle temporary server error
+ if response.status in (502, 503):
+ raise ResourceTemporarilyUnavailable(backoff_time=30)
+ # handle 404 not found, convert to MediaNotFoundError
+ if response.status == 404:
+ raise MediaNotFoundError(f"{endpoint} not found")
+ response.raise_for_status()
+ return await response.json(loads=json_loads)
+
+ @throttle_with_retries
async def _post_data(self, endpoint, params=None, data=None):
"""Post data to api."""
if not params:
async with self.mass.http_session.post(
url, params=params, json=data, ssl=False
) as response:
- try:
- result = await response.json()
- # check for error in json
- if error := result.get("error"):
- raise ValueError(error)
- if result.get("status") and "error" in result["status"]:
- raise ValueError(result["status"])
- except (
- aiohttp.ContentTypeError,
- JSONDecodeError,
- AssertionError,
- ValueError,
- ):
- text = await response.text()
- self.logger.error("Error while processing %s: %s", endpoint, text)
- return None
- return result
+ # handle rate limiter
+ if response.status == 429:
+ backoff_time = int(response.headers.get("Retry-After", 0))
+ raise ResourceTemporarilyUnavailable("Rate Limiter", backoff_time=backoff_time)
+ # handle temporary server error
+ if response.status in (502, 503):
+ raise ResourceTemporarilyUnavailable(backoff_time=30)
+ # handle 404 not found, convert to MediaNotFoundError
+ if response.status == 404:
+ raise MediaNotFoundError(f"{endpoint} not found")
+ response.raise_for_status()
+ return await response.json(loads=json_loads)
def __get_image(self, obj: dict) -> str | None:
"""Try to parse image from Qobuz media object."""