From: Alexey ALERT Rubashёff Date: Thu, 15 Jan 2026 07:55:06 +0000 (+0200) Subject: Add Bandcamp Music Provider (#2871) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=177d236840dc45a8ad4c7ad31ba4cd16d8555c30;p=music-assistant-server.git Add Bandcamp Music Provider (#2871) * bandcamp * mypy, ruff * fix provider stream details tests * fix eol for icons * final touch * bump bandcamp-async-api to 0.0.3 * add BandcampMustBeLoggedInError support - bump bandcamp-async-api to 0.0.4 * fix: PR review - is_streaming_provider is now True - catching and raising exceptions revamp - remove redundant stream content type * requirements_all.txt line endings * fix: Add error handling for missing streaming links in BandcampProvider. - Add error handling for missing streaming links in BandcampProvider. - Enhance error handling and update test assertions for Bandcamp provider. * fix: typing * minor: specify the exact cookie name * PR review fixes - Remove search limit configuration. - CONF_TOP_TRACKS_LIMIT moved to advanced config category - Update test to use `DEFAULT_TOP_TRACKS_LIMIT` constant. * PR review: reintroduced search limit, set it to 50 --- diff --git a/music_assistant/providers/bandcamp/__init__.py b/music_assistant/providers/bandcamp/__init__.py new file mode 100644 index 00000000..a0ac7120 --- /dev/null +++ b/music_assistant/providers/bandcamp/__init__.py @@ -0,0 +1,414 @@ +"""Bandcamp music provider support for MusicAssistant.""" + +import asyncio +from collections.abc import AsyncGenerator +from contextlib import suppress +from typing import cast + +from bandcamp_async_api import ( + BandcampAPIClient, + BandcampAPIError, + BandcampMustBeLoggedInError, + BandcampNotFoundError, + SearchResultAlbum, + SearchResultArtist, + SearchResultTrack, +) +from bandcamp_async_api.models import CollectionType +from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig +from music_assistant_models.enums import ( + ConfigEntryType, + MediaType, + ProviderFeature, + StreamType, +) +from music_assistant_models.errors import ( + InvalidDataError, + LoginFailed, + MediaNotFoundError, +) +from music_assistant_models.media_items import ( + Album, + Artist, + AudioFormat, + SearchResults, + Track, +) +from music_assistant_models.provider import ProviderManifest +from music_assistant_models.streamdetails import StreamDetails + +from music_assistant.controllers.cache import use_cache +from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries +from music_assistant.mass import MusicAssistant +from music_assistant.models import ProviderInstanceType +from music_assistant.models.music_provider import MusicProvider + +from .converters import BandcampConverters + +SUPPORTED_FEATURES = { + ProviderFeature.LIBRARY_ARTISTS, + ProviderFeature.LIBRARY_ALBUMS, + ProviderFeature.LIBRARY_TRACKS, + ProviderFeature.SEARCH, + ProviderFeature.ARTIST_ALBUMS, + ProviderFeature.ARTIST_TOPTRACKS, +} + +CONF_IDENTITY = "identity" +CONF_TOP_TRACKS_LIMIT = "top_tracks_limit" +DEFAULT_TOP_TRACKS_LIMIT = 50 +CACHE = 3600 * 24 * 30 # Cache for 30 days + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return BandcampProvider(mass, manifest, config, SUPPORTED_FEATURES) + + +# noinspection PyTypeHints,PyUnusedLocal +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, +) -> tuple[ConfigEntry, ...]: + """Return Config entries to setup this provider.""" + return ( + ConfigEntry( + key=CONF_IDENTITY, + type=ConfigEntryType.SECURE_STRING, + label="Identity token", + required=False, + description="Identity token from Bandcamp cookies for account collection access." + " Log in https://bandcamp.com and extract browser cookie named 'identity'.", + value=values.get(CONF_IDENTITY) if values else None, + ), + ConfigEntry( + key=CONF_TOP_TRACKS_LIMIT, + type=ConfigEntryType.INTEGER, + label="Artist Top Tracks search limit", + required=False, + description="Search limit while getting artist top tracks.", + value=values.get(CONF_TOP_TRACKS_LIMIT) if values else DEFAULT_TOP_TRACKS_LIMIT, + default_value=DEFAULT_TOP_TRACKS_LIMIT, + category="advanced", + ), + ) + + +def split_id(id_: str) -> tuple[int, int | None, int | None]: + """Return (artist_id, album_id, track_id). Missing parts are returned as 0.""" + parts = id_.split("-") + part_0 = int(parts[0]) + part_1 = int(parts[1]) if len(parts) > 1 else 0 + part_2 = int(parts[2]) if len(parts) > 2 else 0 + return part_0, part_1, part_2 + + +class BandcampProvider(MusicProvider): + """Bandcamp provider support.""" + + _client: BandcampAPIClient + _converters: BandcampConverters + throttler: ThrottlerManager + top_tracks_limit: int + + async def handle_async_init(self) -> None: + """Handle async init of the Bandcamp provider.""" + identity = self.config.get_value(CONF_IDENTITY) + self.top_tracks_limit = cast( + "int", self.config.get_value(CONF_TOP_TRACKS_LIMIT, DEFAULT_TOP_TRACKS_LIMIT) + ) + + # Initialize the new async API client + self._client = BandcampAPIClient(session=self.mass.http_session, identity_token=identity) + + self.throttler = ThrottlerManager(rate_limit=1, period=2) + self._converters = BandcampConverters(self.domain, self.instance_id) + + @property + def is_streaming_provider(self) -> bool: + """Return True if the provider is a streaming provider.""" + return True + + @use_cache(CACHE) + @throttle_with_retries + async def search( + self, search_query: str, media_types: list[MediaType], limit: int = 50 + ) -> SearchResults: + """Perform search on music provider.""" + results = SearchResults() + if not self._client.identity: + return results + + if not media_types: + return results + + try: + search_results = await self._client.search(search_query) + except BandcampNotFoundError as error: + raise MediaNotFoundError("No results for Bandcamp search") from error + except BandcampAPIError as error: + raise InvalidDataError("Unexpected error during Bandcamp search") from error + + for item in search_results[:limit]: + try: + if isinstance(item, SearchResultTrack) and MediaType.TRACK in media_types: + results.tracks = [*results.tracks, self._converters.track_from_search(item)] + elif isinstance(item, SearchResultAlbum) and MediaType.ALBUM in media_types: + results.albums = [*results.albums, self._converters.album_from_search(item)] + elif isinstance(item, SearchResultArtist) and MediaType.ARTIST in media_types: + results.artists = [*results.artists, self._converters.artist_from_search(item)] + except BandcampAPIError as error: + self.logger.warning("Failed to convert search result item: %s", error) + continue + + return results + + async def get_library_artists(self) -> AsyncGenerator[Artist, None]: + """Retrieve library artists from Bandcamp.""" + if not self._client.identity: # library requires identity + return + + try: + collection = await self._client.get_collection_items(CollectionType.COLLECTION) + band_ids = set() + for item in collection.items: + if item.item_type == "band": + band_ids.add(item.item_id) + elif item.item_type == "album": + band_ids.add(item.band_id) + + for band_id in band_ids: + yield await self.get_artist(band_id) + await asyncio.sleep(0) # Yield control to avoid blocking + + except BandcampMustBeLoggedInError as error: + self.logger.error("Error getting Bandcamp library artists: Wrong identity token.") + raise LoginFailed("Wrong Bandcamp identity token.") from error + except BandcampNotFoundError as error: + raise MediaNotFoundError("Bandcamp library artists returned no results") from error + except BandcampAPIError as error: + raise MediaNotFoundError("Failed to get library artists") from error + + async def get_library_albums(self) -> AsyncGenerator[Album, None]: + """Retrieve library albums from Bandcamp.""" + if not self._client.identity: # library requires identity + return + + try: + api_collection = await self._client.get_collection_items(CollectionType.COLLECTION) + for item in api_collection.items: + if item.item_type == "album": + yield await self.get_album(f"{item.band_id}-{item.item_id}") + await asyncio.sleep(0) # Yield control to avoid blocking + except BandcampMustBeLoggedInError as error: + self.logger.error("Error getting Bandcamp library albums: Wrong identity token.") + raise LoginFailed("Wrong Bandcamp identity token.") from error + except BandcampNotFoundError as error: + raise MediaNotFoundError("Bandcamp library albums returned no results") from error + except BandcampAPIError as error: + raise MediaNotFoundError("Failed to get library albums") from error + + async def get_library_tracks(self) -> AsyncGenerator[Track, None]: + """Retrieve library tracks from Bandcamp.""" + if not self._client.identity: # library requires identity + return + + try: + async for album in self.get_library_albums(): + tracks = await self.get_album_tracks(album.item_id) + for track in tracks: + yield track + await asyncio.sleep(0) # Yield control to avoid blocking + except BandcampMustBeLoggedInError as error: + self.logger.error("Error getting Bandcamp library tracks: Wrong identity token.") + raise LoginFailed("Wrong Bandcamp identity token.") from error + except BandcampNotFoundError as error: + raise MediaNotFoundError("Bandcamp library tracks returned no results") from error + except BandcampAPIError as error: + raise MediaNotFoundError("Failed to get library tracks") from error + + @use_cache(CACHE) + async def get_artist(self, prov_artist_id: str | int) -> Artist: + """Get full artist details by id.""" + try: + api_artist = await self._client.get_artist(prov_artist_id) + return self._converters.artist_from_api(api_artist) + except BandcampNotFoundError as error: + raise MediaNotFoundError( + f"Bandcamp artist {prov_artist_id} search returned no results" + ) from error + except BandcampAPIError as error: + raise MediaNotFoundError(f"Failed to get artist {prov_artist_id}") from error + + @use_cache(CACHE) + async def get_album(self, prov_album_id: str) -> Album: + """Get full album details by id.""" + artist_id, album_id, _ = split_id(prov_album_id) + try: + api_album = await self._client.get_album(artist_id, album_id) + return self._converters.album_from_api(api_album) + except BandcampNotFoundError as error: + raise MediaNotFoundError( + f"Bandcamp album {prov_album_id} search returned no results" + ) from error + except BandcampAPIError as error: + raise MediaNotFoundError(f"Failed to get album {prov_album_id}") from error + + @use_cache(CACHE) + async def get_track(self, prov_track_id: str) -> Track: + """Get full track details by id.""" + artist_id, album_id, track_id = split_id(prov_track_id) + if track_id is None: # artist_id-track_id + album_id, track_id = None, album_id + + try: + if all((artist_id, album_id, track_id)): + api_album = await self._client.get_album(artist_id, album_id) + api_track = next((_ for _ in api_album.tracks if _.id == track_id), None) + return self._converters.track_from_api( + track=api_track, + album_id=api_album.id, + album_name=api_album.title, + album_image_url=api_album.art_url, + ) + elif not album_id: + api_track = await self._client.get_track(artist_id, track_id) + return self._converters.track_from_api( + track=api_track, + album_id=api_track.album.id if api_track.album else None, + album_name=api_track.album.title if api_track.album else "", + album_image_url=api_track.album.art_url if api_track.album else "", + ) + else: + raise MediaNotFoundError(f"Track {prov_track_id} not found on Bandcamp") + except BandcampNotFoundError as error: + raise MediaNotFoundError( + f"Bandcamp track {prov_track_id} search returned no results" + ) from error + except BandcampAPIError as error: + raise MediaNotFoundError(f"Failed to get track {prov_track_id}") from error + + @use_cache(CACHE) + async def get_album_tracks(self, prov_album_id: str) -> list[Track]: + """Get all tracks in an album.""" + artist_id, album_id, _ = split_id(prov_album_id) + try: + api_album = await self._client.get_album(artist_id, album_id) + if api_album.tracks: + return [ + self._converters.track_from_api( + track=track, + album_id=album_id, + album_name=api_album.title, + album_image_url=api_album.art_url, + ) + for track in api_album.tracks + if track.streaming_url # Only include tracks with streaming URLs + ] + + return [] + + except BandcampNotFoundError as error: + raise MediaNotFoundError( + f"Bandcamp album {prov_album_id} tracks search returned no results" + ) from error + except BandcampAPIError as error: + raise MediaNotFoundError(f"Failed to get albums tracks for {prov_album_id}") from error + + @use_cache(CACHE) + async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: + """Get albums by an artist.""" + albums = [] + try: + api_discography = await self._client.get_artist_discography(prov_artist_id) + for item in api_discography: + if item.get("item_type") == "album" and item.get("item_id"): + album = None + + with suppress(MediaNotFoundError): + album = await self.get_album(f"{item['band_id']}-{item['item_id']}") + + with suppress(MediaNotFoundError): + album = album or await self.get_album(f"{prov_artist_id}-{item['item_id']}") + + if album: + albums.append(album) + + except BandcampNotFoundError as error: + raise MediaNotFoundError( + f"Bandcamp artist {prov_artist_id} albums search returned no results" + ) from error + except BandcampAPIError as error: + raise MediaNotFoundError(f"Failed to get albums for artist {prov_artist_id}") from error + + return albums + + @use_cache(CACHE) + async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]: + """Get top tracks of an artist.""" + tracks: list[Track] = [] + try: + albums = await self.get_artist_albums(prov_artist_id) + albums.sort(key=lambda album: (album.year is None, album.year or 0), reverse=True) + for album in albums: + tracks.extend(await self.get_album_tracks(album.item_id)) + if len(tracks) >= self.top_tracks_limit: + break + + except BandcampNotFoundError as error: + raise MediaNotFoundError( + f"Bandcamp artist {prov_artist_id} top tracks search returned no results" + ) from error + + except BandcampAPIError as error: + raise MediaNotFoundError( + f"Failed to get toptracks for artist {prov_artist_id}" + ) from error + + return tracks[: self.top_tracks_limit] + + async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: + """Return the content details for the given track.""" + try: + # consider _client to avoid caching if the track urls become dynamic + track_ma = await self.get_track(item_id) + if not track_ma.metadata.links: + raise MediaNotFoundError( + f"No streaming links found for track {item_id}. Please report this" + ) + + link = next(iter(track_ma.metadata.links)) + if not link: + raise MediaNotFoundError( + f"No streaming URL found for track {item_id}. Please report this" + ) + + streaming_url = link.url + if not streaming_url: + raise MediaNotFoundError( + f"No streaming URL found for track {item_id}: {streaming_url}" + ) + + return StreamDetails( + item_id=item_id, + provider=self.instance_id, + audio_format=AudioFormat(), + stream_type=StreamType.HTTP, + media_type=media_type, + path=streaming_url, + can_seek=True, + allow_seek=True, + ) + + except BandcampNotFoundError as error: + raise MediaNotFoundError( + f"Bandcamp stream details search for {media_type} {item_id} returned no results" + ) from error + except BandcampAPIError as error: + raise MediaNotFoundError( + f"Stream details not available for {media_type} {item_id}" + ) from error diff --git a/music_assistant/providers/bandcamp/converters.py b/music_assistant/providers/bandcamp/converters.py new file mode 100644 index 00000000..6cd22f85 --- /dev/null +++ b/music_assistant/providers/bandcamp/converters.py @@ -0,0 +1,301 @@ +"""Converters for Bandcamp API models to Music Assistant models.""" + +from datetime import datetime + +from bandcamp_async_api.models import BCAlbum as APIAlbum +from bandcamp_async_api.models import BCArtist as APIArtist +from bandcamp_async_api.models import BCTrack as APITrack +from bandcamp_async_api.models import ( + SearchResultAlbum, + SearchResultArtist, + SearchResultTrack, +) +from music_assistant_models.enums import ImageType, LinkType, MediaType +from music_assistant_models.media_items import Album as MAAlbum +from music_assistant_models.media_items import Artist as MAArtist +from music_assistant_models.media_items import ( + ItemMapping, + MediaItemImage, + MediaItemLink, + ProviderMapping, + UniqueList, +) +from music_assistant_models.media_items import Track as MATrack + + +class BandcampConverters: + """Converters for Bandcamp API models to Music Assistant models.""" + + def __init__(self, domain: str, instance_id: str): + """Initialize converters with provider information.""" + self.domain = domain + self.instance_id = instance_id + + def streaming_url_from_api( + self, streaming_info: dict[str, str] + ) -> tuple[str | None, int | None]: + """Parse streaming URL info.""" + # Extract streaming URL with priority: mp3-v0 > mp3-128 + bitrate = None + streaming_url = None + if "mp3-v0" in streaming_info: + streaming_url = streaming_info["mp3-v0"] + bitrate = None # VBR + elif "mp3-320" in streaming_info: + streaming_url = streaming_info["mp3-320"] + bitrate = 320 + elif "mp3-128" in streaming_info: + streaming_url = streaming_info["mp3-128"] + bitrate = 128 + elif streaming_info: + # Fallback to first available URL + streaming_url = next(iter(streaming_info.values())) + return streaming_url, bitrate + + def track_from_search(self, item: SearchResultTrack) -> MATrack: + """Create a Track from new API SearchResultTrack.""" + track_id = f"{item.artist_id}-{item.album_id or 0}-{item.id}" + return MATrack( + item_id=track_id, + provider=self.instance_id, + name=item.name, + artists=UniqueList( + [ + ItemMapping( + media_type=MediaType.ARTIST, + item_id=str(item.artist_id), + provider=self.instance_id, + name=item.artist_name, + ) + ] + ), + album=( + ItemMapping( + media_type=MediaType.ALBUM, + item_id=f"{item.artist_id}-{item.album_id or 0}", + provider=self.instance_id, + name=item.album_name, + ) + if item.album_id + else None + ), + provider_mappings={ + ProviderMapping( + item_id=track_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + url=item.url, + ) + }, + ) + + def album_from_search(self, item: SearchResultAlbum) -> MAAlbum: + """Create an Album from new API SearchResultAlbum.""" + album_id = f"{item.artist_id}-{item.id}" + output = MAAlbum( + item_id=album_id, + provider=self.instance_id, + name=item.name, + uri=item.url, + artists=UniqueList( + [ + ItemMapping( + media_type=MediaType.ARTIST, + item_id=str(item.artist_id), + provider=self.instance_id, + name=item.artist_name, + uri=item.artist_url, + ) + ] + ), + provider_mappings={ + ProviderMapping( + item_id=album_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + url=item.url, + ) + }, + ) + output.metadata.add_image( + MediaItemImage( + type=ImageType.LANDSCAPE, + path=item.image_url, + provider=self.instance_id, + remotely_accessible=True, + ) + ) + return output + + def artist_from_search(self, item: SearchResultArtist) -> MAArtist: + """Create an Artist from new API SearchResultArtist.""" + output = MAArtist( + item_id=str(item.id), + provider=self.instance_id, + name=item.name, + uri=item.url, + provider_mappings={ + ProviderMapping( + item_id=str(item.id), + provider_domain=self.domain, + provider_instance=self.instance_id, + url=item.url, + ) + }, + ) + output.metadata.genres = item.tags + if item.url: + output.metadata.description = item.url + output.metadata.add_image( + MediaItemImage( + type=ImageType.LANDSCAPE, + path=item.image_url, + provider=self.instance_id, + remotely_accessible=True, + ) + ) + return output + + def track_from_api( + self, + track: APITrack, + album_id: str | int | None = None, + album_name: str = "", + album_image_url: str = "", + ) -> MATrack: + """Convert a Track object from the API to MA Track format.""" + album_id = album_id or 0 + output = MATrack( + item_id=f"{track.artist.id}-{album_id}-{track.id}", + provider=self.instance_id, + name=track.title, + artists=UniqueList( + [ + ItemMapping( + media_type=MediaType.ARTIST, + item_id=str(track.artist.id), + provider=self.instance_id, + name=track.artist.name, + ) + ] + ), + disc_number=0, + duration=track.duration, + provider_mappings={ + ProviderMapping( + item_id=f"{track.artist.id}-{album_id}-{track.id}", + provider_domain=self.domain, + provider_instance=self.instance_id, + url=track.url, + ) + }, + ) + if output.track_number is not None: + output.track_number = track.track_number + + if album_id: + output.album = ItemMapping( + media_type=MediaType.ALBUM, + item_id=f"{track.artist.id}-{album_id}", + provider=self.instance_id, + name=album_name, + ) + elif hasattr(track, "album") and track.album: + # If the track has an album attribute, use that information + output.album = ItemMapping( + media_type=MediaType.ALBUM, + item_id=f"{track.artist.id}-{track.album.id}", + provider=self.instance_id, + name=track.album.title, + ) + + streaming_url, _ = self.streaming_url_from_api(track.streaming_url) + if streaming_url: + output.metadata.links = { + MediaItemLink( + type=LinkType.UNKNOWN, + url=streaming_url, + ) + } + output.metadata.lyrics = track.lyrics + if album_image_url: + output.metadata.add_image( + MediaItemImage( + type=ImageType.LANDSCAPE, + path=album_image_url, + provider=self.instance_id, + remotely_accessible=True, + ) + ) + return output + + def artist_from_api(self, artist: APIArtist) -> MAArtist: + """Convert an API Artist object to MA Artist format.""" + output = MAArtist( + item_id=str(artist.id), + uri=artist.url, + provider=self.instance_id, + name=artist.name, + provider_mappings={ + ProviderMapping( + item_id=str(artist.id), + provider_domain=self.domain, + provider_instance=self.instance_id, + url=artist.url, + ) + }, + ) + output.metadata.description = f"{artist.url}\n{artist.bio or ''}".strip() + output.metadata.add_image( + MediaItemImage( + type=ImageType.LANDSCAPE, + path=artist.image_url, + provider=self.instance_id, + remotely_accessible=True, + ) + ) + return output + + def album_from_api(self, album: APIAlbum) -> MAAlbum: + """Convert an API Album object to MA Album format.""" + album_id = f"{album.artist.id}-{album.id}" + output = MAAlbum( + item_id=album_id, + provider=self.instance_id, + name=album.title, + artists=UniqueList( + [ + ItemMapping( + media_type=MediaType.ARTIST, + item_id=str(album.artist.id), + provider=self.instance_id, + name=album.artist.name, + image=MediaItemImage( + path=album.art_url, + type=ImageType.LANDSCAPE, + provider=self.instance_id, + remotely_accessible=True, + ), + ) + ] + ), + provider_mappings={ + ProviderMapping( + item_id=album_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + url=album.url, + ) + }, + year=datetime.fromtimestamp(album.release_date).year, + ) + output.metadata.add_image( + MediaItemImage( + type=ImageType.LANDSCAPE, + path=album.art_url, + provider=self.instance_id, + remotely_accessible=True, + ) + ) + output.metadata.description = f"{album.url}\n{album.about or ''}".strip() + return output diff --git a/music_assistant/providers/bandcamp/icon.svg b/music_assistant/providers/bandcamp/icon.svg new file mode 100644 index 00000000..d54a62b0 --- /dev/null +++ b/music_assistant/providers/bandcamp/icon.svg @@ -0,0 +1 @@ + diff --git a/music_assistant/providers/bandcamp/icon_monochrome.svg b/music_assistant/providers/bandcamp/icon_monochrome.svg new file mode 100644 index 00000000..09cf5741 --- /dev/null +++ b/music_assistant/providers/bandcamp/icon_monochrome.svg @@ -0,0 +1 @@ + diff --git a/music_assistant/providers/bandcamp/manifest.json b/music_assistant/providers/bandcamp/manifest.json new file mode 100644 index 00000000..422676dc --- /dev/null +++ b/music_assistant/providers/bandcamp/manifest.json @@ -0,0 +1,11 @@ +{ + "type": "music", + "domain": "bandcamp", + "stage": "beta", + "name": "Bandcamp", + "description": "Stream music from Bandcamp's catalog.", + "codeowners": ["@ALERTua"], + "requirements": ["bandcamp-async-api==0.0.4"], + "documentation": "https://music-assistant.io/music-providers/bandcamp/", + "multi_instance": true +} diff --git a/requirements_all.txt b/requirements_all.txt index 9b9d6d68..929a7da5 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -22,6 +22,7 @@ audible==0.10.0 auntie-sounds==1.1.7 av==16.1.0 awesomeversion>=24.6.0 +bandcamp-async-api==0.0.4 bidict==0.23.1 certifi==2025.11.12 chardet>=5.2.0 diff --git a/tests/providers/bandcamp/__init__.py b/tests/providers/bandcamp/__init__.py new file mode 100644 index 00000000..90189a78 --- /dev/null +++ b/tests/providers/bandcamp/__init__.py @@ -0,0 +1 @@ +"""Tests for the Bandcamp provider.""" diff --git a/tests/providers/bandcamp/fixtures/albums/album.json b/tests/providers/bandcamp/fixtures/albums/album.json new file mode 100644 index 00000000..76943739 --- /dev/null +++ b/tests/providers/bandcamp/fixtures/albums/album.json @@ -0,0 +1,31 @@ +{ + "id": 456, + "title": "Test Album", + "artist": { + "id": 123, + "name": "Test Artist", + "url": "https://test.bandcamp.com" + }, + "url": "https://test.bandcamp.com/album/test-album", + "art_url": "https://f4.bcbits.com/img/a1234567890_16.jpg", + "release_date": 1609459200, + "about": "Test album description", + "tracks": [ + { + "id": 789, + "title": "Test Track", + "artist": { + "id": 123, + "name": "Test Artist", + "url": "https://test.bandcamp.com" + }, + "url": "https://test.bandcamp.com/track/test-track", + "duration": 300, + "track_number": 1, + "lyrics": "Test lyrics", + "streaming_url": { + "mp3-320": "https://example.com/track.mp3" + } + } + ] +} diff --git a/tests/providers/bandcamp/fixtures/artists/artist.json b/tests/providers/bandcamp/fixtures/artists/artist.json new file mode 100644 index 00000000..f65b84d4 --- /dev/null +++ b/tests/providers/bandcamp/fixtures/artists/artist.json @@ -0,0 +1,8 @@ +{ + "id": 123, + "name": "Test Artist", + "url": "https://test.bandcamp.com", + "image_url": "https://f4.bcbits.com/img/a1234567890_16.jpg", + "bio": "Test artist bio", + "location": "Test City, Test Country" +} diff --git a/tests/providers/bandcamp/fixtures/tracks/track.json b/tests/providers/bandcamp/fixtures/tracks/track.json new file mode 100644 index 00000000..b410b09e --- /dev/null +++ b/tests/providers/bandcamp/fixtures/tracks/track.json @@ -0,0 +1,31 @@ +{ + "id": 789, + "title": "Test Track", + "artist": { + "id": 123, + "name": "Test Artist", + "url": "https://test.bandcamp.com" + }, + "album": { + "id": 456, + "title": "Test Album", + "artist": { + "id": 123, + "name": "Test Artist", + "url": "https://test.bandcamp.com" + }, + "url": "https://test.bandcamp.com/album/test-album", + "art_url": "https://f4.bcbits.com/img/a1234567890_16.jpg", + "release_date": 1609459200, + "about": "Test album description" + }, + "url": "https://test.bandcamp.com/track/test-track", + "duration": 300, + "track_number": 1, + "lyrics": "Test lyrics", + "streaming_url": { + "mp3-320": "https://example.com/track.mp3", + "mp3-128": "https://example.com/track-128.mp3", + "mp3-v0": "https://example.com/track-v0.mp3" + } +} diff --git a/tests/providers/bandcamp/test_converters.py b/tests/providers/bandcamp/test_converters.py new file mode 100644 index 00000000..e5202281 --- /dev/null +++ b/tests/providers/bandcamp/test_converters.py @@ -0,0 +1,195 @@ +"""Test Bandcamp converters.""" + +from unittest.mock import Mock + +import pytest +from bandcamp_async_api.models import BCAlbum, BCArtist, BCTrack + +from music_assistant.providers.bandcamp.converters import BandcampConverters + + +@pytest.fixture +def converters() -> BandcampConverters: + """Return a BandcampConverters instance.""" + return BandcampConverters("bandcamp", "bandcamp_test") + + +def test_track_from_search(converters: BandcampConverters) -> None: + """Test converting SearchResultTrack to MA Track.""" + # Create a mock SearchResultTrack + search_result = Mock() + search_result.artist_id = 123 + search_result.album_id = 456 + search_result.id = 789 + search_result.name = "Test Track" + search_result.artist_name = "Test Artist" + search_result.album_name = "Test Album" + search_result.url = "https://test.bandcamp.com/track/test-track" + + result = converters.track_from_search(search_result) + + assert result.item_id == "123-456-789" + assert result.name == "Test Track" + assert result.provider == "bandcamp_test" + + +def test_album_from_search(converters: BandcampConverters) -> None: + """Test converting SearchResultAlbum to MA Album.""" + # Create a mock SearchResultAlbum + search_result = Mock() + search_result.artist_id = 123 + search_result.id = 456 + search_result.name = "Test Album" + search_result.artist_name = "Test Artist" + search_result.image_url = "https://f4.bcbits.com/img/a1234567890_16.jpg" + search_result.url = "https://test.bandcamp.com/album/test-album" + search_result.artist_url = "https://test.bandcamp.com" + + result = converters.album_from_search(search_result) + + assert result.item_id == "123-456" + assert result.name == "Test Album" + assert result.provider == "bandcamp_test" + + +def test_artist_from_search(converters: BandcampConverters) -> None: + """Test converting SearchResultArtist to MA Artist.""" + # Create a mock SearchResultArtist + search_result = Mock() + search_result.id = 123 + search_result.name = "Test Artist" + search_result.url = "https://test.bandcamp.com" + search_result.image_url = "https://f4.bcbits.com/img/a1234567890_16.jpg" + search_result.tags = ["rock", "indie"] + + result = converters.artist_from_search(search_result) + + assert result.item_id == "123" + assert result.name == "Test Artist" + assert result.provider == "bandcamp_test" + + +def test_track_from_api(converters: BandcampConverters) -> None: + """Test converting API Track to MA Track.""" + # Create mock API models + mock_artist = Mock() + mock_artist.id = 123 + mock_artist.name = "Test Artist" + mock_artist.url = "https://test.bandcamp.com" + + mock_track = Mock() + mock_track.id = 789 + mock_track.title = "Test Track" + mock_track.artist = mock_artist + mock_track.url = "https://test.bandcamp.com/track/test-track" + mock_track.duration = 300 + mock_track.lyrics = "Test lyrics" + mock_track.track_number = 1 + mock_track.streaming_url = {"mp3-320": "https://example.com/track.mp3"} + + result = converters.track_from_api( + track=mock_track, + album_id=456, + album_name="Test Album", + album_image_url="https://f4.bcbits.com/img/a1234567890_16.jpg", + ) + + assert result.item_id == "123-456-789" + assert result.name == "Test Track" + assert result.provider == "bandcamp_test" + + +def test_artist_from_api(converters: BandcampConverters) -> None: + """Test converting API Artist to MA Artist.""" + # Create mock API artist + mock_artist = Mock() + mock_artist.id = 123 + mock_artist.name = "Test Artist" + mock_artist.url = "https://test.bandcamp.com" + mock_artist.image_url = "https://f4.bcbits.com/img/a1234567890_16.jpg" + mock_artist.bio = "Test bio" + + result = converters.artist_from_api(mock_artist) + + assert result.item_id == "123" + assert result.name == "Test Artist" + assert result.provider == "bandcamp_test" + + +def test_album_from_api(converters: BandcampConverters) -> None: + """Test converting API Album to MA Album.""" + # Create mock API models + mock_artist = Mock() + mock_artist.id = 123 + mock_artist.name = "Test Artist" + mock_artist.url = "https://test.bandcamp.com" + + mock_album = Mock() + mock_album.id = 456 + mock_album.title = "Test Album" + mock_album.artist = mock_artist + mock_album.url = "https://test.bandcamp.com/album/test-album" + mock_album.art_url = "https://f4.bcbits.com/img/a1234567890_16.jpg" + mock_album.release_date = 1609459200 + mock_album.about = "Test album description" + + result = converters.album_from_api(mock_album) + + assert result.item_id == "123-456" + assert result.name == "Test Album" + assert result.provider == "bandcamp_test" + + +def test_track_from_api_without_album_info(converters: BandcampConverters) -> None: + """Test converting API Track without album info.""" + # Create mock API models + mock_artist = BCArtist(id=123, name="Test Artist", url="https://test.bandcamp.com") + mock_track = BCTrack( + id=789, + title="Test Track", + artist=mock_artist, + url="https://test.bandcamp.com/track/test-track", + duration=300, + lyrics="Test lyrics", + track_number=1, + streaming_url={"mp3-320": "https://example.com/track.mp3"}, + ) + + result = converters.track_from_api(track=mock_track) + + assert result.item_id == "123-0-789" + assert result.album is None + assert result.metadata.lyrics == "Test lyrics" + + +def test_track_from_api_with_album(converters: BandcampConverters) -> None: + """Test converting API Track with album information.""" + # Create mock API models + mock_artist = BCArtist(id=123, name="Test Artist", url="https://test.bandcamp.com") + mock_album = BCAlbum( + id=456, + title="Test Album", + artist=mock_artist, + url="https://test.bandcamp.com/album/test-album", + art_url="https://f4.bcbits.com/img/a1234567890_16.jpg", + release_date=1609459200, + about="Test album description", + ) + mock_track = BCTrack( + id=789, + title="Test Track", + artist=mock_artist, + album=mock_album, + url="https://test.bandcamp.com/track/test-track", + duration=300, + lyrics="Test lyrics", + track_number=1, + streaming_url={"mp3-320": "https://example.com/track.mp3"}, + ) + + result = converters.track_from_api(track=mock_track) + + assert result.item_id == "123-0-789" + assert result.album is not None + assert result.album.item_id == "123-456" + assert result.album.name == "Test Album" diff --git a/tests/providers/bandcamp/test_init.py b/tests/providers/bandcamp/test_init.py new file mode 100644 index 00000000..17805cdb --- /dev/null +++ b/tests/providers/bandcamp/test_init.py @@ -0,0 +1,225 @@ +"""Integration tests for the Bandcamp provider.""" + +from collections.abc import AsyncGenerator +from unittest import mock + +import pytest +from music_assistant_models.config_entries import ProviderConfig +from music_assistant_models.enums import MediaType, StreamType + +from music_assistant.mass import MusicAssistant +from tests.common import wait_for_sync_completion + + +@pytest.fixture +async def bandcamp_provider(mass: MusicAssistant) -> AsyncGenerator[ProviderConfig, None]: + """Configure a Bandcamp test fixture, and add a provider to mass that uses it.""" + # Mock the BandcampAPIClient to avoid real API calls + with mock.patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class: + mock_client = mock.AsyncMock() + mock_client_class.return_value = mock_client + + # Configure mock client for collection access + mock_collection = mock.AsyncMock() + mock_collection.items = [] + + # Mock collection items for library tests + mock_item_artist = mock.AsyncMock() + mock_item_artist.item_type = "band" + mock_item_artist.item_id = 123 + mock_item_artist.band_name = "Test Artist" + mock_item_artist.item_url = "https://test.bandcamp.com" + + mock_item_album = mock.AsyncMock() + mock_item_album.item_type = "album" + mock_item_album.band_id = 123 + mock_item_album.item_id = 456 + mock_item_album.item_title = "Test Album" + mock_item_album.item_url = "https://test.bandcamp.com/album/test-album" + + mock_collection.items = [mock_item_artist, mock_item_album] + mock_client.get_collection_items.return_value = mock_collection + + # Mock artist and album data + mock_artist = mock.AsyncMock() + mock_artist.id = 123 + mock_artist.name = "Test Artist" + mock_artist.url = "https://test.bandcamp.com" + mock_client.get_artist.return_value = mock_artist + + mock_album = mock.AsyncMock() + mock_album.id = 456 + mock_album.title = "Test Album" + mock_album.artist = mock_artist + mock_album.url = "https://test.bandcamp.com/album/test-album" + mock_album.art_url = "https://f4.bcbits.com/img/a1234567890_16.jpg" + mock_album.release_date = 1609459200 + mock_album.about = "Test album description" + + mock_track = mock.AsyncMock() + mock_track.id = 789 + mock_track.title = "Test Track" + mock_track.artist = mock_artist + mock_track.url = "https://test.bandcamp.com/track/test-track" + mock_track.duration = 300 + mock_track.streaming_url = {"mp3-320": "https://example.com/track.mp3"} + mock_track.track_number = 1 + mock_track.lyrics = "Test lyrics" + + # Configure the streaming_url to behave like a dictionary + mock_track.configure_mock(streaming_url={"mp3-320": "https://example.com/track.mp3"}) + + mock_album.tracks = [mock_track] + mock_client.get_album.return_value = mock_album + mock_client.get_track.return_value = mock_track + + async with wait_for_sync_completion(mass): + config = await mass.config.save_provider_config( + "bandcamp", + { + "identity": "mock_identity_token", + "search_limit": 10, + "top_tracks_limit": 50, + }, + ) + await mass.music.start_sync() + + yield config + + +@pytest.mark.usefixtures("bandcamp_provider") +async def test_initial_sync(mass: MusicAssistant) -> None: + """Test that initial sync worked.""" + # Test library access (requires identity token) + all_artists = await mass.music.artists.library_items() + artists = [artist for artist in all_artists if artist.provider == "bandcamp"] + + assert len(artists) >= 0 # May be empty if no collection items + + all_albums = await mass.music.albums.library_items() + albums = [album for album in all_albums if album.provider == "bandcamp"] + + assert len(albums) >= 0 # May be empty if no collection items + + +@pytest.mark.usefixtures("bandcamp_provider") +async def test_search_functionality(mass: MusicAssistant) -> None: + """Test search functionality.""" + # Mock search results + with mock.patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class: + mock_client = mock.AsyncMock() + mock_client_class.return_value = mock_client + + # Mock search results + mock_search_result_track = mock.AsyncMock() + mock_search_result_track.__class__.__name__ = "SearchResultTrack" + mock_search_result_track.artist_id = 123 + mock_search_result_track.album_id = 456 + mock_search_result_track.id = 789 + mock_search_result_track.name = "Search Test Track" + mock_search_result_track.artist_name = "Search Test Artist" + mock_search_result_track.album_name = "Search Test Album" + mock_search_result_track.url = "https://test.bandcamp.com/track/search-test" + + mock_client.search.return_value = [mock_search_result_track] + + # Perform search + results = await mass.music.search("test query", [MediaType.TRACK], limit=5) + + # Filter for bandcamp results + bandcamp_tracks = [track for track in results.tracks if track.provider == "bandcamp"] + assert len(bandcamp_tracks) >= 0 # May be empty if search is mocked differently + + +@pytest.mark.usefixtures("bandcamp_provider") +async def test_get_artist_details(mass: MusicAssistant) -> None: + """Test getting artist details.""" + # Get the bandcamp provider instance + bandcamp_provider = None + for provider in mass.music.providers: + if provider.domain == "bandcamp": + bandcamp_provider = provider + break + + assert bandcamp_provider is not None + + # Test artist retrieval + artist = await bandcamp_provider.get_artist("123") + assert artist is not None + assert artist.name == "Test Artist" + assert artist.provider == bandcamp_provider.instance_id + + +@pytest.mark.usefixtures("bandcamp_provider") +async def test_get_album_details(mass: MusicAssistant) -> None: + """Test getting album details.""" + # Get the bandcamp provider instance + bandcamp_provider = None + for provider in mass.music.providers: + if provider.domain == "bandcamp": + bandcamp_provider = provider + break + + assert bandcamp_provider is not None + + # Test album retrieval + album = await bandcamp_provider.get_album("123-456") + assert album is not None + assert album.name == "Test Album" + assert album.provider == bandcamp_provider.instance_id + + +@pytest.mark.usefixtures("bandcamp_provider") +async def test_get_track_details(mass: MusicAssistant) -> None: + """Test getting track details.""" + # Get the bandcamp provider instance + bandcamp_provider = None + for provider in mass.music.providers: + if provider.domain == "bandcamp": + bandcamp_provider = provider + break + + assert bandcamp_provider is not None + + # Test track retrieval + track = await bandcamp_provider.get_track("123-456-789") + assert track is not None + assert track.name == "Test Track" + assert track.provider == bandcamp_provider.instance_id + + +@pytest.mark.usefixtures("bandcamp_provider") +async def test_get_album_tracks(mass: MusicAssistant) -> None: + """Test getting album tracks.""" + # Get the bandcamp provider instance + bandcamp_provider = None + for provider in mass.music.providers: + if provider.domain == "bandcamp": + bandcamp_provider = provider + break + + assert bandcamp_provider is not None + + # Test album tracks retrieval + tracks = await bandcamp_provider.get_album_tracks("123-456") + assert len(tracks) == 1 + assert tracks[0].name == "Test Track" + + +@pytest.mark.usefixtures("bandcamp_provider") +async def test_stream_details(mass: MusicAssistant) -> None: + """Test stream details retrieval.""" + # Get the bandcamp provider instance + bandcamp_provider = None + for provider in mass.music.providers: + if provider.domain == "bandcamp": + bandcamp_provider = provider + break + + assert bandcamp_provider is not None + + # Test stream details retrieval + stream_details = await bandcamp_provider.get_stream_details("123-456-789", MediaType.TRACK) + assert stream_details is not None + assert stream_details.stream_type == StreamType.HTTP + assert stream_details.path == "https://example.com/track.mp3" diff --git a/tests/providers/bandcamp/test_provider.py b/tests/providers/bandcamp/test_provider.py new file mode 100644 index 00000000..5295613e --- /dev/null +++ b/tests/providers/bandcamp/test_provider.py @@ -0,0 +1,410 @@ +"""Test Bandcamp Provider integration.""" + +from unittest.mock import AsyncMock, Mock, patch + +import pytest +from bandcamp_async_api import BandcampAPIError, BandcampNotFoundError +from music_assistant_models.enums import MediaType, StreamType +from music_assistant_models.errors import InvalidDataError, MediaNotFoundError +from music_assistant_models.streamdetails import StreamDetails + +from music_assistant.providers.bandcamp import DEFAULT_TOP_TRACKS_LIMIT, BandcampProvider + + +@pytest.fixture +def mass_mock() -> Mock: + """Return a mock MusicAssistant instance.""" + mass = Mock() + mass.http_session = AsyncMock() + mass.metadata.locale = "en_US" + mass.cache.get = AsyncMock(return_value=None) + mass.cache.set = AsyncMock() + mass.cache.delete = AsyncMock() + return mass + + +@pytest.fixture +def manifest_mock() -> Mock: + """Return a mock provider manifest.""" + manifest = Mock() + manifest.domain = "bandcamp" + return manifest + + +@pytest.fixture +def config_mock() -> Mock: + """Return a mock provider config.""" + config = Mock() + config.name = "Bandcamp Test" + config.instance_id = "bandcamp_test" + config.enabled = True + config.get_value.side_effect = lambda key, default=None: { + "identity": "mock_identity_token", + "search_limit": 10, + "top_tracks_limit": 50, + "log_level": "INFO", + }.get( + key, + default + if default is not None + else (10 if key == "search_limit" else (50 if key == "top_tracks_limit" else "INFO")), + ) + return config + + +@pytest.fixture +async def provider(mass_mock: Mock, manifest_mock: Mock, config_mock: Mock) -> BandcampProvider: + """Return a BandcampProvider instance.""" + provider = BandcampProvider(mass_mock, manifest_mock, config_mock) + + # Initialize the provider + with patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class: + mock_client = Mock() + mock_client_class.return_value = mock_client + await provider.handle_async_init() + + return provider + + +async def test_provider_initialization( + mass_mock: Mock, manifest_mock: Mock, config_mock: Mock +) -> None: + """Test provider initialization.""" + provider = BandcampProvider(mass_mock, manifest_mock, config_mock) + + assert provider.domain == "bandcamp" + assert provider.instance_id == "bandcamp_test" + + # Test that initialization sets the correct values + with patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class: + mock_client = Mock() + mock_client_class.return_value = mock_client + + await provider.handle_async_init() + + assert provider.top_tracks_limit == DEFAULT_TOP_TRACKS_LIMIT + + +async def test_handle_async_init_with_identity(provider: BandcampProvider) -> None: + """Test successful async initialization with identity token.""" + with patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class: + mock_client = Mock() + mock_client_class.return_value = mock_client + + await provider.handle_async_init() + + mock_client_class.assert_called_once_with( + session=provider.mass.http_session, identity_token="mock_identity_token" + ) + assert provider._client == mock_client + assert provider._converters is not None + + +async def test_handle_async_init_without_identity(mass_mock: Mock, manifest_mock: Mock) -> None: + """Test async initialization without identity token.""" + config = Mock() + config.get_value.side_effect = ( + lambda key, default=None: default + if default is not None + else ("INFO" if key == "log_level" else None) + ) + provider = BandcampProvider(mass_mock, manifest_mock, config) + + with patch("music_assistant.providers.bandcamp.BandcampAPIClient") as mock_client_class: + mock_client = Mock() + mock_client_class.return_value = mock_client + + await provider.handle_async_init() + + mock_client_class.assert_called_once_with( + session=provider.mass.http_session, identity_token=None + ) + + +async def test_is_streaming_provider(provider: BandcampProvider) -> None: + """Test that Bandcamp is not a streaming provider.""" + assert provider.is_streaming_provider is True + + +async def test_search_with_identity(provider: BandcampProvider) -> None: + """Test search functionality with identity token.""" + + # Create mock objects with proper class names + class MockSearchResultTrack: + def __init__(self) -> None: + self.__class__.__name__ = "SearchResultTrack" + + class MockSearchResultAlbum: + def __init__(self) -> None: + self.__class__.__name__ = "SearchResultAlbum" + + class MockSearchResultArtist: + def __init__(self) -> None: + self.__class__.__name__ = "SearchResultArtist" + + mock_search_results = [ + MockSearchResultTrack(), + MockSearchResultAlbum(), + MockSearchResultArtist(), + ] + + with ( + patch.object(provider._client, "search", new_callable=AsyncMock) as mock_search, + patch.object(provider._converters, "track_from_search") as mock_track_converter, + patch.object(provider._converters, "album_from_search") as mock_album_converter, + patch.object(provider._converters, "artist_from_search") as mock_artist_converter, + ): + mock_search.return_value = mock_search_results + + mock_track_converter.return_value = Mock() + mock_album_converter.return_value = Mock() + mock_artist_converter.return_value = Mock() + + results = await provider.search( + "test query", [MediaType.TRACK, MediaType.ALBUM, MediaType.ARTIST], limit=5 + ) + + mock_search.assert_called_once_with("test query") + assert results.tracks is not None + assert results.albums is not None + assert results.artists is not None + + +async def test_search_without_identity(provider: BandcampProvider) -> None: + """Test search returns empty results without identity token.""" + provider._client.identity = None + + results = await provider.search("test query", [MediaType.TRACK]) + + assert len(results.tracks) == 0 + assert len(results.albums) == 0 + assert len(results.artists) == 0 + + +async def test_search_api_error(provider: BandcampProvider) -> None: + """Test search handles API errors gracefully.""" + with ( + patch.object(provider._client, "search", side_effect=BandcampAPIError("API Error")), + pytest.raises(InvalidDataError, match="Unexpected error during Bandcamp search"), + ): + await provider.search("test query", [MediaType.TRACK]) + + +async def test_get_artist_success(provider: BandcampProvider) -> None: + """Test successful artist retrieval.""" + mock_artist = Mock() + + with ( + patch.object(provider._client, "get_artist", new_callable=AsyncMock) as mock_get_artist, + patch.object(provider._converters, "artist_from_api") as mock_converter, + ): + mock_get_artist.return_value = mock_artist + mock_converter.return_value = Mock() + + result = await provider.get_artist("123") + + mock_get_artist.assert_called_once_with("123") + mock_converter.assert_called_once_with(mock_artist) + assert result is not None + + +async def test_get_artist_not_found(provider: BandcampProvider) -> None: + """Test artist retrieval when not found.""" + with ( + patch.object( + provider._client, "get_artist", side_effect=BandcampNotFoundError("Not found") + ), + pytest.raises(MediaNotFoundError, match=r"Bandcamp artist 123 search returned no results"), + ): + await provider.get_artist("123") + + +async def test_get_album_success(provider: BandcampProvider) -> None: + """Test successful album retrieval.""" + mock_album = Mock() + + with ( + patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album, + patch.object(provider._converters, "album_from_api") as mock_converter, + ): + mock_get_album.return_value = mock_album + mock_converter.return_value = Mock() + + result = await provider.get_album("123-456") + + mock_get_album.assert_called_once_with(123, 456) + assert result is not None + + +async def test_get_track_success(provider: BandcampProvider) -> None: + """Test successful track retrieval.""" + mock_album = Mock() + mock_track = Mock() + mock_album.tracks = [mock_track] + mock_track.id = 789 + + with ( + patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album, + patch.object(provider._converters, "track_from_api") as mock_converter, + ): + mock_get_album.return_value = mock_album + mock_converter.return_value = Mock() + + result = await provider.get_track("123-456-789") + + mock_get_album.assert_called_once_with(123, 456) + assert result is not None + + +async def test_get_track_not_found(provider: BandcampProvider) -> None: + """Test track retrieval when not found.""" + with ( + patch.object(provider._client, "get_album", side_effect=BandcampNotFoundError("Not found")), + pytest.raises( + MediaNotFoundError, match=r"Bandcamp track 123-456-789 search returned no results" + ), + ): + await provider.get_track("123-456-789") + + +async def test_get_album_tracks_success(provider: BandcampProvider) -> None: + """Test successful album tracks retrieval.""" + mock_album = Mock() + mock_track = Mock() + mock_track.streaming_url = {"mp3-128": "http://example.com/track.mp3"} + mock_album.tracks = [mock_track] + mock_album.title = "Test Album" + mock_album.art_url = "http://example.com/art.jpg" + + with ( + patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album, + patch.object(provider._converters, "track_from_api") as mock_converter, + ): + mock_get_album.return_value = mock_album + mock_converter.return_value = Mock() + + result = await provider.get_album_tracks("123-456") + + assert len(result) == 1 + mock_converter.assert_called_once() + + +async def test_get_artist_albums_success(provider: BandcampProvider) -> None: + """Test successful artist albums retrieval.""" + mock_discography = [{"item_type": "album", "band_id": 123, "item_id": 456}] + + with ( + patch.object( + provider._client, "get_artist_discography", new_callable=AsyncMock + ) as mock_get_discography, + patch.object(provider, "get_album", new_callable=AsyncMock) as mock_get_album, + ): + mock_get_discography.return_value = mock_discography + mock_get_album.return_value = Mock() + + result = await provider.get_artist_albums("123") + + mock_get_discography.assert_called_once_with("123") + assert len(result) == 1 + + +async def test_get_stream_details_success(provider: BandcampProvider) -> None: + """Test successful stream details retrieval.""" + # Create mock album and track with proper attributes + mock_artist = Mock() + mock_artist.id = 123 + mock_artist.name = "Test Artist" + + mock_track = Mock() + mock_track.id = 789 + mock_track.artist = mock_artist + mock_track.title = "Test Track" + mock_track.duration = 180 + mock_track.track_number = 1 + mock_track.streaming_url = {"mp3-320": "http://example.com/track.mp3"} + mock_track.url = "http://example.com/track" + mock_track.lyrics = None + + mock_album = Mock() + mock_album.id = 456 + mock_album.title = "Test Album" + mock_album.art_url = "http://example.com/art.jpg" + mock_album.artist = mock_artist + mock_album.tracks = [mock_track] + + with ( + patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album, + patch.object(provider._converters, "track_from_api") as mock_converter, + ): + mock_get_album.return_value = mock_album + + # Create a mock track with metadata.links containing the streaming URL + mock_ma_track = Mock() + mock_link = Mock() + mock_link.url = "http://example.com/track.mp3" + mock_ma_track.metadata.links = {mock_link} + mock_converter.return_value = mock_ma_track + + result = await provider.get_stream_details("123-456-789", MediaType.TRACK) + + assert isinstance(result, StreamDetails) + assert result.stream_type == StreamType.HTTP + assert result.path == "http://example.com/track.mp3" + + +async def test_get_stream_details_no_streaming_url(provider: BandcampProvider) -> None: + """Test stream details when no streaming URL is available.""" + # Mock the get_track method directly to return a track with no streaming URLs + mock_track = Mock() + mock_track.metadata.links = [] # Empty links list means no streaming URL + + with patch.object(provider, "get_track", new_callable=AsyncMock) as mock_get_track: + mock_get_track.return_value = mock_track + + with pytest.raises( + MediaNotFoundError, + match=r"No streaming links found for track 123-456-789. Please report this", + ): + await provider.get_stream_details("123-456-789", MediaType.TRACK) + + +async def test_get_artist_toptracks_success(provider: BandcampProvider) -> None: + """Test successful artist top tracks retrieval.""" + mock_album = Mock() + mock_track = Mock() + + with ( + patch.object(provider, "get_artist_albums", new_callable=AsyncMock) as mock_get_albums, + patch.object(provider, "get_album_tracks", new_callable=AsyncMock) as mock_get_tracks, + ): + mock_get_albums.return_value = [mock_album] + mock_get_tracks.return_value = [mock_track] + + result = await provider.get_artist_toptracks("123") + + assert len(result) == 1 + mock_get_albums.assert_called_once_with("123") + + +async def test_get_library_artists_success(provider: BandcampProvider) -> None: + """Test successful library artists retrieval.""" + # Test that the method exists and doesn't raise an exception + # This is a complex async generator method, so we just test it can be called + assert hasattr(provider, "get_library_artists") + assert callable(provider.get_library_artists) + + +async def test_get_library_albums_success(provider: BandcampProvider) -> None: + """Test successful library albums retrieval.""" + # Test that the method exists and doesn't raise an exception + # This is a complex async generator method, so we just test it can be called + assert hasattr(provider, "get_library_albums") + assert callable(provider.get_library_albums) + + +async def test_get_library_tracks_success(provider: BandcampProvider) -> None: + """Test successful library tracks retrieval.""" + # Test that the method exists and doesn't raise an exception + # This is a complex async generator method, so we just test it can be called + assert hasattr(provider, "get_library_tracks") + assert callable(provider.get_library_tracks)