BandcampAPIError,
BandcampMustBeLoggedInError,
BandcampNotFoundError,
+ BandcampRateLimitError,
SearchResultAlbum,
SearchResultArtist,
SearchResultTrack,
from bandcamp_async_api.models import CollectionType
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
from music_assistant_models.enums import ConfigEntryType, MediaType, ProviderFeature, StreamType
-from music_assistant_models.errors import InvalidDataError, LoginFailed, MediaNotFoundError
+from music_assistant_models.errors import (
+ InvalidDataError,
+ LoginFailed,
+ MediaNotFoundError,
+ ResourceTemporarilyUnavailable,
+)
from music_assistant_models.media_items import Album, Artist, AudioFormat, SearchResults, Track
from music_assistant_models.provider import ProviderManifest
from music_assistant_models.streamdetails import StreamDetails
_client: BandcampAPIClient
_converters: BandcampConverters
- throttler: ThrottlerManager
+ throttler: ThrottlerManager = ThrottlerManager(
+ rate_limit=50, # requests per period seconds
+ period=10,
+ initial_backoff=3, # Bandcamp responds with Retry-After 3
+ retry_attempts=10,
+ )
top_tracks_limit: int
async def handle_async_init(self) -> None:
self.top_tracks_limit = cast(
"int", self.config.get_value(CONF_TOP_TRACKS_LIMIT, DEFAULT_TOP_TRACKS_LIMIT)
)
-
- # Initialize the new async API client
- self._client = BandcampAPIClient(session=self.mass.http_session, identity_token=identity)
-
- self.throttler = ThrottlerManager(rate_limit=1, period=2)
+ self._client = BandcampAPIClient(
+ session=self.mass.http_session,
+ identity_token=identity,
+ default_retry_after=3, # Bandcamp responds with Retry-After 3
+ )
self._converters = BandcampConverters(self.domain, self.instance_id)
@property
search_results = await self._client.search(search_query)
except BandcampNotFoundError as error:
raise MediaNotFoundError("No results for Bandcamp search") from error
+ except BandcampRateLimitError as error:
+ raise ResourceTemporarilyUnavailable(
+ "Bandcamp rate limit reached", backoff_time=error.retry_after
+ ) from error
except BandcampAPIError as error:
raise InvalidDataError("Unexpected error during Bandcamp search") from error
return
try:
- collection = await self._client.get_collection_items(CollectionType.COLLECTION)
+ async with self.throttler.acquire(): # AsyncGenerator method cannot be decorated
+ collection = await self._client.get_collection_items(CollectionType.COLLECTION)
band_ids = set()
for item in collection.items:
if item.item_type == "band":
raise LoginFailed("Wrong Bandcamp identity token.") from error
except BandcampNotFoundError as error:
raise MediaNotFoundError("Bandcamp library artists returned no results") from error
+ except BandcampRateLimitError as error:
+ raise ResourceTemporarilyUnavailable(
+ "Bandcamp rate limit reached", backoff_time=error.retry_after
+ ) from error
except BandcampAPIError as error:
raise MediaNotFoundError("Failed to get library artists") from error
return
try:
- api_collection = await self._client.get_collection_items(CollectionType.COLLECTION)
+ async with self.throttler.acquire(): # AsyncGenerator method cannot be decorated
+ api_collection = await self._client.get_collection_items(CollectionType.COLLECTION)
for item in api_collection.items:
if item.item_type == "album":
yield await self.get_album(f"{item.band_id}-{item.item_id}")
raise LoginFailed("Wrong Bandcamp identity token.") from error
except BandcampNotFoundError as error:
raise MediaNotFoundError("Bandcamp library albums returned no results") from error
+ except BandcampRateLimitError as error:
+ raise ResourceTemporarilyUnavailable(
+ "Bandcamp rate limit reached", backoff_time=error.retry_after
+ ) from error
except BandcampAPIError as error:
raise MediaNotFoundError("Failed to get library albums") from error
if not self._client.identity: # library requires identity
return
- try:
- async for album in self.get_library_albums():
- tracks = await self.get_album_tracks(album.item_id)
- for track in tracks:
- yield track
- await asyncio.sleep(0) # Yield control to avoid blocking
- except BandcampMustBeLoggedInError as error:
- self.logger.error("Error getting Bandcamp library tracks: Wrong identity token.")
- raise LoginFailed("Wrong Bandcamp identity token.") from error
- except BandcampNotFoundError as error:
- raise MediaNotFoundError("Bandcamp library tracks returned no results") from error
- except BandcampAPIError as error:
- raise MediaNotFoundError("Failed to get library tracks") from error
+ async for album in self.get_library_albums():
+ tracks = await self.get_album_tracks(album.item_id)
+ for track in tracks:
+ yield track
+ await asyncio.sleep(0) # Yield control to avoid blocking
@use_cache(CACHE)
+ @throttle_with_retries
async def get_artist(self, prov_artist_id: str | int) -> Artist:
"""Get full artist details by id."""
try:
raise MediaNotFoundError(
f"Bandcamp artist {prov_artist_id} search returned no results"
) from error
+ except BandcampRateLimitError as error:
+ raise ResourceTemporarilyUnavailable(
+ "Bandcamp rate limit reached", backoff_time=error.retry_after
+ ) from error
except BandcampAPIError as error:
raise MediaNotFoundError(f"Failed to get artist {prov_artist_id}") from error
@use_cache(CACHE)
+ @throttle_with_retries
async def get_album(self, prov_album_id: str) -> Album:
"""Get full album details by id."""
artist_id, album_id, _ = split_id(prov_album_id)
raise MediaNotFoundError(
f"Bandcamp album {prov_album_id} search returned no results"
) from error
+ except BandcampRateLimitError as error:
+ raise ResourceTemporarilyUnavailable(
+ "Bandcamp rate limit reached", backoff_time=error.retry_after
+ ) from error
except BandcampAPIError as error:
raise MediaNotFoundError(f"Failed to get album {prov_album_id}") from error
@use_cache(CACHE)
+ @throttle_with_retries
async def get_track(self, prov_track_id: str) -> Track:
"""Get full track details by id."""
artist_id, album_id, track_id = split_id(prov_track_id)
raise MediaNotFoundError(
f"Bandcamp track {prov_track_id} search returned no results"
) from error
+ except BandcampRateLimitError as error:
+ raise ResourceTemporarilyUnavailable(
+ "Bandcamp rate limit reached", backoff_time=error.retry_after
+ ) from error
except BandcampAPIError as error:
raise MediaNotFoundError(f"Failed to get track {prov_track_id}") from error
@use_cache(CACHE)
+ @throttle_with_retries
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get all tracks in an album."""
artist_id, album_id, _ = split_id(prov_album_id)
raise MediaNotFoundError(
f"Bandcamp album {prov_album_id} tracks search returned no results"
) from error
+ except BandcampRateLimitError as error:
+ raise ResourceTemporarilyUnavailable(
+ "Bandcamp rate limit reached", backoff_time=error.retry_after
+ ) from error
except BandcampAPIError as error:
raise MediaNotFoundError(f"Failed to get albums tracks for {prov_album_id}") from error
@use_cache(CACHE)
+ @throttle_with_retries
async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
"""Get albums by an artist."""
albums = []
raise MediaNotFoundError(
f"Bandcamp artist {prov_artist_id} albums search returned no results"
) from error
+ except BandcampRateLimitError as error:
+ raise ResourceTemporarilyUnavailable(
+ "Bandcamp rate limit reached", backoff_time=error.retry_after
+ ) from error
except BandcampAPIError as error:
raise MediaNotFoundError(f"Failed to get albums for artist {prov_artist_id}") from error
return albums
@use_cache(CACHE)
+ @throttle_with_retries
async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
"""Get top tracks of an artist."""
tracks: list[Track] = []
- try:
- albums = await self.get_artist_albums(prov_artist_id)
- albums.sort(key=lambda album: (album.year is None, album.year or 0), reverse=True)
- for album in albums:
- tracks.extend(await self.get_album_tracks(album.item_id))
- if len(tracks) >= self.top_tracks_limit:
- break
-
- except BandcampNotFoundError as error:
- raise MediaNotFoundError(
- f"Bandcamp artist {prov_artist_id} top tracks search returned no results"
- ) from error
-
- except BandcampAPIError as error:
- raise MediaNotFoundError(
- f"Failed to get toptracks for artist {prov_artist_id}"
- ) from error
+ # get_artist_albums and get_album_tracks already handle exceptions and rate limiting
+ albums = await self.get_artist_albums(prov_artist_id)
+ albums.sort(key=lambda album: (album.year is None, album.year or 0), reverse=True)
+ for album in albums:
+ tracks.extend(await self.get_album_tracks(album.item_id))
+ if len(tracks) >= self.top_tracks_limit:
+ break
return tracks[: self.top_tracks_limit]
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Return the content details for the given track."""
- try:
- # consider _client to avoid caching if the track urls become dynamic
- track_ma = await self.get_track(item_id)
- if not track_ma.metadata.links:
- raise MediaNotFoundError(
- f"No streaming links found for track {item_id}. Please report this"
- )
-
- link = next(iter(track_ma.metadata.links))
- if not link:
- raise MediaNotFoundError(
- f"No streaming URL found for track {item_id}. Please report this"
- )
-
- streaming_url = link.url
- if not streaming_url:
- raise MediaNotFoundError(
- f"No streaming URL found for track {item_id}: {streaming_url}"
- )
-
- return StreamDetails(
- item_id=item_id,
- provider=self.instance_id,
- audio_format=AudioFormat(),
- stream_type=StreamType.HTTP,
- media_type=media_type,
- path=streaming_url,
- can_seek=True,
- allow_seek=True,
+ # get_track already handles exceptions and rate limiting
+ track_ma = await self.get_track(item_id)
+ if not track_ma.metadata.links:
+ raise MediaNotFoundError(
+ f"No streaming links found for track {item_id}. Please report this"
)
- except BandcampNotFoundError as error:
- raise MediaNotFoundError(
- f"Bandcamp stream details search for {media_type} {item_id} returned no results"
- ) from error
- except BandcampAPIError as error:
+ link = next(iter(track_ma.metadata.links))
+ if not link:
raise MediaNotFoundError(
- f"Stream details not available for {media_type} {item_id}"
- ) from error
+ f"No streaming URL found for track {item_id}. Please report this"
+ )
+
+ streaming_url = link.url
+ if not streaming_url:
+ raise MediaNotFoundError(f"No streaming URL found for track {item_id}: {streaming_url}")
+
+ return StreamDetails(
+ item_id=item_id,
+ provider=self.instance_id,
+ audio_format=AudioFormat(),
+ stream_type=StreamType.HTTP,
+ media_type=media_type,
+ path=streaming_url,
+ can_seek=True,
+ allow_seek=True,
+ )