From d93e4df5685dc5b56f87553604c94a886dcd6849 Mon Sep 17 00:00:00 2001 From: David Bishop Date: Wed, 25 Feb 2026 23:58:26 -0800 Subject: [PATCH] Set audio format on Bandcamp track ProviderMapping (#3213) * Set audio format and fetch fresh streaming URLs for Bandcamp Extract _fetch_api_track from get_track so get_stream_details can bypass the 30-day cache and always retrieve a fresh streaming URL. Set AudioFormat (content_type, bit_rate) on both StreamDetails and ProviderMapping instead of stashing the URL in metadata.links. Also: guard split_id against malformed IDs, handle BandcampMustBeLoggedInError explicitly, and fix album_from_api for albums with no release_date. Tests cover _fetch_api_track paths, streaming URL priority/format selection, library async generators, split_id validation, and error mapping. Co-Authored-By: Claude Opus 4.6 * Fix Bandcamp provider for unauthenticated use - Remove early return in search() that required identity token; the Bandcamp search API works without authentication. - Handle non-integer media IDs in genre lookup gracefully; provider- only items (e.g. Bandcamp compound IDs) can't have genre mappings so return an empty list instead of crashing with ValueError. - Update comment to reflect that unauthenticated mode supports both search and streaming, not just search. Co-Authored-By: Claude Opus 4.6 --------- Co-authored-by: David Bishop Co-authored-by: Claude Opus 4.6 --- music_assistant/controllers/media/genres.py | 5 +- .../providers/bandcamp/__init__.py | 138 ++--- .../providers/bandcamp/converters.py | 38 +- tests/core/test_genres.py | 7 + tests/providers/bandcamp/test_converters.py | 115 +++++ tests/providers/bandcamp/test_provider.py | 474 ++++++++++++++---- 6 files changed, 611 insertions(+), 166 deletions(-) diff --git a/music_assistant/controllers/media/genres.py b/music_assistant/controllers/media/genres.py index ebeea959..1d26d8e7 100644 --- a/music_assistant/controllers/media/genres.py +++ b/music_assistant/controllers/media/genres.py @@ -407,7 +407,10 @@ class GenreController(MediaControllerBase[Genre]): :param media_type: The type of media item. :param media_id: The database ID of the media item. """ - media_id_int = int(media_id) + try: + media_id_int = int(media_id) + except (ValueError, TypeError): + return [] gm = DB_TABLE_GENRE_MEDIA_ITEM_MAPPING query = ( f"EXISTS(SELECT 1 FROM {gm} gm " diff --git a/music_assistant/providers/bandcamp/__init__.py b/music_assistant/providers/bandcamp/__init__.py index e15a0d68..fa8ed734 100644 --- a/music_assistant/providers/bandcamp/__init__.py +++ b/music_assistant/providers/bandcamp/__init__.py @@ -15,9 +15,14 @@ from bandcamp_async_api import ( SearchResultArtist, SearchResultTrack, ) -from bandcamp_async_api.models import CollectionType +from bandcamp_async_api.models import BCAlbum, BCTrack, CollectionType from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig -from music_assistant_models.enums import ConfigEntryType, MediaType, ProviderFeature, StreamType +from music_assistant_models.enums import ( + ConfigEntryType, + MediaType, + ProviderFeature, + StreamType, +) from music_assistant_models.errors import ( InvalidDataError, LoginFailed, @@ -89,12 +94,19 @@ async def get_config_entries( ) -def split_id(id_: str) -> tuple[int, int | None, int | None]: - """Return (artist_id, album_id, track_id). Missing parts are returned as 0.""" - parts = id_.split("-") - part_0 = int(parts[0]) - part_1 = int(parts[1]) if len(parts) > 1 else 0 - part_2 = int(parts[2]) if len(parts) > 2 else 0 +def split_id(id_: str) -> tuple[int, int, int]: + """Return (artist_id, album_id, track_id). Missing parts are returned as 0. + + :param id_: Compound ID string, e.g. "123-456-789". + :raises InvalidDataError: If the ID contains non-numeric parts. + """ + try: + parts = id_.split("-") + part_0 = int(parts[0]) + part_1 = int(parts[1]) if len(parts) > 1 else 0 + part_2 = int(parts[2]) if len(parts) > 2 else 0 + except (ValueError, IndexError) as error: + raise InvalidDataError(f"Malformed Bandcamp ID: {id_}") from error return part_0, part_1, part_2 @@ -124,7 +136,7 @@ class BandcampProvider(MusicProvider): ) self._converters = BandcampConverters(self.domain, self.instance_id) - # The provider can function without login (search-only), + # The provider can function without login (search and streaming), # but if credentials were explicitly configured, validate them now. # A bad login fails hard so the user can fix it immediately; # transient errors (rate limits, network) are logged and the provider @@ -149,9 +161,6 @@ class BandcampProvider(MusicProvider): ) -> SearchResults: """Perform search on music provider.""" results = SearchResults() - if not self._client.identity: - return results - if not media_types: return results @@ -254,9 +263,7 @@ class BandcampProvider(MusicProvider): api_artist = await self._client.get_artist(prov_artist_id) return self._converters.artist_from_api(api_artist) except BandcampNotFoundError as error: - raise MediaNotFoundError( - f"Bandcamp artist {prov_artist_id} search returned no results" - ) from error + raise MediaNotFoundError(f"Artist {prov_artist_id} not found on Bandcamp") from error except BandcampRateLimitError as error: raise ResourceTemporarilyUnavailable( "Bandcamp rate limit reached", backoff_time=error.retry_after @@ -273,9 +280,7 @@ class BandcampProvider(MusicProvider): api_album = await self._client.get_album(artist_id, album_id) return self._converters.album_from_api(api_album) except BandcampNotFoundError as error: - raise MediaNotFoundError( - f"Bandcamp album {prov_album_id} search returned no results" - ) from error + raise MediaNotFoundError(f"Album {prov_album_id} not found on Bandcamp") from error except BandcampRateLimitError as error: raise ResourceTemporarilyUnavailable( "Bandcamp rate limit reached", backoff_time=error.retry_after @@ -283,43 +288,55 @@ class BandcampProvider(MusicProvider): except BandcampAPIError as error: raise MediaNotFoundError(f"Failed to get album {prov_album_id}") from error - @use_cache(CACHE) @throttle_with_retries - async def get_track(self, prov_track_id: str) -> Track: - """Get full track details by id.""" - artist_id, album_id, track_id = split_id(prov_track_id) - if track_id is None: # artist_id-track_id - album_id, track_id = None, album_id + async def _fetch_api_track(self, item_id: str) -> tuple[BCTrack, BCAlbum | None]: + """Fetch a raw API track and its parent album by compound item ID. + + Uses get_album when album_id is present (most tracks), falling back + to get_track for standalone tracks (album_id=0). + + :param item_id: Compound track ID in the form artist_id-album_id-track_id. + """ + artist_id, album_id, track_id = split_id(item_id) + if not track_id: + album_id, track_id = 0, album_id try: - if all((artist_id, album_id, track_id)): + if album_id: api_album = await self._client.get_album(artist_id, album_id) - api_track = next((_ for _ in api_album.tracks if _.id == track_id), None) - return self._converters.track_from_api( - track=api_track, - album_id=api_album.id, - album_name=api_album.title, - album_image_url=api_album.art_url, - ) - if not album_id: - api_track = await self._client.get_track(artist_id, track_id) - return self._converters.track_from_api( - track=api_track, - album_id=api_track.album.id if api_track.album else None, - album_name=api_track.album.title if api_track.album else "", - album_image_url=api_track.album.art_url if api_track.album else "", - ) - raise MediaNotFoundError(f"Track {prov_track_id} not found on Bandcamp") + api_track = next((t for t in api_album.tracks if t.id == track_id), None) + if not api_track: + raise MediaNotFoundError(f"Track {item_id} not found in album on Bandcamp") + return api_track, api_album + return await self._client.get_track(artist_id, track_id), None + except BandcampMustBeLoggedInError as error: + raise LoginFailed("Bandcamp login is invalid or expired.") from error except BandcampNotFoundError as error: - raise MediaNotFoundError( - f"Bandcamp track {prov_track_id} search returned no results" - ) from error + raise MediaNotFoundError(f"Track {item_id} not found on Bandcamp") from error except BandcampRateLimitError as error: raise ResourceTemporarilyUnavailable( "Bandcamp rate limit reached", backoff_time=error.retry_after ) from error except BandcampAPIError as error: - raise MediaNotFoundError(f"Failed to get track {prov_track_id}") from error + raise MediaNotFoundError(f"Failed to get track {item_id}") from error + + @use_cache(CACHE) + async def get_track(self, prov_track_id: str) -> Track: + """Get full track details by id.""" + api_track, api_album = await self._fetch_api_track(prov_track_id) + if api_album: + return self._converters.track_from_api( + track=api_track, + album_id=api_album.id, + album_name=api_album.title, + album_image_url=api_album.art_url, + ) + return self._converters.track_from_api( + track=api_track, + album_id=api_track.album.id if api_track.album else None, + album_name=api_track.album.title if api_track.album else "", + album_image_url=api_track.album.art_url if api_track.album else "", + ) @use_cache(CACHE) @throttle_with_retries @@ -344,7 +361,7 @@ class BandcampProvider(MusicProvider): except BandcampNotFoundError as error: raise MediaNotFoundError( - f"Bandcamp album {prov_album_id} tracks search returned no results" + f"Album tracks for {prov_album_id} not found on Bandcamp" ) from error except BandcampRateLimitError as error: raise ResourceTemporarilyUnavailable( @@ -375,7 +392,7 @@ class BandcampProvider(MusicProvider): except BandcampNotFoundError as error: raise MediaNotFoundError( - f"Bandcamp artist {prov_artist_id} albums search returned no results" + f"Artist {prov_artist_id} albums not found on Bandcamp" ) from error except BandcampRateLimitError as error: raise ResourceTemporarilyUnavailable( @@ -402,28 +419,25 @@ class BandcampProvider(MusicProvider): return tracks[: self.top_tracks_limit] async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: - """Return the content details for the given track.""" - # get_track already handles exceptions and rate limiting - track_ma = await self.get_track(item_id) - if not track_ma.metadata.links: - raise MediaNotFoundError( - f"No streaming links found for track {item_id}. Please report this" - ) + """Return the content details for the given track. - link = next(iter(track_ma.metadata.links)) - if not link: - raise MediaNotFoundError( - f"No streaming URL found for track {item_id}. Please report this" - ) + Fetches fresh from the Bandcamp API since streaming URLs may expire. + """ + api_track, _ = await self._fetch_api_track(item_id) - streaming_url = link.url + streaming_url, bitrate, content_type = self._converters.streaming_url_from_api( + api_track.streaming_url or {} + ) if not streaming_url: - raise MediaNotFoundError(f"No streaming URL found for track {item_id}: {streaming_url}") + raise MediaNotFoundError(f"No streaming URL found for track {item_id}") return StreamDetails( item_id=item_id, provider=self.instance_id, - audio_format=AudioFormat(), + audio_format=AudioFormat( + content_type=content_type, + bit_rate=bitrate, + ), stream_type=StreamType.HTTP, media_type=media_type, path=streaming_url, diff --git a/music_assistant/providers/bandcamp/converters.py b/music_assistant/providers/bandcamp/converters.py index 64bc4dee..0a792ae4 100644 --- a/music_assistant/providers/bandcamp/converters.py +++ b/music_assistant/providers/bandcamp/converters.py @@ -10,13 +10,13 @@ from bandcamp_async_api.models import ( SearchResultArtist, SearchResultTrack, ) -from music_assistant_models.enums import ImageType, LinkType, MediaType +from music_assistant_models.enums import ContentType, ImageType, MediaType from music_assistant_models.media_items import Album as MAAlbum from music_assistant_models.media_items import Artist as MAArtist from music_assistant_models.media_items import ( + AudioFormat, ItemMapping, MediaItemImage, - MediaItemLink, ProviderMapping, UniqueList, ) @@ -31,16 +31,20 @@ class BandcampConverters: self.domain = domain self.instance_id = instance_id + @staticmethod def streaming_url_from_api( - self, streaming_info: dict[str, str] - ) -> tuple[str | None, int | None]: - """Parse streaming URL info.""" - # Extract streaming URL with priority: mp3-v0 > mp3-128 + streaming_info: dict[str, str], + ) -> tuple[str | None, int | None, ContentType]: + """Parse streaming URL info. + + :param streaming_info: Dict of format keys to URLs from the Bandcamp API. + """ + # Extract streaming URL with priority: mp3-v0 > mp3-320 > mp3-128 bitrate = None streaming_url = None + content_type = ContentType.MP3 if "mp3-v0" in streaming_info: streaming_url = streaming_info["mp3-v0"] - bitrate = None # VBR elif "mp3-320" in streaming_info: streaming_url = streaming_info["mp3-320"] bitrate = 320 @@ -48,9 +52,9 @@ class BandcampConverters: streaming_url = streaming_info["mp3-128"] bitrate = 128 elif streaming_info: - # Fallback to first available URL streaming_url = next(iter(streaming_info.values())) - return streaming_url, bitrate + content_type = ContentType.UNKNOWN + return streaming_url, bitrate, content_type def track_from_search(self, item: SearchResultTrack) -> MATrack: """Create a Track from new API SearchResultTrack.""" @@ -165,6 +169,7 @@ class BandcampConverters: ) -> MATrack: """Convert a Track object from the API to MA Track format.""" album_id = album_id or 0 + _, bitrate, content_type = self.streaming_url_from_api(track.streaming_url or {}) output = MATrack( item_id=f"{track.artist.id}-{album_id}-{track.id}", provider=self.instance_id, @@ -187,6 +192,10 @@ class BandcampConverters: provider_domain=self.domain, provider_instance=self.instance_id, url=track.url, + audio_format=AudioFormat( + content_type=content_type, + bit_rate=bitrate, + ), ) }, ) @@ -208,15 +217,6 @@ class BandcampConverters: provider=self.instance_id, name=track.album.title, ) - - streaming_url, _ = self.streaming_url_from_api(track.streaming_url) - if streaming_url: - output.metadata.links = { - MediaItemLink( - type=LinkType.UNKNOWN, - url=streaming_url, - ) - } output.metadata.lyrics = track.lyrics if album_image_url: output.metadata.add_image( @@ -287,7 +287,7 @@ class BandcampConverters: url=album.url, ) }, - year=datetime.fromtimestamp(album.release_date).year, + year=datetime.fromtimestamp(album.release_date).year if album.release_date else None, ) output.metadata.add_image( MediaItemImage( diff --git a/tests/core/test_genres.py b/tests/core/test_genres.py index f970625c..a5724239 100644 --- a/tests/core/test_genres.py +++ b/tests/core/test_genres.py @@ -942,6 +942,13 @@ class TestQueryMethods: genres = await genre_ctrl.get_genres_for_media_item(MediaType.TRACK, track.item_id) assert genres == [] + async def test_get_genres_for_media_item_non_integer_id( + self, mass: MusicAssistant, genre_ctrl: GenreController + ) -> None: + """Returns empty list for non-integer provider item IDs (e.g. Bandcamp compound IDs).""" + genres = await genre_ctrl.get_genres_for_media_item(MediaType.ALBUM, "3957198221-190478553") + assert genres == [] + async def test_library_items_hide_empty_true( self, mass: MusicAssistant, genre_ctrl: GenreController ) -> None: diff --git a/tests/providers/bandcamp/test_converters.py b/tests/providers/bandcamp/test_converters.py index e5202281..2b946699 100644 --- a/tests/providers/bandcamp/test_converters.py +++ b/tests/providers/bandcamp/test_converters.py @@ -4,6 +4,7 @@ from unittest.mock import Mock import pytest from bandcamp_async_api.models import BCAlbum, BCArtist, BCTrack +from music_assistant_models.enums import ContentType from music_assistant.providers.bandcamp.converters import BandcampConverters @@ -193,3 +194,117 @@ def test_track_from_api_with_album(converters: BandcampConverters) -> None: assert result.album is not None assert result.album.item_id == "123-456" assert result.album.name == "Test Album" + + +def _make_mock_track(streaming_url: dict[str, str]) -> Mock: + """Create a mock API track with the given streaming URL.""" + mock_artist = Mock() + mock_artist.id = 123 + mock_artist.name = "Test Artist" + mock_artist.url = "https://test.bandcamp.com" + + mock_track = Mock() + mock_track.id = 789 + mock_track.title = "Test Track" + mock_track.artist = mock_artist + mock_track.url = "https://test.bandcamp.com/track/test-track" + mock_track.duration = 300 + mock_track.lyrics = None + mock_track.track_number = 1 + mock_track.streaming_url = streaming_url + return mock_track + + +def test_track_from_api_audio_format_mp3_320(converters: BandcampConverters) -> None: + """Test that mp3-320 streaming URL sets audio format correctly.""" + mock_track = _make_mock_track({"mp3-320": "https://example.com/track.mp3"}) + result = converters.track_from_api(track=mock_track, album_id=456) + mapping = next(iter(result.provider_mappings)) + assert mapping.audio_format.content_type == ContentType.MP3 + assert mapping.audio_format.bit_rate == 320 + + +def test_track_from_api_audio_format_mp3_v0(converters: BandcampConverters) -> None: + """Test that mp3-v0 streaming URL sets content type with no bitrate (VBR).""" + mock_track = _make_mock_track({"mp3-v0": "https://example.com/track.mp3"}) + result = converters.track_from_api(track=mock_track, album_id=456) + mapping = next(iter(result.provider_mappings)) + assert mapping.audio_format.content_type == ContentType.MP3 + assert mapping.audio_format.bit_rate is None + + +def test_track_from_api_audio_format_mp3_128(converters: BandcampConverters) -> None: + """Test that mp3-128 streaming URL sets audio format correctly.""" + mock_track = _make_mock_track({"mp3-128": "https://example.com/track.mp3"}) + result = converters.track_from_api(track=mock_track, album_id=456) + mapping = next(iter(result.provider_mappings)) + assert mapping.audio_format.content_type == ContentType.MP3 + assert mapping.audio_format.bit_rate == 128 + + +def test_track_from_api_audio_format_none_streaming_url(converters: BandcampConverters) -> None: + """Test that None streaming_url does not crash.""" + mock_track = _make_mock_track({"mp3-128": "https://example.com/track.mp3"}) + mock_track.streaming_url = None + result = converters.track_from_api(track=mock_track, album_id=456) + mapping = next(iter(result.provider_mappings)) + assert mapping.audio_format.content_type == ContentType.MP3 + assert mapping.audio_format.bit_rate is None + + +def test_streaming_url_priority_v0_over_320(converters: BandcampConverters) -> None: + """Test that mp3-v0 is preferred over mp3-320.""" + url, bitrate, content_type = converters.streaming_url_from_api( + { + "mp3-320": "https://example.com/320.mp3", + "mp3-v0": "https://example.com/v0.mp3", + } + ) + assert url == "https://example.com/v0.mp3" + assert bitrate is None + assert content_type == ContentType.MP3 + + +def test_streaming_url_priority_320_over_128(converters: BandcampConverters) -> None: + """Test that mp3-320 is preferred over mp3-128.""" + url, bitrate, content_type = converters.streaming_url_from_api( + { + "mp3-128": "https://example.com/128.mp3", + "mp3-320": "https://example.com/320.mp3", + } + ) + assert url == "https://example.com/320.mp3" + assert bitrate == 320 + assert content_type == ContentType.MP3 + + +def test_streaming_url_priority_v0_over_320_over_128(converters: BandcampConverters) -> None: + """Test full priority chain when all three formats are present.""" + url, bitrate, content_type = converters.streaming_url_from_api( + { + "mp3-128": "https://example.com/128.mp3", + "mp3-320": "https://example.com/320.mp3", + "mp3-v0": "https://example.com/v0.mp3", + } + ) + assert url == "https://example.com/v0.mp3" + assert bitrate is None + assert content_type == ContentType.MP3 + + +def test_streaming_url_fallback_unknown_key(converters: BandcampConverters) -> None: + """Test that an unknown streaming key falls back with UNKNOWN content type.""" + url, bitrate, content_type = converters.streaming_url_from_api( + {"ogg-vorbis": "https://example.com/track.ogg"} + ) + assert url == "https://example.com/track.ogg" + assert bitrate is None + assert content_type == ContentType.UNKNOWN + + +def test_streaming_url_empty_dict(converters: BandcampConverters) -> None: + """Test that empty dict returns None for URL and bitrate.""" + url, bitrate, content_type = converters.streaming_url_from_api({}) + assert url is None + assert bitrate is None + assert content_type == ContentType.MP3 diff --git a/tests/providers/bandcamp/test_provider.py b/tests/providers/bandcamp/test_provider.py index f5e85ce4..2a7e39cf 100644 --- a/tests/providers/bandcamp/test_provider.py +++ b/tests/providers/bandcamp/test_provider.py @@ -1,14 +1,28 @@ """Test Bandcamp Provider integration.""" +from collections.abc import AsyncGenerator from unittest.mock import AsyncMock, Mock, patch import pytest -from bandcamp_async_api import BandcampAPIError, BandcampNotFoundError -from music_assistant_models.enums import MediaType, StreamType -from music_assistant_models.errors import InvalidDataError, MediaNotFoundError +from bandcamp_async_api import ( + BandcampAPIError, + BandcampMustBeLoggedInError, + BandcampNotFoundError, + BandcampRateLimitError, + SearchResultAlbum, + SearchResultArtist, + SearchResultTrack, +) +from music_assistant_models.enums import ContentType, MediaType, StreamType +from music_assistant_models.errors import ( + InvalidDataError, + LoginFailed, + MediaNotFoundError, + RetriesExhausted, +) from music_assistant_models.streamdetails import StreamDetails -from music_assistant.providers.bandcamp import DEFAULT_TOP_TRACKS_LIMIT, BandcampProvider +from music_assistant.providers.bandcamp import DEFAULT_TOP_TRACKS_LIMIT, BandcampProvider, split_id @pytest.fixture @@ -126,30 +140,16 @@ async def test_handle_async_init_without_identity(mass_mock: Mock, manifest_mock async def test_is_streaming_provider(provider: BandcampProvider) -> None: - """Test that Bandcamp is not a streaming provider.""" + """Test that Bandcamp is a streaming provider.""" assert provider.is_streaming_provider is True async def test_search_with_identity(provider: BandcampProvider) -> None: """Test search functionality with identity token.""" - - # Create mock objects with proper class names - class MockSearchResultTrack: - def __init__(self) -> None: - self.__class__.__name__ = "SearchResultTrack" - - class MockSearchResultAlbum: - def __init__(self) -> None: - self.__class__.__name__ = "SearchResultAlbum" - - class MockSearchResultArtist: - def __init__(self) -> None: - self.__class__.__name__ = "SearchResultArtist" - mock_search_results = [ - MockSearchResultTrack(), - MockSearchResultAlbum(), - MockSearchResultArtist(), + Mock(spec=SearchResultTrack), + Mock(spec=SearchResultAlbum), + Mock(spec=SearchResultArtist), ] with ( @@ -169,9 +169,12 @@ async def test_search_with_identity(provider: BandcampProvider) -> None: ) mock_search.assert_called_once_with("test query") - assert results.tracks is not None - assert results.albums is not None - assert results.artists is not None + mock_track_converter.assert_called_once() + mock_album_converter.assert_called_once() + mock_artist_converter.assert_called_once() + assert len(results.tracks) == 1 + assert len(results.albums) == 1 + assert len(results.artists) == 1 async def test_search_without_identity(provider: BandcampProvider) -> None: @@ -218,7 +221,7 @@ async def test_get_artist_not_found(provider: BandcampProvider) -> None: patch.object( provider._client, "get_artist", side_effect=BandcampNotFoundError("Not found") ), - pytest.raises(MediaNotFoundError, match=r"Bandcamp artist 123 search returned no results"), + pytest.raises(MediaNotFoundError, match=r"Artist 123 not found on Bandcamp"), ): await provider.get_artist("123") @@ -260,13 +263,64 @@ async def test_get_track_success(provider: BandcampProvider) -> None: assert result is not None +async def test_get_track_standalone(provider: BandcampProvider) -> None: + """Test get_track for a standalone track (album_id=0) uses get_track API path.""" + mock_album_obj = Mock() + mock_album_obj.id = 456 + mock_album_obj.title = "Standalone Album" + mock_album_obj.art_url = "http://example.com/art.jpg" + + mock_api_track = Mock() + mock_api_track.album = mock_album_obj + + with ( + patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track, + patch.object(provider._converters, "track_from_api") as mock_converter, + ): + mock_get_track.return_value = mock_api_track + mock_converter.return_value = Mock() + + result = await provider.get_track("123-0-789") + + mock_get_track.assert_called_once_with(123, 789) + mock_converter.assert_called_once_with( + track=mock_api_track, + album_id=456, + album_name="Standalone Album", + album_image_url="http://example.com/art.jpg", + ) + assert result is not None + + +async def test_get_track_standalone_no_album(provider: BandcampProvider) -> None: + """Test get_track for a standalone track where api_track.album is None.""" + mock_api_track = Mock() + mock_api_track.album = None + + with ( + patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track, + patch.object(provider._converters, "track_from_api") as mock_converter, + ): + mock_get_track.return_value = mock_api_track + mock_converter.return_value = Mock() + + result = await provider.get_track("123-0-789") + + mock_get_track.assert_called_once_with(123, 789) + mock_converter.assert_called_once_with( + track=mock_api_track, + album_id=None, + album_name="", + album_image_url="", + ) + assert result is not None + + async def test_get_track_not_found(provider: BandcampProvider) -> None: """Test track retrieval when not found.""" with ( patch.object(provider._client, "get_album", side_effect=BandcampNotFoundError("Not found")), - pytest.raises( - MediaNotFoundError, match=r"Bandcamp track 123-456-789 search returned no results" - ), + pytest.raises(MediaNotFoundError, match=r"Track 123-456-789 not found on Bandcamp"), ): await provider.get_track("123-456-789") @@ -313,63 +367,241 @@ async def test_get_artist_albums_success(provider: BandcampProvider) -> None: async def test_get_stream_details_success(provider: BandcampProvider) -> None: - """Test successful stream details retrieval.""" - # Create mock album and track with proper attributes - mock_artist = Mock() - mock_artist.id = 123 - mock_artist.name = "Test Artist" + """Test stream details fetches fresh URL and audio format from API.""" + mock_api_track = Mock() + mock_api_track.id = 789 + mock_api_track.streaming_url = {"mp3-320": "http://example.com/track.mp3"} + mock_api_album = Mock() + mock_api_album.tracks = [mock_api_track] - mock_track = Mock() - mock_track.id = 789 - mock_track.artist = mock_artist - mock_track.title = "Test Track" - mock_track.duration = 180 - mock_track.track_number = 1 - mock_track.streaming_url = {"mp3-320": "http://example.com/track.mp3"} - mock_track.url = "http://example.com/track" - mock_track.lyrics = None + with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album: + mock_get_album.return_value = mock_api_album - mock_album = Mock() - mock_album.id = 456 - mock_album.title = "Test Album" - mock_album.art_url = "http://example.com/art.jpg" - mock_album.artist = mock_artist - mock_album.tracks = [mock_track] + result = await provider.get_stream_details("123-456-789", MediaType.TRACK) + + mock_get_album.assert_called_once_with(123, 456) + assert isinstance(result, StreamDetails) + assert result.item_id == "123-456-789" + assert result.media_type == MediaType.TRACK + assert result.stream_type == StreamType.HTTP + assert result.path == "http://example.com/track.mp3" + assert result.audio_format.content_type == ContentType.MP3 + assert result.audio_format.bit_rate == 320 + + +async def test_get_stream_details_vbr(provider: BandcampProvider) -> None: + """Test stream details with VBR mp3-v0 format.""" + mock_api_track = Mock() + mock_api_track.id = 789 + mock_api_track.streaming_url = {"mp3-v0": "http://example.com/track-v0.mp3"} + mock_api_album = Mock() + mock_api_album.tracks = [mock_api_track] + + with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album: + mock_get_album.return_value = mock_api_album + + result = await provider.get_stream_details("123-456-789", MediaType.TRACK) + + assert result.path == "http://example.com/track-v0.mp3" + assert result.audio_format.content_type == ContentType.MP3 + assert result.audio_format.bit_rate is None + + +async def test_get_stream_details_no_streaming_url(provider: BandcampProvider) -> None: + """Test stream details when API track has no streaming URL.""" + mock_api_track = Mock() + mock_api_track.id = 789 + mock_api_track.streaming_url = {} + mock_api_album = Mock() + mock_api_album.tracks = [mock_api_track] + + with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album: + mock_get_album.return_value = mock_api_album + + with pytest.raises(MediaNotFoundError, match=r"No streaming URL found"): + await provider.get_stream_details("123-456-789", MediaType.TRACK) + + +async def test_get_stream_details_none_streaming_url(provider: BandcampProvider) -> None: + """Test stream details when API track has streaming_url=None.""" + mock_api_track = Mock() + mock_api_track.id = 789 + mock_api_track.streaming_url = None + mock_api_album = Mock() + mock_api_album.tracks = [mock_api_track] + + with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album: + mock_get_album.return_value = mock_api_album + + with pytest.raises(MediaNotFoundError, match=r"No streaming URL found"): + await provider.get_stream_details("123-456-789", MediaType.TRACK) + + +async def test_get_stream_details_bypasses_cache(provider: BandcampProvider) -> None: + """Test that get_stream_details calls API directly, not cached get_track.""" + mock_api_track = Mock() + mock_api_track.id = 789 + mock_api_track.streaming_url = {"mp3-128": "http://example.com/track.mp3"} + mock_api_album = Mock() + mock_api_album.tracks = [mock_api_track] with ( patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album, - patch.object(provider._converters, "track_from_api") as mock_converter, + patch.object(provider, "get_track", new_callable=AsyncMock) as mock_get_track, ): - mock_get_album.return_value = mock_album - - # Create a mock track with metadata.links containing the streaming URL - mock_ma_track = Mock() - mock_link = Mock() - mock_link.url = "http://example.com/track.mp3" - mock_ma_track.metadata.links = {mock_link} - mock_converter.return_value = mock_ma_track + mock_get_album.return_value = mock_api_album result = await provider.get_stream_details("123-456-789", MediaType.TRACK) - assert isinstance(result, StreamDetails) - assert result.stream_type == StreamType.HTTP + mock_get_album.assert_called_once() + mock_get_track.assert_not_called() assert result.path == "http://example.com/track.mp3" + assert result.audio_format.content_type == ContentType.MP3 -async def test_get_stream_details_no_streaming_url(provider: BandcampProvider) -> None: - """Test stream details when no streaming URL is available.""" - # Mock the get_track method directly to return a track with no streaming URLs - mock_track = Mock() - mock_track.metadata.links = [] # Empty links list means no streaming URL +async def test_fetch_api_track_album_path(provider: BandcampProvider) -> None: + """Test _fetch_api_track with 3-part ID routes through get_album.""" + mock_api_track = Mock() + mock_api_track.id = 789 + mock_api_album = Mock() + mock_api_album.tracks = [mock_api_track] - with patch.object(provider, "get_track", new_callable=AsyncMock) as mock_get_track: - mock_get_track.return_value = mock_track + with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album: + mock_get_album.return_value = mock_api_album - with pytest.raises( - MediaNotFoundError, - match=r"No streaming links found for track 123-456-789. Please report this", - ): - await provider.get_stream_details("123-456-789", MediaType.TRACK) + api_track, api_album = await provider._fetch_api_track("123-456-789") + + mock_get_album.assert_called_once_with(123, 456) + assert api_track is mock_api_track + assert api_album is mock_api_album + + +async def test_fetch_api_track_standalone_path(provider: BandcampProvider) -> None: + """Test _fetch_api_track with album_id=0 routes through get_track.""" + mock_api_track = Mock() + + with patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track: + mock_get_track.return_value = mock_api_track + + api_track, api_album = await provider._fetch_api_track("123-0-789") + + mock_get_track.assert_called_once_with(123, 789) + assert api_track is mock_api_track + assert api_album is None + + +async def test_fetch_api_track_not_in_album(provider: BandcampProvider) -> None: + """Test _fetch_api_track raises when track ID not found in album tracks.""" + mock_other_track = Mock() + mock_other_track.id = 999 + mock_api_album = Mock() + mock_api_album.tracks = [mock_other_track] + + with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album: + mock_get_album.return_value = mock_api_album + + with pytest.raises(MediaNotFoundError, match=r"not found in album"): + await provider._fetch_api_track("123-456-789") + + +async def test_fetch_api_track_not_found_error(provider: BandcampProvider) -> None: + """Test _fetch_api_track converts BandcampNotFoundError.""" + with ( + patch.object( + provider._client, + "get_album", + side_effect=BandcampNotFoundError("Not found"), + ), + pytest.raises(MediaNotFoundError, match=r"not found on Bandcamp"), + ): + await provider._fetch_api_track("123-456-789") + + +async def test_fetch_api_track_rate_limit_error(provider: BandcampProvider) -> None: + """Test _fetch_api_track converts BandcampRateLimitError. + + Since @throttle_with_retries is on _fetch_api_track, persistent rate + limiting exhausts retries and raises RetriesExhausted. + """ + rate_error = BandcampRateLimitError("Rate limited") + rate_error.retry_after = 3 + + with ( + patch.object( + provider._client, + "get_album", + side_effect=rate_error, + ) as mock_get_album, + patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep, + pytest.raises(RetriesExhausted), + ): + await provider._fetch_api_track("123-456-789") + + assert mock_get_album.call_count == provider.throttler.retry_attempts + # At least retry_attempts - 1 sleeps from backoff; may be higher if the + # class-level Throttler also called asyncio.sleep due to accumulated entries. + assert mock_sleep.call_count >= provider.throttler.retry_attempts - 1 + + +async def test_fetch_api_track_generic_api_error(provider: BandcampProvider) -> None: + """Test _fetch_api_track converts generic BandcampAPIError to MediaNotFoundError.""" + with ( + patch.object( + provider._client, + "get_album", + side_effect=BandcampAPIError("Something went wrong"), + ), + pytest.raises(MediaNotFoundError, match=r"Failed to get track 123-456-789"), + ): + await provider._fetch_api_track("123-456-789") + + +def test_split_id_three_parts() -> None: + """Test split_id with a 3-part compound ID.""" + assert split_id("123-456-789") == (123, 456, 789) + + +def test_split_id_two_parts() -> None: + """Test split_id with a 2-part compound ID.""" + assert split_id("123-456") == (123, 456, 0) + + +def test_split_id_one_part() -> None: + """Test split_id with a single ID.""" + assert split_id("123") == (123, 0, 0) + + +async def test_fetch_api_track_two_part_id(provider: BandcampProvider) -> None: + """Test _fetch_api_track with 2-part ID routes through get_track.""" + # split_id("123-789") returns (123, 789, 0); since track_id=0, + # the method swaps to album_id=0, track_id=789 and uses get_track. + mock_api_track = Mock() + + with patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track: + mock_get_track.return_value = mock_api_track + + api_track, api_album = await provider._fetch_api_track("123-789") + + mock_get_track.assert_called_once_with(123, 789) + assert api_track is mock_api_track + assert api_album is None + + +async def test_get_stream_details_standalone_track(provider: BandcampProvider) -> None: + """Test stream details for a standalone track (album_id=0).""" + mock_api_track = Mock() + mock_api_track.streaming_url = {"mp3-128": "http://example.com/standalone.mp3"} + + with patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track: + mock_get_track.return_value = mock_api_track + + result = await provider.get_stream_details("123-0-789", MediaType.TRACK) + + mock_get_track.assert_called_once_with(123, 789) + assert isinstance(result, StreamDetails) + assert result.path == "http://example.com/standalone.mp3" + assert result.audio_format.content_type == ContentType.MP3 + assert result.audio_format.bit_rate == 128 async def test_get_artist_toptracks_success(provider: BandcampProvider) -> None: @@ -392,23 +624,97 @@ async def test_get_artist_toptracks_success(provider: BandcampProvider) -> None: async def test_get_library_artists_success(provider: BandcampProvider) -> None: """Test successful library artists retrieval.""" - # Test that the method exists and doesn't raise an exception - # This is a complex async generator method, so we just test it can be called - assert hasattr(provider, "get_library_artists") - assert callable(provider.get_library_artists) + mock_collection = Mock() + mock_collection.items = [ + Mock(item_type="band", item_id=100, band_id=100), + Mock(item_type="album", item_id=200, band_id=300), + ] + + with ( + patch.object( + provider._client, "get_collection_items", new_callable=AsyncMock + ) as mock_get_collection, + patch.object(provider, "get_artist", new_callable=AsyncMock) as mock_get_artist, + ): + mock_get_collection.return_value = mock_collection + mock_get_artist.return_value = Mock() + + artists = [artist async for artist in provider.get_library_artists()] + + assert len(artists) == 2 + assert mock_get_artist.call_count == 2 + + +async def test_get_library_artists_no_identity(provider: BandcampProvider) -> None: + """Test that library artists returns nothing without identity.""" + provider._client.identity = None + artists = [artist async for artist in provider.get_library_artists()] + assert len(artists) == 0 async def test_get_library_albums_success(provider: BandcampProvider) -> None: """Test successful library albums retrieval.""" - # Test that the method exists and doesn't raise an exception - # This is a complex async generator method, so we just test it can be called - assert hasattr(provider, "get_library_albums") - assert callable(provider.get_library_albums) + mock_collection = Mock() + mock_collection.items = [ + Mock(item_type="album", item_id=456, band_id=123), + ] + + with ( + patch.object( + provider._client, "get_collection_items", new_callable=AsyncMock + ) as mock_get_collection, + patch.object(provider, "get_album", new_callable=AsyncMock) as mock_get_album, + ): + mock_get_collection.return_value = mock_collection + mock_get_album.return_value = Mock() + + albums = [album async for album in provider.get_library_albums()] + + assert len(albums) == 1 + mock_get_album.assert_called_once_with("123-456") async def test_get_library_tracks_success(provider: BandcampProvider) -> None: """Test successful library tracks retrieval.""" - # Test that the method exists and doesn't raise an exception - # This is a complex async generator method, so we just test it can be called - assert hasattr(provider, "get_library_tracks") - assert callable(provider.get_library_tracks) + mock_track = Mock() + + with ( + patch.object(provider, "get_library_albums") as mock_get_albums, + patch.object(provider, "get_album_tracks", new_callable=AsyncMock) as mock_get_tracks, + ): + # Make get_library_albums an async generator + async def mock_albums_gen() -> AsyncGenerator[Mock, None]: + yield Mock(item_id="123-456") + + mock_get_albums.return_value = mock_albums_gen() + mock_get_tracks.return_value = [mock_track] + + tracks = [track async for track in provider.get_library_tracks()] + + assert len(tracks) == 1 + mock_get_tracks.assert_called_once_with("123-456") + + +def test_split_id_malformed_non_numeric() -> None: + """Test split_id raises InvalidDataError on non-numeric input.""" + with pytest.raises(InvalidDataError, match=r"Malformed Bandcamp ID"): + split_id("abc-def") + + +def test_split_id_malformed_empty() -> None: + """Test split_id raises InvalidDataError on empty string.""" + with pytest.raises(InvalidDataError, match=r"Malformed Bandcamp ID"): + split_id("") + + +async def test_fetch_api_track_login_error(provider: BandcampProvider) -> None: + """Test _fetch_api_track converts BandcampMustBeLoggedInError to LoginFailed.""" + with ( + patch.object( + provider._client, + "get_album", + side_effect=BandcampMustBeLoggedInError("Must be logged in"), + ), + pytest.raises(LoginFailed, match=r"login is invalid or expired"), + ): + await provider._fetch_api_track("123-456-789") -- 2.34.1