import asyncio
import functools
import logging
+import time
+from collections import deque
from collections.abc import Awaitable, Callable, Coroutine
from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar
-from asyncio_throttle import Throttler
-
from music_assistant.common.models.errors import ResourceTemporarilyUnavailable, RetriesExhausted
from music_assistant.constants import MASS_LOGGER_NAME
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.throttle_retry")
+class Throttler:
+ """asyncio_throttle (https://github.com/hallazzang/asyncio-throttle).
+
+ With improvements:
+ - Accurate sleep without "busy waiting" (PR #4)
+ - Return the delay caused by acquire()
+ """
+
+ def __init__(self, rate_limit: int, period=1.0):
+ """Initialize the Throttler."""
+ self.rate_limit = rate_limit
+ self.period = period
+
+ self._task_logs: deque[float] = deque()
+
+ def _flush(self):
+ now = time.monotonic()
+ while self._task_logs:
+ if now - self._task_logs[0] > self.period:
+ self._task_logs.popleft()
+ else:
+ break
+
+ async def _acquire(self):
+ cur_time = time.monotonic()
+ start_time = cur_time
+ while True:
+ self._flush()
+ if len(self._task_logs) < self.rate_limit:
+ break
+
+ # sleep the exact amount of time until the oldest task can be flushed
+ time_to_release = self._task_logs[0] + self.period - cur_time
+ await asyncio.sleep(time_to_release)
+ cur_time = time.monotonic()
+
+ self._task_logs.append(cur_time)
+ return cur_time - start_time # exactly 0 if not throttled
+
+ async def __aenter__(self):
+ """Wait until the lock is acquired, return the time delay."""
+ return await self._acquire()
+
+ async def __aexit__(self, exc_type, exc, tb):
+ """Nothing to do on exit."""
+
+
class ThrottlerManager(Throttler):
"""Throttler manager that extends asyncio Throttle by retrying."""
def __init__(self, rate_limit: int, period: float = 1, retry_attempts=5, initial_backoff=5):
"""Initialize the AsyncThrottledContextManager."""
- super().__init__(rate_limit=rate_limit, period=period, retry_interval=0.1)
+ super().__init__(rate_limit=rate_limit, period=period)
self.retry_attempts = retry_attempts
self.initial_backoff = initial_backoff
# the trottler attribute must be present on the class
throttler = self.throttler
backoff_time = throttler.initial_backoff
- async with throttler:
+ async with throttler as delay:
+ if delay != 0:
+ self.logger.debug(
+ "%s was delayed for %.3f secs due to throttling", func.__name__, delay
+ )
+
for attempt in range(throttler.retry_attempts):
try:
return await func(self, *args, **kwargs)
from typing import TYPE_CHECKING
import aiohttp.client_exceptions
-from asyncio_throttle import Throttler
from music_assistant.common.models.enums import ExternalID, ProviderFeature
from music_assistant.common.models.media_items import ImageType, MediaItemImage, MediaItemMetadata
from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.helpers.app_vars import app_var # pylint: disable=no-name-in-module
+from music_assistant.server.helpers.throttle_retry import Throttler
from music_assistant.server.models.metadata_provider import MetadataProvider
if TYPE_CHECKING:
from typing import TYPE_CHECKING, Any, cast
import aiohttp.client_exceptions
-from asyncio_throttle import Throttler
from music_assistant.common.models.enums import ExternalID, ProviderFeature
from music_assistant.common.models.media_items import (
from music_assistant.server.controllers.cache import use_cache
from music_assistant.server.helpers.app_vars import app_var # type: ignore[attr-defined]
from music_assistant.server.helpers.compare import compare_strings
+from music_assistant.server.helpers.throttle_retry import Throttler
from music_assistant.server.models.metadata_provider import MetadataProvider
if TYPE_CHECKING:
from typing import TYPE_CHECKING
-from asyncio_throttle import Throttler
-
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
)
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.constants import CONF_USERNAME
+from music_assistant.server.helpers.throttle_retry import Throttler
from music_assistant.server.models.music_provider import MusicProvider
SUPPORTED_FEATURES = (