--- /dev/null
+"""Bandcamp music provider support for MusicAssistant."""
+
+import asyncio
+from collections.abc import AsyncGenerator
+from contextlib import suppress
+from typing import cast
+
+from bandcamp_async_api import (
+ BandcampAPIClient,
+ BandcampAPIError,
+ BandcampMustBeLoggedInError,
+ BandcampNotFoundError,
+ SearchResultAlbum,
+ SearchResultArtist,
+ SearchResultTrack,
+)
+from bandcamp_async_api.models import CollectionType
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
+from music_assistant_models.errors import (
+ InvalidDataError,
+ LoginFailed,
+ MediaNotFoundError,
+)
+from music_assistant_models.media_items import (
+ Album,
+ Artist,
+ AudioFormat,
+ SearchResults,
+ Track,
+)
+from music_assistant_models.provider import ProviderManifest
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.controllers.cache import use_cache
+from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
+from music_assistant.mass import MusicAssistant
+from music_assistant.models import ProviderInstanceType
+from music_assistant.models.music_provider import MusicProvider
+
+from .converters import BandcampConverters
+
+SUPPORTED_FEATURES = {
+ ProviderFeature.LIBRARY_ARTISTS,
+ ProviderFeature.LIBRARY_ALBUMS,
+ ProviderFeature.LIBRARY_TRACKS,
+ ProviderFeature.SEARCH,
+ ProviderFeature.ARTIST_ALBUMS,
+ ProviderFeature.ARTIST_TOPTRACKS,
+}
+
+CONF_IDENTITY = "identity"
+CONF_TOP_TRACKS_LIMIT = "top_tracks_limit"
+DEFAULT_TOP_TRACKS_LIMIT = 50
+CACHE = 3600 * 24 * 30 # Cache for 30 days
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return BandcampProvider(mass, manifest, config, SUPPORTED_FEATURES)
+
+
+# noinspection PyTypeHints,PyUnusedLocal
+async def get_config_entries(
+ mass: MusicAssistant, # noqa: ARG001
+ instance_id: str | None = None, # noqa: ARG001
+ action: str | None = None, # noqa: ARG001
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """Return Config entries to setup this provider."""
+ return (
+ ConfigEntry(
+ key=CONF_IDENTITY,
+ type=ConfigEntryType.SECURE_STRING,
+ label="Identity token",
+ required=False,
+ description="Identity token from Bandcamp cookies for account collection access."
+ " Log in https://bandcamp.com and extract browser cookie named 'identity'.",
+ value=values.get(CONF_IDENTITY) if values else None,
+ ),
+ ConfigEntry(
+ key=CONF_TOP_TRACKS_LIMIT,
+ type=ConfigEntryType.INTEGER,
+ label="Artist Top Tracks search limit",
+ required=False,
+ description="Search limit while getting artist top tracks.",
+ value=values.get(CONF_TOP_TRACKS_LIMIT) if values else DEFAULT_TOP_TRACKS_LIMIT,
+ default_value=DEFAULT_TOP_TRACKS_LIMIT,
+ category="advanced",
+ ),
+ )
+
+
+def split_id(id_: str) -> tuple[int, int | None, int | None]:
+ """Return (artist_id, album_id, track_id). Missing parts are returned as 0."""
+ parts = id_.split("-")
+ part_0 = int(parts[0])
+ part_1 = int(parts[1]) if len(parts) > 1 else 0
+ part_2 = int(parts[2]) if len(parts) > 2 else 0
+ return part_0, part_1, part_2
+
+
+class BandcampProvider(MusicProvider):
+ """Bandcamp provider support."""
+
+ _client: BandcampAPIClient
+ _converters: BandcampConverters
+ throttler: ThrottlerManager
+ top_tracks_limit: int
+
+ async def handle_async_init(self) -> None:
+ """Handle async init of the Bandcamp provider."""
+ identity = self.config.get_value(CONF_IDENTITY)
+ self.top_tracks_limit = cast(
+ "int", self.config.get_value(CONF_TOP_TRACKS_LIMIT, DEFAULT_TOP_TRACKS_LIMIT)
+ )
+
+ # Initialize the new async API client
+ self._client = BandcampAPIClient(session=self.mass.http_session, identity_token=identity)
+
+ self.throttler = ThrottlerManager(rate_limit=1, period=2)
+ self._converters = BandcampConverters(self.domain, self.instance_id)
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """Return True if the provider is a streaming provider."""
+ return True
+
+ @use_cache(CACHE)
+ @throttle_with_retries
+ async def search(
+ self, search_query: str, media_types: list[MediaType], limit: int = 50
+ ) -> SearchResults:
+ """Perform search on music provider."""
+ results = SearchResults()
+ if not self._client.identity:
+ return results
+
+ if not media_types:
+ return results
+
+ try:
+ search_results = await self._client.search(search_query)
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError("No results for Bandcamp search") from error
+ except BandcampAPIError as error:
+ raise InvalidDataError("Unexpected error during Bandcamp search") from error
+
+ for item in search_results[:limit]:
+ try:
+ if isinstance(item, SearchResultTrack) and MediaType.TRACK in media_types:
+ results.tracks = [*results.tracks, self._converters.track_from_search(item)]
+ elif isinstance(item, SearchResultAlbum) and MediaType.ALBUM in media_types:
+ results.albums = [*results.albums, self._converters.album_from_search(item)]
+ elif isinstance(item, SearchResultArtist) and MediaType.ARTIST in media_types:
+ results.artists = [*results.artists, self._converters.artist_from_search(item)]
+ except BandcampAPIError as error:
+ self.logger.warning("Failed to convert search result item: %s", error)
+ continue
+
+ return results
+
+ async def get_library_artists(self) -> AsyncGenerator[Artist, None]:
+ """Retrieve library artists from Bandcamp."""
+ if not self._client.identity: # library requires identity
+ return
+
+ try:
+ collection = await self._client.get_collection_items(CollectionType.COLLECTION)
+ band_ids = set()
+ for item in collection.items:
+ if item.item_type == "band":
+ band_ids.add(item.item_id)
+ elif item.item_type == "album":
+ band_ids.add(item.band_id)
+
+ for band_id in band_ids:
+ yield await self.get_artist(band_id)
+ await asyncio.sleep(0) # Yield control to avoid blocking
+
+ except BandcampMustBeLoggedInError as error:
+ self.logger.error("Error getting Bandcamp library artists: Wrong identity token.")
+ raise LoginFailed("Wrong Bandcamp identity token.") from error
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError("Bandcamp library artists returned no results") from error
+ except BandcampAPIError as error:
+ raise MediaNotFoundError("Failed to get library artists") from error
+
+ async def get_library_albums(self) -> AsyncGenerator[Album, None]:
+ """Retrieve library albums from Bandcamp."""
+ if not self._client.identity: # library requires identity
+ return
+
+ try:
+ api_collection = await self._client.get_collection_items(CollectionType.COLLECTION)
+ for item in api_collection.items:
+ if item.item_type == "album":
+ yield await self.get_album(f"{item.band_id}-{item.item_id}")
+ await asyncio.sleep(0) # Yield control to avoid blocking
+ except BandcampMustBeLoggedInError as error:
+ self.logger.error("Error getting Bandcamp library albums: Wrong identity token.")
+ raise LoginFailed("Wrong Bandcamp identity token.") from error
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError("Bandcamp library albums returned no results") from error
+ except BandcampAPIError as error:
+ raise MediaNotFoundError("Failed to get library albums") from error
+
+ async def get_library_tracks(self) -> AsyncGenerator[Track, None]:
+ """Retrieve library tracks from Bandcamp."""
+ if not self._client.identity: # library requires identity
+ return
+
+ try:
+ async for album in self.get_library_albums():
+ tracks = await self.get_album_tracks(album.item_id)
+ for track in tracks:
+ yield track
+ await asyncio.sleep(0) # Yield control to avoid blocking
+ except BandcampMustBeLoggedInError as error:
+ self.logger.error("Error getting Bandcamp library tracks: Wrong identity token.")
+ raise LoginFailed("Wrong Bandcamp identity token.") from error
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError("Bandcamp library tracks returned no results") from error
+ except BandcampAPIError as error:
+ raise MediaNotFoundError("Failed to get library tracks") from error
+
+ @use_cache(CACHE)
+ async def get_artist(self, prov_artist_id: str | int) -> Artist:
+ """Get full artist details by id."""
+ try:
+ api_artist = await self._client.get_artist(prov_artist_id)
+ return self._converters.artist_from_api(api_artist)
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError(
+ f"Bandcamp artist {prov_artist_id} search returned no results"
+ ) from error
+ except BandcampAPIError as error:
+ raise MediaNotFoundError(f"Failed to get artist {prov_artist_id}") from error
+
+ @use_cache(CACHE)
+ async def get_album(self, prov_album_id: str) -> Album:
+ """Get full album details by id."""
+ artist_id, album_id, _ = split_id(prov_album_id)
+ try:
+ api_album = await self._client.get_album(artist_id, album_id)
+ return self._converters.album_from_api(api_album)
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError(
+ f"Bandcamp album {prov_album_id} search returned no results"
+ ) from error
+ except BandcampAPIError as error:
+ raise MediaNotFoundError(f"Failed to get album {prov_album_id}") from error
+
+ @use_cache(CACHE)
+ async def get_track(self, prov_track_id: str) -> Track:
+ """Get full track details by id."""
+ artist_id, album_id, track_id = split_id(prov_track_id)
+ if track_id is None: # artist_id-track_id
+ album_id, track_id = None, album_id
+
+ try:
+ if all((artist_id, album_id, track_id)):
+ api_album = await self._client.get_album(artist_id, album_id)
+ api_track = next((_ for _ in api_album.tracks if _.id == track_id), None)
+ return self._converters.track_from_api(
+ track=api_track,
+ album_id=api_album.id,
+ album_name=api_album.title,
+ album_image_url=api_album.art_url,
+ )
+ elif not album_id:
+ api_track = await self._client.get_track(artist_id, track_id)
+ return self._converters.track_from_api(
+ track=api_track,
+ album_id=api_track.album.id if api_track.album else None,
+ album_name=api_track.album.title if api_track.album else "",
+ album_image_url=api_track.album.art_url if api_track.album else "",
+ )
+ else:
+ raise MediaNotFoundError(f"Track {prov_track_id} not found on Bandcamp")
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError(
+ f"Bandcamp track {prov_track_id} search returned no results"
+ ) from error
+ except BandcampAPIError as error:
+ raise MediaNotFoundError(f"Failed to get track {prov_track_id}") from error
+
+ @use_cache(CACHE)
+ async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
+ """Get all tracks in an album."""
+ artist_id, album_id, _ = split_id(prov_album_id)
+ try:
+ api_album = await self._client.get_album(artist_id, album_id)
+ if api_album.tracks:
+ return [
+ self._converters.track_from_api(
+ track=track,
+ album_id=album_id,
+ album_name=api_album.title,
+ album_image_url=api_album.art_url,
+ )
+ for track in api_album.tracks
+ if track.streaming_url # Only include tracks with streaming URLs
+ ]
+
+ return []
+
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError(
+ f"Bandcamp album {prov_album_id} tracks search returned no results"
+ ) from error
+ except BandcampAPIError as error:
+ raise MediaNotFoundError(f"Failed to get albums tracks for {prov_album_id}") from error
+
+ @use_cache(CACHE)
+ async def get_artist_albums(self, prov_artist_id: str) -> list[Album]:
+ """Get albums by an artist."""
+ albums = []
+ try:
+ api_discography = await self._client.get_artist_discography(prov_artist_id)
+ for item in api_discography:
+ if item.get("item_type") == "album" and item.get("item_id"):
+ album = None
+
+ with suppress(MediaNotFoundError):
+ album = await self.get_album(f"{item['band_id']}-{item['item_id']}")
+
+ with suppress(MediaNotFoundError):
+ album = album or await self.get_album(f"{prov_artist_id}-{item['item_id']}")
+
+ if album:
+ albums.append(album)
+
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError(
+ f"Bandcamp artist {prov_artist_id} albums search returned no results"
+ ) from error
+ except BandcampAPIError as error:
+ raise MediaNotFoundError(f"Failed to get albums for artist {prov_artist_id}") from error
+
+ return albums
+
+ @use_cache(CACHE)
+ async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]:
+ """Get top tracks of an artist."""
+ tracks: list[Track] = []
+ try:
+ albums = await self.get_artist_albums(prov_artist_id)
+ albums.sort(key=lambda album: (album.year is None, album.year or 0), reverse=True)
+ for album in albums:
+ tracks.extend(await self.get_album_tracks(album.item_id))
+ if len(tracks) >= self.top_tracks_limit:
+ break
+
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError(
+ f"Bandcamp artist {prov_artist_id} top tracks search returned no results"
+ ) from error
+
+ except BandcampAPIError as error:
+ raise MediaNotFoundError(
+ f"Failed to get toptracks for artist {prov_artist_id}"
+ ) from error
+
+ return tracks[: self.top_tracks_limit]
+
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
+ """Return the content details for the given track."""
+ try:
+ # consider _client to avoid caching if the track urls become dynamic
+ track_ma = await self.get_track(item_id)
+ if not track_ma.metadata.links:
+ raise MediaNotFoundError(
+ f"No streaming links found for track {item_id}. Please report this"
+ )
+
+ link = next(iter(track_ma.metadata.links))
+ if not link:
+ raise MediaNotFoundError(
+ f"No streaming URL found for track {item_id}. Please report this"
+ )
+
+ streaming_url = link.url
+ if not streaming_url:
+ raise MediaNotFoundError(
+ f"No streaming URL found for track {item_id}: {streaming_url}"
+ )
+
+ return StreamDetails(
+ item_id=item_id,
+ provider=self.instance_id,
+ audio_format=AudioFormat(),
+ stream_type=StreamType.HTTP,
+ media_type=media_type,
+ path=streaming_url,
+ can_seek=True,
+ allow_seek=True,
+ )
+
+ except BandcampNotFoundError as error:
+ raise MediaNotFoundError(
+ f"Bandcamp stream details search for {media_type} {item_id} returned no results"
+ ) from error
+ except BandcampAPIError as error:
+ raise MediaNotFoundError(
+ f"Stream details not available for {media_type} {item_id}"
+ ) from error
--- /dev/null
+"""Converters for Bandcamp API models to Music Assistant models."""
+
+from datetime import datetime
+
+from bandcamp_async_api.models import BCAlbum as APIAlbum
+from bandcamp_async_api.models import BCArtist as APIArtist
+from bandcamp_async_api.models import BCTrack as APITrack
+from bandcamp_async_api.models import (
+ SearchResultAlbum,
+ SearchResultArtist,
+ SearchResultTrack,
+)
+from music_assistant_models.enums import ImageType, LinkType, MediaType
+from music_assistant_models.media_items import Album as MAAlbum
+from music_assistant_models.media_items import Artist as MAArtist
+from music_assistant_models.media_items import (
+ ItemMapping,
+ MediaItemImage,
+ MediaItemLink,
+ ProviderMapping,
+ UniqueList,
+)
+from music_assistant_models.media_items import Track as MATrack
+
+
+class BandcampConverters:
+ """Converters for Bandcamp API models to Music Assistant models."""
+
+ def __init__(self, domain: str, instance_id: str):
+ """Initialize converters with provider information."""
+ self.domain = domain
+ self.instance_id = instance_id
+
+ def streaming_url_from_api(
+ self, streaming_info: dict[str, str]
+ ) -> tuple[str | None, int | None]:
+ """Parse streaming URL info."""
+ # Extract streaming URL with priority: mp3-v0 > mp3-128
+ bitrate = None
+ streaming_url = None
+ if "mp3-v0" in streaming_info:
+ streaming_url = streaming_info["mp3-v0"]
+ bitrate = None # VBR
+ elif "mp3-320" in streaming_info:
+ streaming_url = streaming_info["mp3-320"]
+ bitrate = 320
+ elif "mp3-128" in streaming_info:
+ streaming_url = streaming_info["mp3-128"]
+ bitrate = 128
+ elif streaming_info:
+ # Fallback to first available URL
+ streaming_url = next(iter(streaming_info.values()))
+ return streaming_url, bitrate
+
+ def track_from_search(self, item: SearchResultTrack) -> MATrack:
+ """Create a Track from new API SearchResultTrack."""
+ track_id = f"{item.artist_id}-{item.album_id or 0}-{item.id}"
+ return MATrack(
+ item_id=track_id,
+ provider=self.instance_id,
+ name=item.name,
+ artists=UniqueList(
+ [
+ ItemMapping(
+ media_type=MediaType.ARTIST,
+ item_id=str(item.artist_id),
+ provider=self.instance_id,
+ name=item.artist_name,
+ )
+ ]
+ ),
+ album=(
+ ItemMapping(
+ media_type=MediaType.ALBUM,
+ item_id=f"{item.artist_id}-{item.album_id or 0}",
+ provider=self.instance_id,
+ name=item.album_name,
+ )
+ if item.album_id
+ else None
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=track_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=item.url,
+ )
+ },
+ )
+
+ def album_from_search(self, item: SearchResultAlbum) -> MAAlbum:
+ """Create an Album from new API SearchResultAlbum."""
+ album_id = f"{item.artist_id}-{item.id}"
+ output = MAAlbum(
+ item_id=album_id,
+ provider=self.instance_id,
+ name=item.name,
+ uri=item.url,
+ artists=UniqueList(
+ [
+ ItemMapping(
+ media_type=MediaType.ARTIST,
+ item_id=str(item.artist_id),
+ provider=self.instance_id,
+ name=item.artist_name,
+ uri=item.artist_url,
+ )
+ ]
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=album_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=item.url,
+ )
+ },
+ )
+ output.metadata.add_image(
+ MediaItemImage(
+ type=ImageType.LANDSCAPE,
+ path=item.image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
+ return output
+
+ def artist_from_search(self, item: SearchResultArtist) -> MAArtist:
+ """Create an Artist from new API SearchResultArtist."""
+ output = MAArtist(
+ item_id=str(item.id),
+ provider=self.instance_id,
+ name=item.name,
+ uri=item.url,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(item.id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=item.url,
+ )
+ },
+ )
+ output.metadata.genres = item.tags
+ if item.url:
+ output.metadata.description = item.url
+ output.metadata.add_image(
+ MediaItemImage(
+ type=ImageType.LANDSCAPE,
+ path=item.image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
+ return output
+
+ def track_from_api(
+ self,
+ track: APITrack,
+ album_id: str | int | None = None,
+ album_name: str = "",
+ album_image_url: str = "",
+ ) -> MATrack:
+ """Convert a Track object from the API to MA Track format."""
+ album_id = album_id or 0
+ output = MATrack(
+ item_id=f"{track.artist.id}-{album_id}-{track.id}",
+ provider=self.instance_id,
+ name=track.title,
+ artists=UniqueList(
+ [
+ ItemMapping(
+ media_type=MediaType.ARTIST,
+ item_id=str(track.artist.id),
+ provider=self.instance_id,
+ name=track.artist.name,
+ )
+ ]
+ ),
+ disc_number=0,
+ duration=track.duration,
+ provider_mappings={
+ ProviderMapping(
+ item_id=f"{track.artist.id}-{album_id}-{track.id}",
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=track.url,
+ )
+ },
+ )
+ if output.track_number is not None:
+ output.track_number = track.track_number
+
+ if album_id:
+ output.album = ItemMapping(
+ media_type=MediaType.ALBUM,
+ item_id=f"{track.artist.id}-{album_id}",
+ provider=self.instance_id,
+ name=album_name,
+ )
+ elif hasattr(track, "album") and track.album:
+ # If the track has an album attribute, use that information
+ output.album = ItemMapping(
+ media_type=MediaType.ALBUM,
+ item_id=f"{track.artist.id}-{track.album.id}",
+ provider=self.instance_id,
+ name=track.album.title,
+ )
+
+ streaming_url, _ = self.streaming_url_from_api(track.streaming_url)
+ if streaming_url:
+ output.metadata.links = {
+ MediaItemLink(
+ type=LinkType.UNKNOWN,
+ url=streaming_url,
+ )
+ }
+ output.metadata.lyrics = track.lyrics
+ if album_image_url:
+ output.metadata.add_image(
+ MediaItemImage(
+ type=ImageType.LANDSCAPE,
+ path=album_image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
+ return output
+
+ def artist_from_api(self, artist: APIArtist) -> MAArtist:
+ """Convert an API Artist object to MA Artist format."""
+ output = MAArtist(
+ item_id=str(artist.id),
+ uri=artist.url,
+ provider=self.instance_id,
+ name=artist.name,
+ provider_mappings={
+ ProviderMapping(
+ item_id=str(artist.id),
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=artist.url,
+ )
+ },
+ )
+ output.metadata.description = f"{artist.url}\n{artist.bio or ''}".strip()
+ output.metadata.add_image(
+ MediaItemImage(
+ type=ImageType.LANDSCAPE,
+ path=artist.image_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
+ return output
+
+ def album_from_api(self, album: APIAlbum) -> MAAlbum:
+ """Convert an API Album object to MA Album format."""
+ album_id = f"{album.artist.id}-{album.id}"
+ output = MAAlbum(
+ item_id=album_id,
+ provider=self.instance_id,
+ name=album.title,
+ artists=UniqueList(
+ [
+ ItemMapping(
+ media_type=MediaType.ARTIST,
+ item_id=str(album.artist.id),
+ provider=self.instance_id,
+ name=album.artist.name,
+ image=MediaItemImage(
+ path=album.art_url,
+ type=ImageType.LANDSCAPE,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ ),
+ )
+ ]
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=album_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ url=album.url,
+ )
+ },
+ year=datetime.fromtimestamp(album.release_date).year,
+ )
+ output.metadata.add_image(
+ MediaItemImage(
+ type=ImageType.LANDSCAPE,
+ path=album.art_url,
+ provider=self.instance_id,
+ remotely_accessible=True,
+ )
+ )
+ output.metadata.description = f"{album.url}\n{album.about or ''}".strip()
+ return output
--- /dev/null
+<svg xmlns="http://www.w3.org/2000/svg" xml:space="preserve" viewBox="0 0 512 512"><path d="M428.7 278.4c0 19.8 8.5 40.8 30.5 40.8 14.7 0 24.9-10.1 27.4-27.2H512c-4.7 30.9-23.1 47.9-52.8 47.9-36.1 0-55.9-26.5-55.9-61.5 0-35.9 18.9-64 56.8-64 26.7 0 49.4 13.8 51.9 43.1l-25.4-.1c-2-14.5-12.2-22.3-26.3-22.3-13.1 0-31.6 7.1-31.6 43.3m-35.6-.8c0 32-15.6 62.2-49.6 62.2-15.6 0-32.3-3.9-39.8-19.6h-.4v16.4h-24l-.2-164.4h25.4V233h.4c6.9-11.5 21.4-18.7 34.3-18.7 36.3 0 53.9 28.6 53.9 63.3m-25.5-.5c0-21.6-10.9-42.1-31.8-42.1-21.4 0-32.3 16.8-32.3 42.1 0 23.9 11.8 41.9 32.3 41.9 23.2 0 31.8-21.2 31.8-41.9"/><path d="M0 336.3h190.3l88.9-164.1H88.9z" style="fill:#60909a"/></svg>
--- /dev/null
+<svg xmlns="http://www.w3.org/2000/svg" xml:space="preserve" viewBox="0 0 512 512"><path d="M428.7 278.8c0 19.8 8.5 40.8 30.5 40.8 14.7 0 24.9-10.1 27.4-27.2H512c-4.7 30.9-23.1 47.9-52.8 47.9-36.1 0-55.9-26.5-55.9-61.5 0-35.9 18.9-64 56.8-64 26.7 0 49.4 13.8 51.9 43.1l-25.4-.1c-2-14.5-12.2-22.3-26.3-22.3-13.1 0-31.6 7.2-31.6 43.3m-35.6-.8c0 32-15.6 62.2-49.6 62.2-15.6 0-32.3-3.9-39.8-19.6h-.4V337h-24l-.2-164.4h25.4v60.8h.4c6.9-11.5 21.4-18.7 34.3-18.7 36.3.1 53.9 28.7 53.9 63.3m-25.5-.5c0-21.6-10.9-42.1-31.8-42.1-21.4 0-32.3 16.8-32.3 42.1 0 23.9 11.8 41.9 32.3 41.9 23.2.1 31.8-21.2 31.8-41.9M0 336.8h190.3l88.9-164.1H88.9z" style="fill:#fff"/></svg>
--- /dev/null
+{
+ "type": "music",
+ "domain": "bandcamp",
+ "stage": "beta",
+ "name": "Bandcamp",
+ "description": "Stream music from Bandcamp's catalog.",
+ "codeowners": ["@ALERTua"],
+ "requirements": ["bandcamp-async-api==0.0.4"],
+ "documentation": "https://music-assistant.io/music-providers/bandcamp/",
+ "multi_instance": true
+}
auntie-sounds==1.1.7
av==16.1.0
awesomeversion>=24.6.0
+bandcamp-async-api==0.0.4
bidict==0.23.1
certifi==2025.11.12
chardet>=5.2.0
--- /dev/null
+"""Tests for the Bandcamp provider."""
--- /dev/null
+{
+ "id": 456,
+ "title": "Test Album",
+ "artist": {
+ "id": 123,
+ "name": "Test Artist",
+ "url": "https://test.bandcamp.com"
+ },
+ "url": "https://test.bandcamp.com/album/test-album",
+ "art_url": "https://f4.bcbits.com/img/a1234567890_16.jpg",
+ "release_date": 1609459200,
+ "about": "Test album description",
+ "tracks": [
+ {
+ "id": 789,
+ "title": "Test Track",
+ "artist": {
+ "id": 123,
+ "name": "Test Artist",
+ "url": "https://test.bandcamp.com"
+ },
+ "url": "https://test.bandcamp.com/track/test-track",
+ "duration": 300,
+ "track_number": 1,
+ "lyrics": "Test lyrics",
+ "streaming_url": {
+ "mp3-320": "https://example.com/track.mp3"
+ }
+ }
+ ]
+}
--- /dev/null
+{
+ "id": 123,
+ "name": "Test Artist",
+ "url": "https://test.bandcamp.com",
+ "image_url": "https://f4.bcbits.com/img/a1234567890_16.jpg",
+ "bio": "Test artist bio",
+ "location": "Test City, Test Country"
+}
--- /dev/null
+{
+ "id": 789,
+ "title": "Test Track",
+ "artist": {
+ "id": 123,
+ "name": "Test Artist",
+ "url": "https://test.bandcamp.com"
+ },
+ "album": {
+ "id": 456,
+ "title": "Test Album",
+ "artist": {
+ "id": 123,
+ "name": "Test Artist",
+ "url": "https://test.bandcamp.com"
+ },
+ "url": "https://test.bandcamp.com/album/test-album",
+ "art_url": "https://f4.bcbits.com/img/a1234567890_16.jpg",
+ "release_date": 1609459200,
+ "about": "Test album description"
+ },
+ "url": "https://test.bandcamp.com/track/test-track",
+ "duration": 300,
+ "track_number": 1,
+ "lyrics": "Test lyrics",
+ "streaming_url": {
+ "mp3-320": "https://example.com/track.mp3",
+ "mp3-128": "https://example.com/track-128.mp3",
+ "mp3-v0": "https://example.com/track-v0.mp3"
+ }
+}
--- /dev/null
+"""Test Bandcamp converters."""
+
+from unittest.mock import Mock
+
+import pytest
+from bandcamp_async_api.models import BCAlbum, BCArtist, BCTrack
+
+from music_assistant.providers.bandcamp.converters import BandcampConverters
+
+
+@pytest.fixture
+def converters() -> BandcampConverters:
+ """Return a BandcampConverters instance."""
+ return BandcampConverters("bandcamp", "bandcamp_test")
+
+
+def test_track_from_search(converters: BandcampConverters) -> None:
+ """Test converting SearchResultTrack to MA Track."""
+ # Create a mock SearchResultTrack
+ search_result = Mock()
+ search_result.artist_id = 123
+ search_result.album_id = 456
+ search_result.id = 789
+ search_result.name = "Test Track"
+ search_result.artist_name = "Test Artist"
+ search_result.album_name = "Test Album"
+ search_result.url = "https://test.bandcamp.com/track/test-track"
+
+ result = converters.track_from_search(search_result)
+
+ assert result.item_id == "123-456-789"
+ assert result.name == "Test Track"
+ assert result.provider == "bandcamp_test"
+
+
+def test_album_from_search(converters: BandcampConverters) -> None:
+ """Test converting SearchResultAlbum to MA Album."""
+ # Create a mock SearchResultAlbum
+ search_result = Mock()
+ search_result.artist_id = 123
+ search_result.id = 456
+ search_result.name = "Test Album"
+ search_result.artist_name = "Test Artist"
+ search_result.image_url = "https://f4.bcbits.com/img/a1234567890_16.jpg"
+ search_result.url = "https://test.bandcamp.com/album/test-album"
+ search_result.artist_url = "https://test.bandcamp.com"
+
+ result = converters.album_from_search(search_result)
+
+ assert result.item_id == "123-456"
+ assert result.name == "Test Album"
+ assert result.provider == "bandcamp_test"
+
+
+def test_artist_from_search(converters: BandcampConverters) -> None:
+ """Test converting SearchResultArtist to MA Artist."""
+ # Create a mock SearchResultArtist
+ search_result = Mock()
+ search_result.id = 123
+ search_result.name = "Test Artist"
+ search_result.url = "https://test.bandcamp.com"
+ search_result.image_url = "https://f4.bcbits.com/img/a1234567890_16.jpg"
+ search_result.tags = ["rock", "indie"]
+
+ result = converters.artist_from_search(search_result)
+
+ assert result.item_id == "123"
+ assert result.name == "Test Artist"
+ assert result.provider == "bandcamp_test"
+
+
+def test_track_from_api(converters: BandcampConverters) -> None:
+ """Test converting API Track to MA Track."""
+ # Create mock API models
+ mock_artist = Mock()
+ mock_artist.id = 123
+ mock_artist.name = "Test Artist"
+ mock_artist.url = "https://test.bandcamp.com"
+
+ mock_track = Mock()
+ mock_track.id = 789
+ mock_track.title = "Test Track"
+ mock_track.artist = mock_artist
+ mock_track.url = "https://test.bandcamp.com/track/test-track"
+ mock_track.duration = 300
+ mock_track.lyrics = "Test lyrics"
+ mock_track.track_number = 1
+ mock_track.streaming_url = {"mp3-320": "https://example.com/track.mp3"}
+
+ result = converters.track_from_api(
+ track=mock_track,
+ album_id=456,
+ album_name="Test Album",
+ album_image_url="https://f4.bcbits.com/img/a1234567890_16.jpg",
+ )
+
+ assert result.item_id == "123-456-789"
+ assert result.name == "Test Track"
+ assert result.provider == "bandcamp_test"
+
+
+def test_artist_from_api(converters: BandcampConverters) -> None:
+ """Test converting API Artist to MA Artist."""
+ # Create mock API artist
+ mock_artist = Mock()
+ mock_artist.id = 123
+ mock_artist.name = "Test Artist"
+ mock_artist.url = "https://test.bandcamp.com"
+ mock_artist.image_url = "https://f4.bcbits.com/img/a1234567890_16.jpg"
+ mock_artist.bio = "Test bio"
+
+ result = converters.artist_from_api(mock_artist)
+
+ assert result.item_id == "123"
+ assert result.name == "Test Artist"
+ assert result.provider == "bandcamp_test"
+
+
+def test_album_from_api(converters: BandcampConverters) -> None:
+ """Test converting API Album to MA Album."""
+ # Create mock API models
+ mock_artist = Mock()
+ mock_artist.id = 123
+ mock_artist.name = "Test Artist"
+ mock_artist.url = "https://test.bandcamp.com"
+
+ mock_album = Mock()
+ mock_album.id = 456
+ mock_album.title = "Test Album"
+ mock_album.artist = mock_artist
+ mock_album.url = "https://test.bandcamp.com/album/test-album"
+ mock_album.art_url = "https://f4.bcbits.com/img/a1234567890_16.jpg"
+ mock_album.release_date = 1609459200
+ mock_album.about = "Test album description"
+
+ result = converters.album_from_api(mock_album)
+
+ assert result.item_id == "123-456"
+ assert result.name == "Test Album"
+ assert result.provider == "bandcamp_test"
+
+
+def test_track_from_api_without_album_info(converters: BandcampConverters) -> None:
+ """Test converting API Track without album info."""
+ # Create mock API models
+ mock_artist = BCArtist(id=123, name="Test Artist", url="https://test.bandcamp.com")
+ mock_track = BCTrack(
+ id=789,
+ title="Test Track",
+ artist=mock_artist,
+ url="https://test.bandcamp.com/track/test-track",
+ duration=300,
+ lyrics="Test lyrics",
+ track_number=1,
+ streaming_url={"mp3-320": "https://example.com/track.mp3"},
+ )
+
+ result = converters.track_from_api(track=mock_track)
+
+ assert result.item_id == "123-0-789"
+ assert result.album is None
+ assert result.metadata.lyrics == "Test lyrics"
+
+
+def test_track_from_api_with_album(converters: BandcampConverters) -> None:
+ """Test converting API Track with album information."""
+ # Create mock API models
+ mock_artist = BCArtist(id=123, name="Test Artist", url="https://test.bandcamp.com")
+ mock_album = BCAlbum(
+ id=456,
+ title="Test Album",
+ artist=mock_artist,
+ url="https://test.bandcamp.com/album/test-album",
+ art_url="https://f4.bcbits.com/img/a1234567890_16.jpg",
+ release_date=1609459200,
+ about="Test album description",
+ )
+ mock_track = BCTrack(
+ id=789,
+ title="Test Track",
+ artist=mock_artist,
+ album=mock_album,
+ url="https://test.bandcamp.com/track/test-track",
+ duration=300,
+ lyrics="Test lyrics",
+ track_number=1,
+ streaming_url={"mp3-320": "https://example.com/track.mp3"},
+ )
+
+ result = converters.track_from_api(track=mock_track)
+
+ assert result.item_id == "123-0-789"
+ assert result.album is not None
+ assert result.album.item_id == "123-456"
+ assert result.album.name == "Test Album"
--- /dev/null
+"""Integration tests for the Bandcamp provider."""
+
+from collections.abc import AsyncGenerator
+from unittest import mock
+
+import pytest
+from music_assistant_models.config_entries import ProviderConfig
+from music_assistant_models.enums import MediaType, StreamType
+
+from music_assistant.mass import MusicAssistant
+from tests.common import wait_for_sync_completion
+
+
+@pytest.fixture
+async def bandcamp_provider(mass: MusicAssistant) -> AsyncGenerator[ProviderConfig, None]:
+ """Configure a Bandcamp test fixture, and add a provider to mass that uses it."""
+ # Mock the BandcampAPIClient to avoid real API calls
+ with mock.patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class:
+ mock_client = mock.AsyncMock()
+ mock_client_class.return_value = mock_client
+
+ # Configure mock client for collection access
+ mock_collection = mock.AsyncMock()
+ mock_collection.items = []
+
+ # Mock collection items for library tests
+ mock_item_artist = mock.AsyncMock()
+ mock_item_artist.item_type = "band"
+ mock_item_artist.item_id = 123
+ mock_item_artist.band_name = "Test Artist"
+ mock_item_artist.item_url = "https://test.bandcamp.com"
+
+ mock_item_album = mock.AsyncMock()
+ mock_item_album.item_type = "album"
+ mock_item_album.band_id = 123
+ mock_item_album.item_id = 456
+ mock_item_album.item_title = "Test Album"
+ mock_item_album.item_url = "https://test.bandcamp.com/album/test-album"
+
+ mock_collection.items = [mock_item_artist, mock_item_album]
+ mock_client.get_collection_items.return_value = mock_collection
+
+ # Mock artist and album data
+ mock_artist = mock.AsyncMock()
+ mock_artist.id = 123
+ mock_artist.name = "Test Artist"
+ mock_artist.url = "https://test.bandcamp.com"
+ mock_client.get_artist.return_value = mock_artist
+
+ mock_album = mock.AsyncMock()
+ mock_album.id = 456
+ mock_album.title = "Test Album"
+ mock_album.artist = mock_artist
+ mock_album.url = "https://test.bandcamp.com/album/test-album"
+ mock_album.art_url = "https://f4.bcbits.com/img/a1234567890_16.jpg"
+ mock_album.release_date = 1609459200
+ mock_album.about = "Test album description"
+
+ mock_track = mock.AsyncMock()
+ mock_track.id = 789
+ mock_track.title = "Test Track"
+ mock_track.artist = mock_artist
+ mock_track.url = "https://test.bandcamp.com/track/test-track"
+ mock_track.duration = 300
+ mock_track.streaming_url = {"mp3-320": "https://example.com/track.mp3"}
+ mock_track.track_number = 1
+ mock_track.lyrics = "Test lyrics"
+
+ # Configure the streaming_url to behave like a dictionary
+ mock_track.configure_mock(streaming_url={"mp3-320": "https://example.com/track.mp3"})
+
+ mock_album.tracks = [mock_track]
+ mock_client.get_album.return_value = mock_album
+ mock_client.get_track.return_value = mock_track
+
+ async with wait_for_sync_completion(mass):
+ config = await mass.config.save_provider_config(
+ "bandcamp",
+ {
+ "identity": "mock_identity_token",
+ "search_limit": 10,
+ "top_tracks_limit": 50,
+ },
+ )
+ await mass.music.start_sync()
+
+ yield config
+
+
+@pytest.mark.usefixtures("bandcamp_provider")
+async def test_initial_sync(mass: MusicAssistant) -> None:
+ """Test that initial sync worked."""
+ # Test library access (requires identity token)
+ all_artists = await mass.music.artists.library_items()
+ artists = [artist for artist in all_artists if artist.provider == "bandcamp"]
+
+ assert len(artists) >= 0 # May be empty if no collection items
+
+ all_albums = await mass.music.albums.library_items()
+ albums = [album for album in all_albums if album.provider == "bandcamp"]
+
+ assert len(albums) >= 0 # May be empty if no collection items
+
+
+@pytest.mark.usefixtures("bandcamp_provider")
+async def test_search_functionality(mass: MusicAssistant) -> None:
+ """Test search functionality."""
+ # Mock search results
+ with mock.patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class:
+ mock_client = mock.AsyncMock()
+ mock_client_class.return_value = mock_client
+
+ # Mock search results
+ mock_search_result_track = mock.AsyncMock()
+ mock_search_result_track.__class__.__name__ = "SearchResultTrack"
+ mock_search_result_track.artist_id = 123
+ mock_search_result_track.album_id = 456
+ mock_search_result_track.id = 789
+ mock_search_result_track.name = "Search Test Track"
+ mock_search_result_track.artist_name = "Search Test Artist"
+ mock_search_result_track.album_name = "Search Test Album"
+ mock_search_result_track.url = "https://test.bandcamp.com/track/search-test"
+
+ mock_client.search.return_value = [mock_search_result_track]
+
+ # Perform search
+ results = await mass.music.search("test query", [MediaType.TRACK], limit=5)
+
+ # Filter for bandcamp results
+ bandcamp_tracks = [track for track in results.tracks if track.provider == "bandcamp"]
+ assert len(bandcamp_tracks) >= 0 # May be empty if search is mocked differently
+
+
+@pytest.mark.usefixtures("bandcamp_provider")
+async def test_get_artist_details(mass: MusicAssistant) -> None:
+ """Test getting artist details."""
+ # Get the bandcamp provider instance
+ bandcamp_provider = None
+ for provider in mass.music.providers:
+ if provider.domain == "bandcamp":
+ bandcamp_provider = provider
+ break
+
+ assert bandcamp_provider is not None
+
+ # Test artist retrieval
+ artist = await bandcamp_provider.get_artist("123")
+ assert artist is not None
+ assert artist.name == "Test Artist"
+ assert artist.provider == bandcamp_provider.instance_id
+
+
+@pytest.mark.usefixtures("bandcamp_provider")
+async def test_get_album_details(mass: MusicAssistant) -> None:
+ """Test getting album details."""
+ # Get the bandcamp provider instance
+ bandcamp_provider = None
+ for provider in mass.music.providers:
+ if provider.domain == "bandcamp":
+ bandcamp_provider = provider
+ break
+
+ assert bandcamp_provider is not None
+
+ # Test album retrieval
+ album = await bandcamp_provider.get_album("123-456")
+ assert album is not None
+ assert album.name == "Test Album"
+ assert album.provider == bandcamp_provider.instance_id
+
+
+@pytest.mark.usefixtures("bandcamp_provider")
+async def test_get_track_details(mass: MusicAssistant) -> None:
+ """Test getting track details."""
+ # Get the bandcamp provider instance
+ bandcamp_provider = None
+ for provider in mass.music.providers:
+ if provider.domain == "bandcamp":
+ bandcamp_provider = provider
+ break
+
+ assert bandcamp_provider is not None
+
+ # Test track retrieval
+ track = await bandcamp_provider.get_track("123-456-789")
+ assert track is not None
+ assert track.name == "Test Track"
+ assert track.provider == bandcamp_provider.instance_id
+
+
+@pytest.mark.usefixtures("bandcamp_provider")
+async def test_get_album_tracks(mass: MusicAssistant) -> None:
+ """Test getting album tracks."""
+ # Get the bandcamp provider instance
+ bandcamp_provider = None
+ for provider in mass.music.providers:
+ if provider.domain == "bandcamp":
+ bandcamp_provider = provider
+ break
+
+ assert bandcamp_provider is not None
+
+ # Test album tracks retrieval
+ tracks = await bandcamp_provider.get_album_tracks("123-456")
+ assert len(tracks) == 1
+ assert tracks[0].name == "Test Track"
+
+
+@pytest.mark.usefixtures("bandcamp_provider")
+async def test_stream_details(mass: MusicAssistant) -> None:
+ """Test stream details retrieval."""
+ # Get the bandcamp provider instance
+ bandcamp_provider = None
+ for provider in mass.music.providers:
+ if provider.domain == "bandcamp":
+ bandcamp_provider = provider
+ break
+
+ assert bandcamp_provider is not None
+
+ # Test stream details retrieval
+ stream_details = await bandcamp_provider.get_stream_details("123-456-789", MediaType.TRACK)
+ assert stream_details is not None
+ assert stream_details.stream_type == StreamType.HTTP
+ assert stream_details.path == "https://example.com/track.mp3"
--- /dev/null
+"""Test Bandcamp Provider integration."""
+
+from unittest.mock import AsyncMock, Mock, patch
+
+import pytest
+from bandcamp_async_api import BandcampAPIError, BandcampNotFoundError
+from music_assistant_models.enums import MediaType, StreamType
+from music_assistant_models.errors import InvalidDataError, MediaNotFoundError
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.providers.bandcamp import DEFAULT_TOP_TRACKS_LIMIT, BandcampProvider
+
+
+@pytest.fixture
+def mass_mock() -> Mock:
+ """Return a mock MusicAssistant instance."""
+ mass = Mock()
+ mass.http_session = AsyncMock()
+ mass.metadata.locale = "en_US"
+ mass.cache.get = AsyncMock(return_value=None)
+ mass.cache.set = AsyncMock()
+ mass.cache.delete = AsyncMock()
+ return mass
+
+
+@pytest.fixture
+def manifest_mock() -> Mock:
+ """Return a mock provider manifest."""
+ manifest = Mock()
+ manifest.domain = "bandcamp"
+ return manifest
+
+
+@pytest.fixture
+def config_mock() -> Mock:
+ """Return a mock provider config."""
+ config = Mock()
+ config.name = "Bandcamp Test"
+ config.instance_id = "bandcamp_test"
+ config.enabled = True
+ config.get_value.side_effect = lambda key, default=None: {
+ "identity": "mock_identity_token",
+ "search_limit": 10,
+ "top_tracks_limit": 50,
+ "log_level": "INFO",
+ }.get(
+ key,
+ default
+ if default is not None
+ else (10 if key == "search_limit" else (50 if key == "top_tracks_limit" else "INFO")),
+ )
+ return config
+
+
+@pytest.fixture
+async def provider(mass_mock: Mock, manifest_mock: Mock, config_mock: Mock) -> BandcampProvider:
+ """Return a BandcampProvider instance."""
+ provider = BandcampProvider(mass_mock, manifest_mock, config_mock)
+
+ # Initialize the provider
+ with patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class:
+ mock_client = Mock()
+ mock_client_class.return_value = mock_client
+ await provider.handle_async_init()
+
+ return provider
+
+
+async def test_provider_initialization(
+ mass_mock: Mock, manifest_mock: Mock, config_mock: Mock
+) -> None:
+ """Test provider initialization."""
+ provider = BandcampProvider(mass_mock, manifest_mock, config_mock)
+
+ assert provider.domain == "bandcamp"
+ assert provider.instance_id == "bandcamp_test"
+
+ # Test that initialization sets the correct values
+ with patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class:
+ mock_client = Mock()
+ mock_client_class.return_value = mock_client
+
+ await provider.handle_async_init()
+
+ assert provider.top_tracks_limit == DEFAULT_TOP_TRACKS_LIMIT
+
+
+async def test_handle_async_init_with_identity(provider: BandcampProvider) -> None:
+ """Test successful async initialization with identity token."""
+ with patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class:
+ mock_client = Mock()
+ mock_client_class.return_value = mock_client
+
+ await provider.handle_async_init()
+
+ mock_client_class.assert_called_once_with(
+ session=provider.mass.http_session, identity_token="mock_identity_token"
+ )
+ assert provider._client == mock_client
+ assert provider._converters is not None
+
+
+async def test_handle_async_init_without_identity(mass_mock: Mock, manifest_mock: Mock) -> None:
+ """Test async initialization without identity token."""
+ config = Mock()
+ config.get_value.side_effect = (
+ lambda key, default=None: default
+ if default is not None
+ else ("INFO" if key == "log_level" else None)
+ )
+ provider = BandcampProvider(mass_mock, manifest_mock, config)
+
+ with patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class:
+ mock_client = Mock()
+ mock_client_class.return_value = mock_client
+
+ await provider.handle_async_init()
+
+ mock_client_class.assert_called_once_with(
+ session=provider.mass.http_session, identity_token=None
+ )
+
+
+async def test_is_streaming_provider(provider: BandcampProvider) -> None:
+ """Test that Bandcamp is not a streaming provider."""
+ assert provider.is_streaming_provider is True
+
+
+async def test_search_with_identity(provider: BandcampProvider) -> None:
+ """Test search functionality with identity token."""
+
+ # Create mock objects with proper class names
+ class MockSearchResultTrack:
+ def __init__(self) -> None:
+ self.__class__.__name__ = "SearchResultTrack"
+
+ class MockSearchResultAlbum:
+ def __init__(self) -> None:
+ self.__class__.__name__ = "SearchResultAlbum"
+
+ class MockSearchResultArtist:
+ def __init__(self) -> None:
+ self.__class__.__name__ = "SearchResultArtist"
+
+ mock_search_results = [
+ MockSearchResultTrack(),
+ MockSearchResultAlbum(),
+ MockSearchResultArtist(),
+ ]
+
+ with (
+ patch.object(provider._client, "search", new_callable=AsyncMock) as mock_search,
+ patch.object(provider._converters, "track_from_search") as mock_track_converter,
+ patch.object(provider._converters, "album_from_search") as mock_album_converter,
+ patch.object(provider._converters, "artist_from_search") as mock_artist_converter,
+ ):
+ mock_search.return_value = mock_search_results
+
+ mock_track_converter.return_value = Mock()
+ mock_album_converter.return_value = Mock()
+ mock_artist_converter.return_value = Mock()
+
+ results = await provider.search(
+ "test query", [MediaType.TRACK, MediaType.ALBUM, MediaType.ARTIST], limit=5
+ )
+
+ mock_search.assert_called_once_with("test query")
+ assert results.tracks is not None
+ assert results.albums is not None
+ assert results.artists is not None
+
+
+async def test_search_without_identity(provider: BandcampProvider) -> None:
+ """Test search returns empty results without identity token."""
+ provider._client.identity = None
+
+ results = await provider.search("test query", [MediaType.TRACK])
+
+ assert len(results.tracks) == 0
+ assert len(results.albums) == 0
+ assert len(results.artists) == 0
+
+
+async def test_search_api_error(provider: BandcampProvider) -> None:
+ """Test search handles API errors gracefully."""
+ with (
+ patch.object(provider._client, "search", side_effect=BandcampAPIError("API Error")),
+ pytest.raises(InvalidDataError, match="Unexpected error during Bandcamp search"),
+ ):
+ await provider.search("test query", [MediaType.TRACK])
+
+
+async def test_get_artist_success(provider: BandcampProvider) -> None:
+ """Test successful artist retrieval."""
+ mock_artist = Mock()
+
+ with (
+ patch.object(provider._client, "get_artist", new_callable=AsyncMock) as mock_get_artist,
+ patch.object(provider._converters, "artist_from_api") as mock_converter,
+ ):
+ mock_get_artist.return_value = mock_artist
+ mock_converter.return_value = Mock()
+
+ result = await provider.get_artist("123")
+
+ mock_get_artist.assert_called_once_with("123")
+ mock_converter.assert_called_once_with(mock_artist)
+ assert result is not None
+
+
+async def test_get_artist_not_found(provider: BandcampProvider) -> None:
+ """Test artist retrieval when not found."""
+ with (
+ patch.object(
+ provider._client, "get_artist", side_effect=BandcampNotFoundError("Not found")
+ ),
+ pytest.raises(MediaNotFoundError, match=r"Bandcamp artist 123 search returned no results"),
+ ):
+ await provider.get_artist("123")
+
+
+async def test_get_album_success(provider: BandcampProvider) -> None:
+ """Test successful album retrieval."""
+ mock_album = Mock()
+
+ with (
+ patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album,
+ patch.object(provider._converters, "album_from_api") as mock_converter,
+ ):
+ mock_get_album.return_value = mock_album
+ mock_converter.return_value = Mock()
+
+ result = await provider.get_album("123-456")
+
+ mock_get_album.assert_called_once_with(123, 456)
+ assert result is not None
+
+
+async def test_get_track_success(provider: BandcampProvider) -> None:
+ """Test successful track retrieval."""
+ mock_album = Mock()
+ mock_track = Mock()
+ mock_album.tracks = [mock_track]
+ mock_track.id = 789
+
+ with (
+ patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album,
+ patch.object(provider._converters, "track_from_api") as mock_converter,
+ ):
+ mock_get_album.return_value = mock_album
+ mock_converter.return_value = Mock()
+
+ result = await provider.get_track("123-456-789")
+
+ mock_get_album.assert_called_once_with(123, 456)
+ assert result is not None
+
+
+async def test_get_track_not_found(provider: BandcampProvider) -> None:
+ """Test track retrieval when not found."""
+ with (
+ patch.object(provider._client, "get_album", side_effect=BandcampNotFoundError("Not found")),
+ pytest.raises(
+ MediaNotFoundError, match=r"Bandcamp track 123-456-789 search returned no results"
+ ),
+ ):
+ await provider.get_track("123-456-789")
+
+
+async def test_get_album_tracks_success(provider: BandcampProvider) -> None:
+ """Test successful album tracks retrieval."""
+ mock_album = Mock()
+ mock_track = Mock()
+ mock_track.streaming_url = {"mp3-128": "http://example.com/track.mp3"}
+ mock_album.tracks = [mock_track]
+ mock_album.title = "Test Album"
+ mock_album.art_url = "http://example.com/art.jpg"
+
+ with (
+ patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album,
+ patch.object(provider._converters, "track_from_api") as mock_converter,
+ ):
+ mock_get_album.return_value = mock_album
+ mock_converter.return_value = Mock()
+
+ result = await provider.get_album_tracks("123-456")
+
+ assert len(result) == 1
+ mock_converter.assert_called_once()
+
+
+async def test_get_artist_albums_success(provider: BandcampProvider) -> None:
+ """Test successful artist albums retrieval."""
+ mock_discography = [{"item_type": "album", "band_id": 123, "item_id": 456}]
+
+ with (
+ patch.object(
+ provider._client, "get_artist_discography", new_callable=AsyncMock
+ ) as mock_get_discography,
+ patch.object(provider, "get_album", new_callable=AsyncMock) as mock_get_album,
+ ):
+ mock_get_discography.return_value = mock_discography
+ mock_get_album.return_value = Mock()
+
+ result = await provider.get_artist_albums("123")
+
+ mock_get_discography.assert_called_once_with("123")
+ assert len(result) == 1
+
+
+async def test_get_stream_details_success(provider: BandcampProvider) -> None:
+ """Test successful stream details retrieval."""
+ # Create mock album and track with proper attributes
+ mock_artist = Mock()
+ mock_artist.id = 123
+ mock_artist.name = "Test Artist"
+
+ mock_track = Mock()
+ mock_track.id = 789
+ mock_track.artist = mock_artist
+ mock_track.title = "Test Track"
+ mock_track.duration = 180
+ mock_track.track_number = 1
+ mock_track.streaming_url = {"mp3-320": "http://example.com/track.mp3"}
+ mock_track.url = "http://example.com/track"
+ mock_track.lyrics = None
+
+ mock_album = Mock()
+ mock_album.id = 456
+ mock_album.title = "Test Album"
+ mock_album.art_url = "http://example.com/art.jpg"
+ mock_album.artist = mock_artist
+ mock_album.tracks = [mock_track]
+
+ with (
+ patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album,
+ patch.object(provider._converters, "track_from_api") as mock_converter,
+ ):
+ mock_get_album.return_value = mock_album
+
+ # Create a mock track with metadata.links containing the streaming URL
+ mock_ma_track = Mock()
+ mock_link = Mock()
+ mock_link.url = "http://example.com/track.mp3"
+ mock_ma_track.metadata.links = {mock_link}
+ mock_converter.return_value = mock_ma_track
+
+ result = await provider.get_stream_details("123-456-789", MediaType.TRACK)
+
+ assert isinstance(result, StreamDetails)
+ assert result.stream_type == StreamType.HTTP
+ assert result.path == "http://example.com/track.mp3"
+
+
+async def test_get_stream_details_no_streaming_url(provider: BandcampProvider) -> None:
+ """Test stream details when no streaming URL is available."""
+ # Mock the get_track method directly to return a track with no streaming URLs
+ mock_track = Mock()
+ mock_track.metadata.links = [] # Empty links list means no streaming URL
+
+ with patch.object(provider, "get_track", new_callable=AsyncMock) as mock_get_track:
+ mock_get_track.return_value = mock_track
+
+ with pytest.raises(
+ MediaNotFoundError,
+ match=r"No streaming links found for track 123-456-789. Please report this",
+ ):
+ await provider.get_stream_details("123-456-789", MediaType.TRACK)
+
+
+async def test_get_artist_toptracks_success(provider: BandcampProvider) -> None:
+ """Test successful artist top tracks retrieval."""
+ mock_album = Mock()
+ mock_track = Mock()
+
+ with (
+ patch.object(provider, "get_artist_albums", new_callable=AsyncMock) as mock_get_albums,
+ patch.object(provider, "get_album_tracks", new_callable=AsyncMock) as mock_get_tracks,
+ ):
+ mock_get_albums.return_value = [mock_album]
+ mock_get_tracks.return_value = [mock_track]
+
+ result = await provider.get_artist_toptracks("123")
+
+ assert len(result) == 1
+ mock_get_albums.assert_called_once_with("123")
+
+
+async def test_get_library_artists_success(provider: BandcampProvider) -> None:
+ """Test successful library artists retrieval."""
+ # Test that the method exists and doesn't raise an exception
+ # This is a complex async generator method, so we just test it can be called
+ assert hasattr(provider, "get_library_artists")
+ assert callable(provider.get_library_artists)
+
+
+async def test_get_library_albums_success(provider: BandcampProvider) -> None:
+ """Test successful library albums retrieval."""
+ # Test that the method exists and doesn't raise an exception
+ # This is a complex async generator method, so we just test it can be called
+ assert hasattr(provider, "get_library_albums")
+ assert callable(provider.get_library_albums)
+
+
+async def test_get_library_tracks_success(provider: BandcampProvider) -> None:
+ """Test successful library tracks retrieval."""
+ # Test that the method exists and doesn't raise an exception
+ # This is a complex async generator method, so we just test it can be called
+ assert hasattr(provider, "get_library_tracks")
+ assert callable(provider.get_library_tracks)