class ResourceTemporarilyUnavailable(MusicAssistantError):
"""Error thrown when a resource is temporarily unavailable."""
+ def __init__(self, *args, backoff_time: int = 0) -> None:
+ """Initialize."""
+ super().__init__(*args)
+ self.backoff_time = backoff_time
+
error_code = 17
# create indexes
await self.database.execute(
- f"CREATE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_key_idx on {DB_TABLE_CACHE}(key);"
+ f"CREATE UNIQUE INDEX IF NOT EXISTS {DB_TABLE_CACHE}_key_idx on {DB_TABLE_CACHE}(key);"
)
+ await self.database.commit()
def __schedule_cleanup_task(self) -> None:
"""Schedule the cleanup task."""
item = await ctrl.get_library_item(library_item_id)
# remove from all providers
for provider_mapping in item.provider_mappings:
- prov_controller = self.mass.get_provider(provider_mapping.provider_instance)
- with suppress(NotImplementedError):
- await prov_controller.library_remove(provider_mapping.item_id, item.media_type)
+ if prov_controller := self.mass.get_provider(provider_mapping.provider_instance):
+ with suppress(NotImplementedError):
+ await prov_controller.library_remove(provider_mapping.item_id, item.media_type)
await ctrl.remove_item_from_library(library_item_id)
@api_command("music/library/add_item")
DB_TABLE_PROVIDER_MAPPINGS,
):
await self.database.execute(f"DROP TABLE IF EXISTS {table}")
+ await self.database.commit()
# recreate missing tables
await self.__create_database_tables()
UNIQUE(album_id, artist_id)
);"""
)
+ await self.database.commit()
async def __create_database_indexes(self) -> None:
"""Create database indexes."""
f"CREATE INDEX IF NOT EXISTS {DB_TABLE_ALBUM_ARTISTS}_artist_id_idx "
f"on {DB_TABLE_ALBUM_ARTISTS}(artist_id);"
)
+ await self.database.commit()
async def __create_database_triggers(self) -> None:
"""Create database triggers."""
END;
"""
)
+ await self.database.commit()
"""Perform async initialization."""
self._db = await aiosqlite.connect(self.db_path)
self._db.row_factory = aiosqlite.Row
+ await self.execute("PRAGMA analysis_limit=400;")
+ await self.execute("PRAGMA optimize;")
+ await self.commit()
async def close(self) -> None:
"""Close db connection on exit."""
+ await self.execute("PRAGMA optimize;")
+ await self.commit()
await self._db.close()
async def get_rows(
"""Execute command on the database."""
return await self._db.execute(query, values)
+ async def commit(self) -> None:
+ """Commit the current transaction."""
+ return await self._db.commit()
+
async def iter_items(
self,
table: str,
from typing import TYPE_CHECKING
import aiofiles
-from PIL import Image
+from aiohttp.client_exceptions import ClientError
+from PIL import Image, UnidentifiedImageError
from music_assistant.server.helpers.tags import get_embedded_image
from music_assistant.server.models.metadata_provider import MetadataProvider
path_or_url = resolved_image
# handle HTTP location
if path_or_url.startswith("http"):
- async with mass.http_session.get(path_or_url) as resp:
- return await resp.read()
+ try:
+ async with mass.http_session.get(path_or_url, raise_for_status=True) as resp:
+ return await resp.read()
+ except ClientError as err:
+ raise FileNotFoundError from err
# handle FILE location (of type image)
if path_or_url.endswith(("jpg", "JPG", "png", "PNG", "jpeg")):
if await asyncio.to_thread(os.path.isfile, path_or_url):
def _create_image():
data = BytesIO()
- img = Image.open(BytesIO(img_data))
+ try:
+ img = Image.open(BytesIO(img_data))
+ except UnidentifiedImageError:
+ raise FileNotFoundError(f"Invalid image: {path_or_url}")
if size:
img.thumbnail((size, size), Image.LANCZOS) # pylint: disable=no-member
"""Context manager using asyncio_throttle that catches and re-raises RetriesExhausted."""
import asyncio
+import functools
import logging
+from collections.abc import Awaitable, Callable, Coroutine
+from typing import TYPE_CHECKING, Any, Concatenate, ParamSpec, TypeVar
from asyncio_throttle import Throttler
-from music_assistant.common.models.errors import (
- ResourceTemporarilyUnavailable,
- RetriesExhausted,
-)
+from music_assistant.common.models.errors import ResourceTemporarilyUnavailable, RetriesExhausted
from music_assistant.constants import MASS_LOGGER_NAME
+if TYPE_CHECKING:
+ from music_assistant.server.models.provider import Provider
+
+_ProviderT = TypeVar("_ProviderT", bound="Provider")
+_R = TypeVar("_R")
+_P = ParamSpec("_P")
LOGGER = logging.getLogger(f"{MASS_LOGGER_NAME}.throttle_retry")
-class AsyncThrottleWithRetryContextManager:
- """Context manager using asyncio_throttle that catches and re-raises RetriesExhausted."""
+class ThrottlerManager(Throttler):
+ """Throttler manager that extends asyncio Throttle by retrying."""
- def __init__(self, rate_limit, period, retry_attempts=5, initial_backoff=5):
+ def __init__(self, rate_limit: int, period: float = 1, retry_attempts=5, initial_backoff=5):
"""Initialize the AsyncThrottledContextManager."""
- self.rate_limit = rate_limit
- self.period = period
+ super().__init__(rate_limit=rate_limit, period=period, retry_interval=0.1)
self.retry_attempts = retry_attempts
self.initial_backoff = initial_backoff
- self.throttler = Throttler(rate_limit=rate_limit, period=period)
-
- async def __aenter__(self):
- """Acquire the throttle when entering the async context."""
- await self.throttler.acquire()
- return self
- async def __aexit__(self, exc_type, exc, tb):
- """Release the throttle. If a RetriesExhausted occurs, re-raise it."""
- self.throttler.flush()
- if isinstance(exc, RetriesExhausted):
- raise exc
-
- async def wrapped_function_with_retry(self, func, *args, **kwargs):
+ async def wrap(
+ self,
+ func: Callable[_P, Awaitable[_R]],
+ *args: _P.args,
+ **kwargs: _P.kwargs,
+ ):
"""Async function wrapper with retry logic."""
backoff_time = self.initial_backoff
for attempt in range(self.retry_attempts):
try:
- return await func(*args, **kwargs)
+ async with self:
+ return await func(self, *args, **kwargs)
except ResourceTemporarilyUnavailable as e:
+ if e.backoff_time:
+ backoff_time = e.backoff_time
LOGGER.warning(f"Attempt {attempt + 1}/{self.retry_attempts} failed: {e}")
if attempt < self.retry_attempts - 1:
LOGGER.warning(f"Retrying in {backoff_time} seconds...")
else: # noqa: PLW0120
msg = f"Retries exhausted, failed after {self.retry_attempts} attempts"
raise RetriesExhausted(msg)
+
+
+def throttle_with_retries(
+ func: Callable[Concatenate[_ProviderT, _P], Awaitable[_R]],
+) -> Callable[Concatenate[_ProviderT, _P], Coroutine[Any, Any, _R | None]]:
+ """Call async function using the throttler with retries."""
+
+ @functools.wraps(func)
+ async def wrapper(self: _ProviderT, *args: _P.args, **kwargs: _P.kwargs) -> _R | None:
+ """Call async function using the throttler with retries."""
+ # the trottler attribute must be present on the class
+ throttler = self.throttler
+ backoff_time = throttler.initial_backoff
+ async with throttler:
+ for attempt in range(throttler.retry_attempts):
+ try:
+ return await func(self, *args, **kwargs)
+ except ResourceTemporarilyUnavailable as e:
+ backoff_time += e.backoff_time
+ self.logger.warning(
+ f"Attempt {attempt + 1}/{throttler.retry_attempts} failed: {e}"
+ )
+ if attempt < throttler.retry_attempts - 1:
+ self.logger.warning(f"Retrying in {backoff_time} seconds...")
+ await asyncio.sleep(backoff_time)
+ backoff_time *= 2
+ else: # noqa: PLW0120
+ msg = f"Retries exhausted, failed after {throttler.retry_attempts} attempts"
+ raise RetriesExhausted(msg)
+
+ return wrapper
from typing import TYPE_CHECKING
from music_assistant.constants import CONF_LOG_LEVEL, MASS_LOGGER_NAME
+from music_assistant.server.helpers.throttle_retry import ThrottlerManager
if TYPE_CHECKING:
from zeroconf import ServiceStateChange
class Provider:
"""Base representation of a Provider implementation within Music Assistant."""
+ throttler: ThrottlerManager # optional throttler
+
def __init__(
self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
) -> None:
kwargs["fmt"] = "json" # type: ignore[assignment]
async with (
self.throttler,
- self.mass.http_session.get(url, headers=headers, params=kwargs, ssl=False) as response,
+ self.mass.http_session.get(
+ url, headers=headers, params=kwargs, raise_for_status=True
+ ) as response,
):
try:
result = await response.json()
from tempfile import gettempdir
from typing import TYPE_CHECKING, Any
-from asyncio_throttle import Throttler
-
from music_assistant.common.helpers.json import json_loads
from music_assistant.common.helpers.util import parse_title_and_version
from music_assistant.common.models.config_entries import ConfigEntry, ConfigValueType
ProviderFeature,
StreamType,
)
-from music_assistant.common.models.errors import LoginFailed, MediaNotFoundError
+from music_assistant.common.models.errors import (
+ LoginFailed,
+ MediaNotFoundError,
+ ResourceTemporarilyUnavailable,
+)
from music_assistant.common.models.media_items import (
Album,
AlbumType,
# pylint: enable=no-name-in-module
from music_assistant.server.helpers.audio import get_chunksize
from music_assistant.server.helpers.process import AsyncProcess, check_output
+from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.server.models.music_provider import MusicProvider
if TYPE_CHECKING:
_librespot_bin: str | None = None
# rate limiter needs to be specified on provider-level,
# so make it an instance attribute
- _throttler = Throttler(rate_limit=1, period=1)
+ throttler = ThrottlerManager(rate_limit=1, period=1)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
break
return all_items
+ @throttle_with_retries
async def _get_data(self, endpoint, **kwargs) -> dict[str, Any]:
"""Get data from api."""
url = f"https://api.spotify.com/v1/{endpoint}"
language = locale.split("-")[0]
headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
async with (
- self._throttler,
self.mass.http_session.get(
url, headers=headers, params=kwargs, ssl=True, timeout=120
) as response,
# handle spotify rate limiter
if response.status == 429:
backoff_time = int(response.headers["Retry-After"])
- self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
- await asyncio.sleep(backoff_time)
- return await self._get_data(endpoint, **kwargs)
- # handle temporary server error
- if response.status == 503:
- self.logger.debug(
- "Request to %s failed with 503 error, retrying in 30 seconds...",
- endpoint,
+ raise ResourceTemporarilyUnavailable(
+ "Spotify Rate Limiter", backoff_time=backoff_time
)
- await asyncio.sleep(30)
- return await self._get_data(endpoint, **kwargs)
+ # handle temporary server error
+ if response.status in (502, 503):
+ raise ResourceTemporarilyUnavailable(backoff_time=30)
+
# handle 404 not found, convert to MediaNotFoundError
if response.status == 404:
raise MediaNotFoundError(f"{endpoint} not found")
response.raise_for_status()
return await response.json(loads=json_loads)
+ @throttle_with_retries
async def _delete_data(self, endpoint, data=None, **kwargs) -> str:
"""Delete data from api."""
url = f"https://api.spotify.com/v1/{endpoint}"
# handle spotify rate limiter
if response.status == 429:
backoff_time = int(response.headers["Retry-After"])
- self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
- await asyncio.sleep(backoff_time)
- return await self._delete_data(endpoint, data=data, **kwargs)
+ raise ResourceTemporarilyUnavailable(
+ "Spotify Rate Limiter", backoff_time=backoff_time
+ )
+ # handle temporary server error
+ if response.status in (502, 503):
+ raise ResourceTemporarilyUnavailable(backoff_time=30)
response.raise_for_status()
return await response.text()
+ @throttle_with_retries
async def _put_data(self, endpoint, data=None, **kwargs) -> str:
"""Put data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
# handle spotify rate limiter
if response.status == 429:
backoff_time = int(response.headers["Retry-After"])
- self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
- await asyncio.sleep(backoff_time)
- return await self._put_data(endpoint, data=data, **kwargs)
+ raise ResourceTemporarilyUnavailable(
+ "Spotify Rate Limiter", backoff_time=backoff_time
+ )
+ # handle temporary server error
+ if response.status in (502, 503):
+ raise ResourceTemporarilyUnavailable(backoff_time=30)
response.raise_for_status()
return await response.text()
+ @throttle_with_retries
async def _post_data(self, endpoint, data=None, **kwargs) -> str:
"""Post data on api."""
url = f"https://api.spotify.com/v1/{endpoint}"
# handle spotify rate limiter
if response.status == 429:
backoff_time = int(response.headers["Retry-After"])
- self.logger.debug("Waiting %s seconds on Spotify rate limiter", backoff_time)
- await asyncio.sleep(backoff_time)
- return await self._post_data(endpoint, data=data, **kwargs)
+ raise ResourceTemporarilyUnavailable(
+ "Spotify Rate Limiter", backoff_time=backoff_time
+ )
+ # handle temporary server error
+ if response.status in (502, 503):
+ raise ResourceTemporarilyUnavailable(backoff_time=30)
response.raise_for_status()
return await response.text()
from music_assistant.common.models.streamdetails import StreamDetails
from music_assistant.server.helpers.auth import AuthenticationHelper
from music_assistant.server.helpers.tags import AudioTags, parse_tags
-from music_assistant.server.helpers.throttle_retry import (
- AsyncThrottleWithRetryContextManager,
-)
+from music_assistant.server.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
from music_assistant.server.models.music_provider import MusicProvider
from .helpers import (
_tidal_session: TidalSession | None = None
_tidal_user_id: str | None = None
+ # rate limiter needs to be specified on provider-level,
+ # so make it an instance attribute
+ throttler = ThrottlerManager(rate_limit=1, period=0.5)
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self._tidal_user_id: str = self.config.get_value(CONF_USER_ID)
self._tidal_session = await self._get_tidal_session()
- self._throttle_retry = AsyncThrottleWithRetryContextManager(
- rate_limit=1, period=0.5, retry_attempts=5, initial_backoff=5
- )
@property
def supported_features(self) -> tuple[ProviderFeature, ...]:
):
yield self._parse_playlist(playlist)
+ @throttle_with_retries
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get album tracks for given album id."""
tidal_session = await self._get_tidal_session()
- async with self._throttle_retry as manager:
- tracks_obj = await manager.wrapped_function_with_retry(
- get_album_tracks, tidal_session, prov_album_id
- )
- return [self._parse_track(track_obj=track_obj) for track_obj in tracks_obj]
+ tracks_obj = await get_album_tracks(tidal_session, prov_album_id)
+ return [self._parse_track(track_obj=track_obj) for track_obj in tracks_obj]
+ @throttle_with_retries
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get a list of all albums for the given artist."""
tidal_session = await self._get_tidal_session()
- async with self._throttle_retry as manager:
- artist_albums_obj = await manager.wrapped_function_with_retry(
- get_artist_albums, tidal_session, prov_artist_id
- )
- return [self._parse_album(album) for album in artist_albums_obj]
+ artist_albums_obj = await get_artist_albums(tidal_session, prov_artist_id)
+ return [self._parse_album(album) for album in artist_albums_obj]
+ @throttle_with_retries
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get a list of 10 most popular tracks for the given artist."""
tidal_session = await self._get_tidal_session()
- async with self._throttle_retry as manager:
- artist_toptracks_obj = await manager.wrapped_function_with_retry(
- get_artist_toptracks, tidal_session, prov_artist_id
- )
- return [self._parse_track(track) for track in artist_toptracks_obj]
+ artist_toptracks_obj = await get_artist_toptracks(tidal_session, prov_artist_id)
+ return [self._parse_track(track) for track in artist_toptracks_obj]
async def get_playlist_tracks(self, prov_playlist_id: str) -> AsyncGenerator[Track, None]:
"""Get all playlist tracks for given playlist id."""
track.position = total_playlist_tracks
yield track
+ @throttle_with_retries
async def get_similar_tracks(self, prov_track_id: str, limit: int = 25) -> list[Track]:
"""Get similar tracks for given track id."""
tidal_session = await self._get_tidal_session()
- async with self._throttle_retry as manager:
- similar_tracks_obj = await manager.wrapped_function_with_retry(
- get_similar_tracks, tidal_session, prov_track_id, limit
- )
- return [self._parse_track(track) for track in similar_tracks_obj]
+ similar_tracks_obj = await get_similar_tracks(tidal_session, prov_track_id, limit)
+ return [self._parse_track(track) for track in similar_tracks_obj]
async def library_add(self, item: MediaItemType) -> bool:
"""Add item to library."""
path=url,
)
+ @throttle_with_retries
async def get_artist(self, prov_artist_id: str) -> Artist:
"""Get artist details for given artist id."""
tidal_session = await self._get_tidal_session()
- async with self._throttle_retry as manager:
- artist_obj = await manager.wrapped_function_with_retry(
- get_artist, tidal_session, prov_artist_id
- )
- return self._parse_artist(artist_obj)
+ artist_obj = await get_artist(tidal_session, prov_artist_id)
+ return self._parse_artist(artist_obj)
+ @throttle_with_retries
async def get_album(self, prov_album_id: str) -> Album:
"""Get album details for given album id."""
tidal_session = await self._get_tidal_session()
- async with self._throttle_retry as manager:
- album_obj = await manager.wrapped_function_with_retry(
- get_album, tidal_session, prov_album_id
- )
- return self._parse_album(album_obj)
+ album_obj = await get_album(tidal_session, prov_album_id)
+ return self._parse_album(album_obj)
+ @throttle_with_retries
async def get_track(self, prov_track_id: str) -> Track:
"""Get track details for given track id."""
tidal_session = await self._get_tidal_session()
- async with self._throttle_retry as manager:
- track_obj = await manager.wrapped_function_with_retry(
- get_track, tidal_session, prov_track_id
- )
- track = self._parse_track(track_obj)
- # get some extra details for the full track info
- with suppress(tidal_exceptions.MetadataNotAvailable, AttributeError):
- lyrics: TidalLyrics = await asyncio.to_thread(track.lyrics)
- track.metadata.lyrics = lyrics.text
- return track
+ track_obj = await get_track(tidal_session, prov_track_id)
+ track = self._parse_track(track_obj)
+ # get some extra details for the full track info
+ with suppress(tidal_exceptions.MetadataNotAvailable, AttributeError):
+ lyrics: TidalLyrics = await asyncio.to_thread(track.lyrics)
+ track.metadata.lyrics = lyrics.text
+ return track
+ @throttle_with_retries
async def get_playlist(self, prov_playlist_id: str) -> Playlist:
"""Get playlist details for given playlist id."""
tidal_session = await self._get_tidal_session()
- async with self._throttle_retry as manager:
- playlist_obj = await manager.wrapped_function_with_retry(
- get_playlist, tidal_session, prov_playlist_id
- )
- return self._parse_playlist(playlist_obj)
+ playlist_obj = await get_playlist(tidal_session, prov_playlist_id)
+ return self._parse_playlist(playlist_obj)
def get_item_mapping(self, media_type: MediaType, key: str, name: str) -> ItemMapping:
"""Create a generic item mapping."""
) -> AsyncGenerator[Any, None]:
"""Yield all items from a larger listing."""
offset = 0
- async with self._throttle_retry:
- while True:
- if asyncio.iscoroutinefunction(func):
- chunk = await func(*args, **kwargs, offset=offset)
- else:
- chunk = await asyncio.to_thread(func, *args, **kwargs, offset=offset)
- offset += len(chunk)
- for item in chunk:
- yield item
- if len(chunk) < DEFAULT_LIMIT:
- break
+ while True:
+ if asyncio.iscoroutinefunction(func):
+ chunk = await func(*args, **kwargs, offset=offset)
+ else:
+ chunk = await asyncio.to_thread(func, *args, **kwargs, offset=offset)
+ offset += len(chunk)
+ for item in chunk:
+ yield item
+ if len(chunk) < DEFAULT_LIMIT:
+ break
async def _get_media_info(
self, item_id: str, url: str, force_refresh: bool = False
import asyncio
import logging
import os
-import sys
from collections.abc import Awaitable, Callable, Coroutine
from typing import TYPE_CHECKING, Any, Self
from uuid import uuid4
EventCallBackType, tuple[EventType, ...] | None, tuple[str, ...] | None
]
-ENABLE_DEBUG = bool(os.environ.get("PYTHONDEVMODE", "0"))
+ENABLE_DEBUG = os.environ.get("PYTHONDEVMODE") == "1"
LOGGER = logging.getLogger(MASS_LOGGER_NAME)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
PROVIDERS_PATH = os.path.join(BASE_DIR, "providers")
-ENABLE_HTTP_CLEANUP_CLOSED = not (3, 11, 1) <= sys.version_info < (3, 11, 4)
-# Enabling cleanup closed on python 3.11.1+ leaks memory relatively quickly
-# see https://github.com/aio-libs/aiohttp/issues/7252
-# aiohttp interacts poorly with https://github.com/python/cpython/pull/98540
-# The issue was fixed in 3.11.4 via https://github.com/python/cpython/pull/104485
-
class MusicAssistant:
"""Main MusicAssistant (Server) object."""
loop=self.loop,
connector=TCPConnector(
ssl=False,
- enable_cleanup_closed=ENABLE_HTTP_CLEANUP_CLOSED,
+ enable_cleanup_closed=True,
limit=4096,
limit_per_host=100,
),