Throttler: sleep without busy wait, log delayed calls (#1526)
authorKostas Chatzikokolakis <kostas@chatzi.org>
Mon, 29 Jul 2024 19:01:13 +0000 (22:01 +0300)
committerGitHub <noreply@github.com>
Mon, 29 Jul 2024 19:01:13 +0000 (21:01 +0200)
music_assistant/server/helpers/throttle_retry.py
music_assistant/server/providers/fanarttv/__init__.py
music_assistant/server/providers/theaudiodb/__init__.py
music_assistant/server/providers/tunein/__init__.py
pyproject.toml
requirements_all.txt

index 89b6f6bd74b9a6364ec783d479194cc4e620705b..149b41298116b6efed5207e47868625166e724a5 100644 (file)
@@ -3,11 +3,11 @@
 import asyncio
 import functools
 import logging
+import time
+from collections import deque
 from collections.abc import Awaitable, Callable, Coroutine
 from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar
 
-from asyncio_throttle import Throttler
-
 from music_assistant.common.models.errors import ResourceTemporarilyUnavailable, RetriesExhausted
 from music_assistant.constants import MASS_LOGGER_NAME
 
@@ -20,12 +20,59 @@ _P = ParamSpec("_P")
 LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.throttle_retry")
 
 
+class Throttler:
+    """asyncio_throttle (https://github.com/hallazzang/asyncio-throttle).
+
+    With improvements:
+    - Accurate sleep without "busy waiting" (PR #4)
+    - Return the delay caused by acquire()
+    """
+
+    def __init__(self, rate_limit: int, period=1.0):
+        """Initialize the Throttler."""
+        self.rate_limit = rate_limit
+        self.period = period
+
+        self._task_logs: deque[float] = deque()
+
+    def _flush(self):
+        now = time.monotonic()
+        while self._task_logs:
+            if now - self._task_logs[0] > self.period:
+                self._task_logs.popleft()
+            else:
+                break
+
+    async def _acquire(self):
+        cur_time = time.monotonic()
+        start_time = cur_time
+        while True:
+            self._flush()
+            if len(self._task_logs) < self.rate_limit:
+                break
+
+            # sleep the exact amount of time until the oldest task can be flushed
+            time_to_release = self._task_logs[0] + self.period - cur_time
+            await asyncio.sleep(time_to_release)
+            cur_time = time.monotonic()
+
+        self._task_logs.append(cur_time)
+        return cur_time - start_time  # exactly 0 if not throttled
+
+    async def __aenter__(self):
+        """Wait until the lock is acquired, return the time delay."""
+        return await self._acquire()
+
+    async def __aexit__(self, exc_type, exc, tb):
+        """Nothing to do on exit."""
+
+
 class ThrottlerManager(Throttler):
     """Throttler manager that extends asyncio Throttle by retrying."""
 
     def __init__(self, rate_limit: int, period: float = 1, retry_attempts=5, initial_backoff=5):
         """Initialize the AsyncThrottledContextManager."""
-        super().__init__(rate_limit=rate_limit, period=period, retry_interval=0.1)
+        super().__init__(rate_limit=rate_limit, period=period)
         self.retry_attempts = retry_attempts
         self.initial_backoff = initial_backoff
 
@@ -66,7 +113,12 @@ def throttle_with_retries(
         # the trottler attribute must be present on the class
         throttler = self.throttler
         backoff_time = throttler.initial_backoff
-        async with throttler:
+        async with throttler as delay:
+            if delay != 0:
+                self.logger.debug(
+                    "%s was delayed for %.3f secs due to throttling", func.__name__, delay
+                )
+
             for attempt in range(throttler.retry_attempts):
                 try:
                     return await func(self, *args, **kwargs)
index 229f9d90435bb08fd40a378a717b932c94b6c59f..2016cdadf381e95f803c4995c1f4e01622dc18b7 100644 (file)
@@ -6,12 +6,12 @@ from json import JSONDecodeError
 from typing import TYPE_CHECKING
 
 import aiohttp.client_exceptions
-from asyncio_throttle import Throttler
 
 from music_assistant.common.models.enums import ExternalID, ProviderFeature
 from music_assistant.common.models.media_items import ImageType, MediaItemImage, MediaItemMetadata
 from music_assistant.server.controllers.cache import use_cache
 from music_assistant.server.helpers.app_vars import app_var  # pylint: disable=no-name-in-module
+from music_assistant.server.helpers.throttle_retry import Throttler
 from music_assistant.server.models.metadata_provider import MetadataProvider
 
 if TYPE_CHECKING:
index 61c3d72ca9f8219b7ad22440bdfa7c1df8ae880f..a957f457b8d2b4c00e39d96023d01cd83ca860d3 100644 (file)
@@ -6,7 +6,6 @@ from json import JSONDecodeError
 from typing import TYPE_CHECKING, Any, cast
 
 import aiohttp.client_exceptions
-from asyncio_throttle import Throttler
 
 from music_assistant.common.models.enums import ExternalID, ProviderFeature
 from music_assistant.common.models.media_items import (
@@ -24,6 +23,7 @@ from music_assistant.common.models.media_items import (
 from music_assistant.server.controllers.cache import use_cache
 from music_assistant.server.helpers.app_vars import app_var  # type: ignore[attr-defined]
 from music_assistant.server.helpers.compare import compare_strings
+from music_assistant.server.helpers.throttle_retry import Throttler
 from music_assistant.server.models.metadata_provider import MetadataProvider
 
 if TYPE_CHECKING:
index ad51d912d7c2f84592073c305f7b76d57e42d18a..084c111fe2ea7934d23fc77c13f299c9c9e2b825 100644 (file)
@@ -4,8 +4,6 @@ from __future__ import annotations
 
 from typing import TYPE_CHECKING
 
-from asyncio_throttle import Throttler
-
 from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
 from music_assistant.common.models.enums import ConfigEntryType, ProviderFeature, StreamType
 from music_assistant.common.models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
@@ -20,6 +18,7 @@ from music_assistant.common.models.media_items import (
 )
 from music_assistant.common.models.streamdetails import StreamDetails
 from music_assistant.constants import CONF_USERNAME
+from music_assistant.server.helpers.throttle_retry import Throttler
 from music_assistant.server.models.music_provider import MusicProvider
 
 SUPPORTED_FEATURES = (
index 428c2d51b9d71866af79f91327a438e57aab1dcf..af6fa9e1d2b0d1c93525e9389ca48b5f823dd823 100644 (file)
@@ -22,7 +22,6 @@ server = [
   "aiodns>=3.0.0",
   "Brotli>=1.0.9",
   "aiohttp==3.9.5",
-  "asyncio-throttle==1.0.2",
   "aiofiles==24.1.0",
   "aiorun==2024.5.1",
   "certifi==2024.7.4",
index 4facbcf31d716742e887ea89752856f7df6315ea..989cea91b39e89515a4340a757355b51ebf7ec93 100644 (file)
@@ -9,7 +9,6 @@ aiorun==2024.5.1
 aioslimproto==3.0.1
 aiosqlite==0.20.0
 async-upnp-client==0.39.0
-asyncio-throttle==1.0.2
 bidict==0.23.1
 certifi==2024.7.4
 colorlog==6.8.2