SearchResultArtist,
SearchResultTrack,
)
-from bandcamp_async_api.models import CollectionType
+from bandcamp_async_api.models import BCAlbum, BCTrack, CollectionType
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
-from music_assistant_models.enums import ConfigEntryType, MediaType, ProviderFeature, StreamType
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
from music_assistant_models.errors import (
InvalidDataError,
LoginFailed,
)
-def split_id(id_: str) -> tuple[int, int | None, int | None]:
- """Return (artist_id, album_id, track_id). Missing parts are returned as 0."""
- parts = id_.split("-")
- part_0 = int(parts[0])
- part_1 = int(parts[1]) if len(parts) > 1 else 0
- part_2 = int(parts[2]) if len(parts) > 2 else 0
+def split_id(id_: str) -> tuple[int, int, int]:
+ """Return (artist_id, album_id, track_id). Missing parts are returned as 0.
+
+ :param id_: Compound ID string, e.g. "123-456-789".
+ :raises InvalidDataError: If the ID contains non-numeric parts.
+ """
+ try:
+ parts = id_.split("-")
+ part_0 = int(parts[0])
+ part_1 = int(parts[1]) if len(parts) > 1 else 0
+ part_2 = int(parts[2]) if len(parts) > 2 else 0
+ except (ValueError, IndexError) as error:
+ raise InvalidDataError(f"Malformed Bandcamp ID: {id_}") from error
return part_0, part_1, part_2
)
self._converters = BandcampConverters(self.domain, self.instance_id)
- # The provider can function without login (search-only),
+ # The provider can function without login (search and streaming),
# but if credentials were explicitly configured, validate them now.
# A bad login fails hard so the user can fix it immediately;
# transient errors (rate limits, network) are logged and the provider
) -> SearchResults:
"""Perform search on music provider."""
results = SearchResults()
- if not self._client.identity:
- return results
-
if not media_types:
return results
api_artist = await self._client.get_artist(prov_artist_id)
return self._converters.artist_from_api(api_artist)
except BandcampNotFoundError as error:
- raise MediaNotFoundError(
- f"Bandcamp artist {prov_artist_id} search returned no results"
- ) from error
+ raise MediaNotFoundError(f"Artist {prov_artist_id} not found on Bandcamp") from error
except BandcampRateLimitError as error:
raise ResourceTemporarilyUnavailable(
"Bandcamp rate limit reached", backoff_time=error.retry_after
api_album = await self._client.get_album(artist_id, album_id)
return self._converters.album_from_api(api_album)
except BandcampNotFoundError as error:
- raise MediaNotFoundError(
- f"Bandcamp album {prov_album_id} search returned no results"
- ) from error
+ raise MediaNotFoundError(f"Album {prov_album_id} not found on Bandcamp") from error
except BandcampRateLimitError as error:
raise ResourceTemporarilyUnavailable(
"Bandcamp rate limit reached", backoff_time=error.retry_after
except BandcampAPIError as error:
raise MediaNotFoundError(f"Failed to get album {prov_album_id}") from error
- @use_cache(CACHE)
@throttle_with_retries
- async def get_track(self, prov_track_id: str) -> Track:
- """Get full track details by id."""
- artist_id, album_id, track_id = split_id(prov_track_id)
- if track_id is None: # artist_id-track_id
- album_id, track_id = None, album_id
+ async def _fetch_api_track(self, item_id: str) -> tuple[BCTrack, BCAlbum | None]:
+ """Fetch a raw API track and its parent album by compound item ID.
+
+ Uses get_album when album_id is present (most tracks), falling back
+ to get_track for standalone tracks (album_id=0).
+
+ :param item_id: Compound track ID in the form artist_id-album_id-track_id.
+ """
+ artist_id, album_id, track_id = split_id(item_id)
+ if not track_id:
+ album_id, track_id = 0, album_id
try:
- if all((artist_id, album_id, track_id)):
+ if album_id:
api_album = await self._client.get_album(artist_id, album_id)
- api_track = next((_ for _ in api_album.tracks if _.id == track_id), None)
- return self._converters.track_from_api(
- track=api_track,
- album_id=api_album.id,
- album_name=api_album.title,
- album_image_url=api_album.art_url,
- )
- if not album_id:
- api_track = await self._client.get_track(artist_id, track_id)
- return self._converters.track_from_api(
- track=api_track,
- album_id=api_track.album.id if api_track.album else None,
- album_name=api_track.album.title if api_track.album else "",
- album_image_url=api_track.album.art_url if api_track.album else "",
- )
- raise MediaNotFoundError(f"Track {prov_track_id} not found on Bandcamp")
+ api_track = next((t for t in api_album.tracks if t.id == track_id), None)
+ if not api_track:
+ raise MediaNotFoundError(f"Track {item_id} not found in album on Bandcamp")
+ return api_track, api_album
+ return await self._client.get_track(artist_id, track_id), None
+ except BandcampMustBeLoggedInError as error:
+ raise LoginFailed("Bandcamp login is invalid or expired.") from error
except BandcampNotFoundError as error:
- raise MediaNotFoundError(
- f"Bandcamp track {prov_track_id} search returned no results"
- ) from error
+ raise MediaNotFoundError(f"Track {item_id} not found on Bandcamp") from error
except BandcampRateLimitError as error:
raise ResourceTemporarilyUnavailable(
"Bandcamp rate limit reached", backoff_time=error.retry_after
) from error
except BandcampAPIError as error:
- raise MediaNotFoundError(f"Failed to get track {prov_track_id}") from error
+ raise MediaNotFoundError(f"Failed to get track {item_id}") from error
+
+ @use_cache(CACHE)
+ async def get_track(self, prov_track_id: str) -> Track:
+ """Get full track details by id."""
+ api_track, api_album = await self._fetch_api_track(prov_track_id)
+ if api_album:
+ return self._converters.track_from_api(
+ track=api_track,
+ album_id=api_album.id,
+ album_name=api_album.title,
+ album_image_url=api_album.art_url,
+ )
+ return self._converters.track_from_api(
+ track=api_track,
+ album_id=api_track.album.id if api_track.album else None,
+ album_name=api_track.album.title if api_track.album else "",
+ album_image_url=api_track.album.art_url if api_track.album else "",
+ )
@use_cache(CACHE)
@throttle_with_retries
except BandcampNotFoundError as error:
raise MediaNotFoundError(
- f"Bandcamp album {prov_album_id} tracks search returned no results"
+ f"Album tracks for {prov_album_id} not found on Bandcamp"
) from error
except BandcampRateLimitError as error:
raise ResourceTemporarilyUnavailable(
except BandcampNotFoundError as error:
raise MediaNotFoundError(
- f"Bandcamp artist {prov_artist_id} albums search returned no results"
+ f"Artist {prov_artist_id} albums not found on Bandcamp"
) from error
except BandcampRateLimitError as error:
raise ResourceTemporarilyUnavailable(
return tracks[: self.top_tracks_limit]
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
- """Return the content details for the given track."""
- # get_track already handles exceptions and rate limiting
- track_ma = await self.get_track(item_id)
- if not track_ma.metadata.links:
- raise MediaNotFoundError(
- f"No streaming links found for track {item_id}. Please report this"
- )
+ """Return the content details for the given track.
- link = next(iter(track_ma.metadata.links))
- if not link:
- raise MediaNotFoundError(
- f"No streaming URL found for track {item_id}. Please report this"
- )
+ Fetches fresh from the Bandcamp API since streaming URLs may expire.
+ """
+ api_track, _ = await self._fetch_api_track(item_id)
- streaming_url = link.url
+ streaming_url, bitrate, content_type = self._converters.streaming_url_from_api(
+ api_track.streaming_url or {}
+ )
if not streaming_url:
- raise MediaNotFoundError(f"No streaming URL found for track {item_id}: {streaming_url}")
+ raise MediaNotFoundError(f"No streaming URL found for track {item_id}")
return StreamDetails(
item_id=item_id,
provider=self.instance_id,
- audio_format=AudioFormat(),
+ audio_format=AudioFormat(
+ content_type=content_type,
+ bit_rate=bitrate,
+ ),
stream_type=StreamType.HTTP,
media_type=media_type,
path=streaming_url,
import pytest
from bandcamp_async_api.models import BCAlbum, BCArtist, BCTrack
+from music_assistant_models.enums import ContentType
from music_assistant.providers.bandcamp.converters import BandcampConverters
assert result.album is not None
assert result.album.item_id == "123-456"
assert result.album.name == "Test Album"
+
+
+def _make_mock_track(streaming_url: dict[str, str]) -> Mock:
+ """Create a mock API track with the given streaming URL."""
+ mock_artist = Mock()
+ mock_artist.id = 123
+ mock_artist.name = "Test Artist"
+ mock_artist.url = "https://test.bandcamp.com"
+
+ mock_track = Mock()
+ mock_track.id = 789
+ mock_track.title = "Test Track"
+ mock_track.artist = mock_artist
+ mock_track.url = "https://test.bandcamp.com/track/test-track"
+ mock_track.duration = 300
+ mock_track.lyrics = None
+ mock_track.track_number = 1
+ mock_track.streaming_url = streaming_url
+ return mock_track
+
+
+def test_track_from_api_audio_format_mp3_320(converters: BandcampConverters) -> None:
+ """Test that mp3-320 streaming URL sets audio format correctly."""
+ mock_track = _make_mock_track({"mp3-320": "https://example.com/track.mp3"})
+ result = converters.track_from_api(track=mock_track, album_id=456)
+ mapping = next(iter(result.provider_mappings))
+ assert mapping.audio_format.content_type == ContentType.MP3
+ assert mapping.audio_format.bit_rate == 320
+
+
+def test_track_from_api_audio_format_mp3_v0(converters: BandcampConverters) -> None:
+ """Test that mp3-v0 streaming URL sets content type with no bitrate (VBR)."""
+ mock_track = _make_mock_track({"mp3-v0": "https://example.com/track.mp3"})
+ result = converters.track_from_api(track=mock_track, album_id=456)
+ mapping = next(iter(result.provider_mappings))
+ assert mapping.audio_format.content_type == ContentType.MP3
+ assert mapping.audio_format.bit_rate is None
+
+
+def test_track_from_api_audio_format_mp3_128(converters: BandcampConverters) -> None:
+ """Test that mp3-128 streaming URL sets audio format correctly."""
+ mock_track = _make_mock_track({"mp3-128": "https://example.com/track.mp3"})
+ result = converters.track_from_api(track=mock_track, album_id=456)
+ mapping = next(iter(result.provider_mappings))
+ assert mapping.audio_format.content_type == ContentType.MP3
+ assert mapping.audio_format.bit_rate == 128
+
+
+def test_track_from_api_audio_format_none_streaming_url(converters: BandcampConverters) -> None:
+ """Test that None streaming_url does not crash."""
+ mock_track = _make_mock_track({"mp3-128": "https://example.com/track.mp3"})
+ mock_track.streaming_url = None
+ result = converters.track_from_api(track=mock_track, album_id=456)
+ mapping = next(iter(result.provider_mappings))
+ assert mapping.audio_format.content_type == ContentType.MP3
+ assert mapping.audio_format.bit_rate is None
+
+
+def test_streaming_url_priority_v0_over_320(converters: BandcampConverters) -> None:
+ """Test that mp3-v0 is preferred over mp3-320."""
+ url, bitrate, content_type = converters.streaming_url_from_api(
+ {
+ "mp3-320": "https://example.com/320.mp3",
+ "mp3-v0": "https://example.com/v0.mp3",
+ }
+ )
+ assert url == "https://example.com/v0.mp3"
+ assert bitrate is None
+ assert content_type == ContentType.MP3
+
+
+def test_streaming_url_priority_320_over_128(converters: BandcampConverters) -> None:
+ """Test that mp3-320 is preferred over mp3-128."""
+ url, bitrate, content_type = converters.streaming_url_from_api(
+ {
+ "mp3-128": "https://example.com/128.mp3",
+ "mp3-320": "https://example.com/320.mp3",
+ }
+ )
+ assert url == "https://example.com/320.mp3"
+ assert bitrate == 320
+ assert content_type == ContentType.MP3
+
+
+def test_streaming_url_priority_v0_over_320_over_128(converters: BandcampConverters) -> None:
+ """Test full priority chain when all three formats are present."""
+ url, bitrate, content_type = converters.streaming_url_from_api(
+ {
+ "mp3-128": "https://example.com/128.mp3",
+ "mp3-320": "https://example.com/320.mp3",
+ "mp3-v0": "https://example.com/v0.mp3",
+ }
+ )
+ assert url == "https://example.com/v0.mp3"
+ assert bitrate is None
+ assert content_type == ContentType.MP3
+
+
+def test_streaming_url_fallback_unknown_key(converters: BandcampConverters) -> None:
+ """Test that an unknown streaming key falls back with UNKNOWN content type."""
+ url, bitrate, content_type = converters.streaming_url_from_api(
+ {"ogg-vorbis": "https://example.com/track.ogg"}
+ )
+ assert url == "https://example.com/track.ogg"
+ assert bitrate is None
+ assert content_type == ContentType.UNKNOWN
+
+
+def test_streaming_url_empty_dict(converters: BandcampConverters) -> None:
+ """Test that empty dict returns None for URL and bitrate."""
+ url, bitrate, content_type = converters.streaming_url_from_api({})
+ assert url is None
+ assert bitrate is None
+ assert content_type == ContentType.MP3
"""Test Bandcamp Provider integration."""
+from collections.abc import AsyncGenerator
from unittest.mock import AsyncMock, Mock, patch
import pytest
-from bandcamp_async_api import BandcampAPIError, BandcampNotFoundError
-from music_assistant_models.enums import MediaType, StreamType
-from music_assistant_models.errors import InvalidDataError, MediaNotFoundError
+from bandcamp_async_api import (
+ BandcampAPIError,
+ BandcampMustBeLoggedInError,
+ BandcampNotFoundError,
+ BandcampRateLimitError,
+ SearchResultAlbum,
+ SearchResultArtist,
+ SearchResultTrack,
+)
+from music_assistant_models.enums import ContentType, MediaType, StreamType
+from music_assistant_models.errors import (
+ InvalidDataError,
+ LoginFailed,
+ MediaNotFoundError,
+ RetriesExhausted,
+)
from music_assistant_models.streamdetails import StreamDetails
-from music_assistant.providers.bandcamp import DEFAULT_TOP_TRACKS_LIMIT, BandcampProvider
+from music_assistant.providers.bandcamp import DEFAULT_TOP_TRACKS_LIMIT, BandcampProvider, split_id
@pytest.fixture
async def test_is_streaming_provider(provider: BandcampProvider) -> None:
- """Test that Bandcamp is not a streaming provider."""
+ """Test that Bandcamp is a streaming provider."""
assert provider.is_streaming_provider is True
async def test_search_with_identity(provider: BandcampProvider) -> None:
"""Test search functionality with identity token."""
-
- # Create mock objects with proper class names
- class MockSearchResultTrack:
- def __init__(self) -> None:
- self.__class__.__name__ = "SearchResultTrack"
-
- class MockSearchResultAlbum:
- def __init__(self) -> None:
- self.__class__.__name__ = "SearchResultAlbum"
-
- class MockSearchResultArtist:
- def __init__(self) -> None:
- self.__class__.__name__ = "SearchResultArtist"
-
mock_search_results = [
- MockSearchResultTrack(),
- MockSearchResultAlbum(),
- MockSearchResultArtist(),
+ Mock(spec=SearchResultTrack),
+ Mock(spec=SearchResultAlbum),
+ Mock(spec=SearchResultArtist),
]
with (
)
mock_search.assert_called_once_with("test query")
- assert results.tracks is not None
- assert results.albums is not None
- assert results.artists is not None
+ mock_track_converter.assert_called_once()
+ mock_album_converter.assert_called_once()
+ mock_artist_converter.assert_called_once()
+ assert len(results.tracks) == 1
+ assert len(results.albums) == 1
+ assert len(results.artists) == 1
async def test_search_without_identity(provider: BandcampProvider) -> None:
patch.object(
provider._client, "get_artist", side_effect=BandcampNotFoundError("Not found")
),
- pytest.raises(MediaNotFoundError, match=r"Bandcamp artist 123 search returned no results"),
+ pytest.raises(MediaNotFoundError, match=r"Artist 123 not found on Bandcamp"),
):
await provider.get_artist("123")
assert result is not None
+async def test_get_track_standalone(provider: BandcampProvider) -> None:
+ """Test get_track for a standalone track (album_id=0) uses get_track API path."""
+ mock_album_obj = Mock()
+ mock_album_obj.id = 456
+ mock_album_obj.title = "Standalone Album"
+ mock_album_obj.art_url = "http://example.com/art.jpg"
+
+ mock_api_track = Mock()
+ mock_api_track.album = mock_album_obj
+
+ with (
+ patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track,
+ patch.object(provider._converters, "track_from_api") as mock_converter,
+ ):
+ mock_get_track.return_value = mock_api_track
+ mock_converter.return_value = Mock()
+
+ result = await provider.get_track("123-0-789")
+
+ mock_get_track.assert_called_once_with(123, 789)
+ mock_converter.assert_called_once_with(
+ track=mock_api_track,
+ album_id=456,
+ album_name="Standalone Album",
+ album_image_url="http://example.com/art.jpg",
+ )
+ assert result is not None
+
+
+async def test_get_track_standalone_no_album(provider: BandcampProvider) -> None:
+ """Test get_track for a standalone track where api_track.album is None."""
+ mock_api_track = Mock()
+ mock_api_track.album = None
+
+ with (
+ patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track,
+ patch.object(provider._converters, "track_from_api") as mock_converter,
+ ):
+ mock_get_track.return_value = mock_api_track
+ mock_converter.return_value = Mock()
+
+ result = await provider.get_track("123-0-789")
+
+ mock_get_track.assert_called_once_with(123, 789)
+ mock_converter.assert_called_once_with(
+ track=mock_api_track,
+ album_id=None,
+ album_name="",
+ album_image_url="",
+ )
+ assert result is not None
+
+
async def test_get_track_not_found(provider: BandcampProvider) -> None:
"""Test track retrieval when not found."""
with (
patch.object(provider._client, "get_album", side_effect=BandcampNotFoundError("Not found")),
- pytest.raises(
- MediaNotFoundError, match=r"Bandcamp track 123-456-789 search returned no results"
- ),
+ pytest.raises(MediaNotFoundError, match=r"Track 123-456-789 not found on Bandcamp"),
):
await provider.get_track("123-456-789")
async def test_get_stream_details_success(provider: BandcampProvider) -> None:
- """Test successful stream details retrieval."""
- # Create mock album and track with proper attributes
- mock_artist = Mock()
- mock_artist.id = 123
- mock_artist.name = "Test Artist"
+ """Test stream details fetches fresh URL and audio format from API."""
+ mock_api_track = Mock()
+ mock_api_track.id = 789
+ mock_api_track.streaming_url = {"mp3-320": "http://example.com/track.mp3"}
+ mock_api_album = Mock()
+ mock_api_album.tracks = [mock_api_track]
- mock_track = Mock()
- mock_track.id = 789
- mock_track.artist = mock_artist
- mock_track.title = "Test Track"
- mock_track.duration = 180
- mock_track.track_number = 1
- mock_track.streaming_url = {"mp3-320": "http://example.com/track.mp3"}
- mock_track.url = "http://example.com/track"
- mock_track.lyrics = None
+ with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album:
+ mock_get_album.return_value = mock_api_album
- mock_album = Mock()
- mock_album.id = 456
- mock_album.title = "Test Album"
- mock_album.art_url = "http://example.com/art.jpg"
- mock_album.artist = mock_artist
- mock_album.tracks = [mock_track]
+ result = await provider.get_stream_details("123-456-789", MediaType.TRACK)
+
+ mock_get_album.assert_called_once_with(123, 456)
+ assert isinstance(result, StreamDetails)
+ assert result.item_id == "123-456-789"
+ assert result.media_type == MediaType.TRACK
+ assert result.stream_type == StreamType.HTTP
+ assert result.path == "http://example.com/track.mp3"
+ assert result.audio_format.content_type == ContentType.MP3
+ assert result.audio_format.bit_rate == 320
+
+
+async def test_get_stream_details_vbr(provider: BandcampProvider) -> None:
+ """Test stream details with VBR mp3-v0 format."""
+ mock_api_track = Mock()
+ mock_api_track.id = 789
+ mock_api_track.streaming_url = {"mp3-v0": "http://example.com/track-v0.mp3"}
+ mock_api_album = Mock()
+ mock_api_album.tracks = [mock_api_track]
+
+ with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album:
+ mock_get_album.return_value = mock_api_album
+
+ result = await provider.get_stream_details("123-456-789", MediaType.TRACK)
+
+ assert result.path == "http://example.com/track-v0.mp3"
+ assert result.audio_format.content_type == ContentType.MP3
+ assert result.audio_format.bit_rate is None
+
+
+async def test_get_stream_details_no_streaming_url(provider: BandcampProvider) -> None:
+ """Test stream details when API track has no streaming URL."""
+ mock_api_track = Mock()
+ mock_api_track.id = 789
+ mock_api_track.streaming_url = {}
+ mock_api_album = Mock()
+ mock_api_album.tracks = [mock_api_track]
+
+ with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album:
+ mock_get_album.return_value = mock_api_album
+
+ with pytest.raises(MediaNotFoundError, match=r"No streaming URL found"):
+ await provider.get_stream_details("123-456-789", MediaType.TRACK)
+
+
+async def test_get_stream_details_none_streaming_url(provider: BandcampProvider) -> None:
+ """Test stream details when API track has streaming_url=None."""
+ mock_api_track = Mock()
+ mock_api_track.id = 789
+ mock_api_track.streaming_url = None
+ mock_api_album = Mock()
+ mock_api_album.tracks = [mock_api_track]
+
+ with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album:
+ mock_get_album.return_value = mock_api_album
+
+ with pytest.raises(MediaNotFoundError, match=r"No streaming URL found"):
+ await provider.get_stream_details("123-456-789", MediaType.TRACK)
+
+
+async def test_get_stream_details_bypasses_cache(provider: BandcampProvider) -> None:
+ """Test that get_stream_details calls API directly, not cached get_track."""
+ mock_api_track = Mock()
+ mock_api_track.id = 789
+ mock_api_track.streaming_url = {"mp3-128": "http://example.com/track.mp3"}
+ mock_api_album = Mock()
+ mock_api_album.tracks = [mock_api_track]
with (
patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album,
- patch.object(provider._converters, "track_from_api") as mock_converter,
+ patch.object(provider, "get_track", new_callable=AsyncMock) as mock_get_track,
):
- mock_get_album.return_value = mock_album
-
- # Create a mock track with metadata.links containing the streaming URL
- mock_ma_track = Mock()
- mock_link = Mock()
- mock_link.url = "http://example.com/track.mp3"
- mock_ma_track.metadata.links = {mock_link}
- mock_converter.return_value = mock_ma_track
+ mock_get_album.return_value = mock_api_album
result = await provider.get_stream_details("123-456-789", MediaType.TRACK)
- assert isinstance(result, StreamDetails)
- assert result.stream_type == StreamType.HTTP
+ mock_get_album.assert_called_once()
+ mock_get_track.assert_not_called()
assert result.path == "http://example.com/track.mp3"
+ assert result.audio_format.content_type == ContentType.MP3
-async def test_get_stream_details_no_streaming_url(provider: BandcampProvider) -> None:
- """Test stream details when no streaming URL is available."""
- # Mock the get_track method directly to return a track with no streaming URLs
- mock_track = Mock()
- mock_track.metadata.links = [] # Empty links list means no streaming URL
+async def test_fetch_api_track_album_path(provider: BandcampProvider) -> None:
+ """Test _fetch_api_track with 3-part ID routes through get_album."""
+ mock_api_track = Mock()
+ mock_api_track.id = 789
+ mock_api_album = Mock()
+ mock_api_album.tracks = [mock_api_track]
- with patch.object(provider, "get_track", new_callable=AsyncMock) as mock_get_track:
- mock_get_track.return_value = mock_track
+ with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album:
+ mock_get_album.return_value = mock_api_album
- with pytest.raises(
- MediaNotFoundError,
- match=r"No streaming links found for track 123-456-789. Please report this",
- ):
- await provider.get_stream_details("123-456-789", MediaType.TRACK)
+ api_track, api_album = await provider._fetch_api_track("123-456-789")
+
+ mock_get_album.assert_called_once_with(123, 456)
+ assert api_track is mock_api_track
+ assert api_album is mock_api_album
+
+
+async def test_fetch_api_track_standalone_path(provider: BandcampProvider) -> None:
+ """Test _fetch_api_track with album_id=0 routes through get_track."""
+ mock_api_track = Mock()
+
+ with patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track:
+ mock_get_track.return_value = mock_api_track
+
+ api_track, api_album = await provider._fetch_api_track("123-0-789")
+
+ mock_get_track.assert_called_once_with(123, 789)
+ assert api_track is mock_api_track
+ assert api_album is None
+
+
+async def test_fetch_api_track_not_in_album(provider: BandcampProvider) -> None:
+ """Test _fetch_api_track raises when track ID not found in album tracks."""
+ mock_other_track = Mock()
+ mock_other_track.id = 999
+ mock_api_album = Mock()
+ mock_api_album.tracks = [mock_other_track]
+
+ with patch.object(provider._client, "get_album", new_callable=AsyncMock) as mock_get_album:
+ mock_get_album.return_value = mock_api_album
+
+ with pytest.raises(MediaNotFoundError, match=r"not found in album"):
+ await provider._fetch_api_track("123-456-789")
+
+
+async def test_fetch_api_track_not_found_error(provider: BandcampProvider) -> None:
+ """Test _fetch_api_track converts BandcampNotFoundError."""
+ with (
+ patch.object(
+ provider._client,
+ "get_album",
+ side_effect=BandcampNotFoundError("Not found"),
+ ),
+ pytest.raises(MediaNotFoundError, match=r"not found on Bandcamp"),
+ ):
+ await provider._fetch_api_track("123-456-789")
+
+
+async def test_fetch_api_track_rate_limit_error(provider: BandcampProvider) -> None:
+ """Test _fetch_api_track converts BandcampRateLimitError.
+
+ Since @throttle_with_retries is on _fetch_api_track, persistent rate
+ limiting exhausts retries and raises RetriesExhausted.
+ """
+ rate_error = BandcampRateLimitError("Rate limited")
+ rate_error.retry_after = 3
+
+ with (
+ patch.object(
+ provider._client,
+ "get_album",
+ side_effect=rate_error,
+ ) as mock_get_album,
+ patch("asyncio.sleep", new_callable=AsyncMock) as mock_sleep,
+ pytest.raises(RetriesExhausted),
+ ):
+ await provider._fetch_api_track("123-456-789")
+
+ assert mock_get_album.call_count == provider.throttler.retry_attempts
+ # At least retry_attempts - 1 sleeps from backoff; may be higher if the
+ # class-level Throttler also called asyncio.sleep due to accumulated entries.
+ assert mock_sleep.call_count >= provider.throttler.retry_attempts - 1
+
+
+async def test_fetch_api_track_generic_api_error(provider: BandcampProvider) -> None:
+ """Test _fetch_api_track converts generic BandcampAPIError to MediaNotFoundError."""
+ with (
+ patch.object(
+ provider._client,
+ "get_album",
+ side_effect=BandcampAPIError("Something went wrong"),
+ ),
+ pytest.raises(MediaNotFoundError, match=r"Failed to get track 123-456-789"),
+ ):
+ await provider._fetch_api_track("123-456-789")
+
+
+def test_split_id_three_parts() -> None:
+ """Test split_id with a 3-part compound ID."""
+ assert split_id("123-456-789") == (123, 456, 789)
+
+
+def test_split_id_two_parts() -> None:
+ """Test split_id with a 2-part compound ID."""
+ assert split_id("123-456") == (123, 456, 0)
+
+
+def test_split_id_one_part() -> None:
+ """Test split_id with a single ID."""
+ assert split_id("123") == (123, 0, 0)
+
+
+async def test_fetch_api_track_two_part_id(provider: BandcampProvider) -> None:
+ """Test _fetch_api_track with 2-part ID routes through get_track."""
+ # split_id("123-789") returns (123, 789, 0); since track_id=0,
+ # the method swaps to album_id=0, track_id=789 and uses get_track.
+ mock_api_track = Mock()
+
+ with patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track:
+ mock_get_track.return_value = mock_api_track
+
+ api_track, api_album = await provider._fetch_api_track("123-789")
+
+ mock_get_track.assert_called_once_with(123, 789)
+ assert api_track is mock_api_track
+ assert api_album is None
+
+
+async def test_get_stream_details_standalone_track(provider: BandcampProvider) -> None:
+ """Test stream details for a standalone track (album_id=0)."""
+ mock_api_track = Mock()
+ mock_api_track.streaming_url = {"mp3-128": "http://example.com/standalone.mp3"}
+
+ with patch.object(provider._client, "get_track", new_callable=AsyncMock) as mock_get_track:
+ mock_get_track.return_value = mock_api_track
+
+ result = await provider.get_stream_details("123-0-789", MediaType.TRACK)
+
+ mock_get_track.assert_called_once_with(123, 789)
+ assert isinstance(result, StreamDetails)
+ assert result.path == "http://example.com/standalone.mp3"
+ assert result.audio_format.content_type == ContentType.MP3
+ assert result.audio_format.bit_rate == 128
async def test_get_artist_toptracks_success(provider: BandcampProvider) -> None:
async def test_get_library_artists_success(provider: BandcampProvider) -> None:
"""Test successful library artists retrieval."""
- # Test that the method exists and doesn't raise an exception
- # This is a complex async generator method, so we just test it can be called
- assert hasattr(provider, "get_library_artists")
- assert callable(provider.get_library_artists)
+ mock_collection = Mock()
+ mock_collection.items = [
+ Mock(item_type="band", item_id=100, band_id=100),
+ Mock(item_type="album", item_id=200, band_id=300),
+ ]
+
+ with (
+ patch.object(
+ provider._client, "get_collection_items", new_callable=AsyncMock
+ ) as mock_get_collection,
+ patch.object(provider, "get_artist", new_callable=AsyncMock) as mock_get_artist,
+ ):
+ mock_get_collection.return_value = mock_collection
+ mock_get_artist.return_value = Mock()
+
+ artists = [artist async for artist in provider.get_library_artists()]
+
+ assert len(artists) == 2
+ assert mock_get_artist.call_count == 2
+
+
+async def test_get_library_artists_no_identity(provider: BandcampProvider) -> None:
+ """Test that library artists returns nothing without identity."""
+ provider._client.identity = None
+ artists = [artist async for artist in provider.get_library_artists()]
+ assert len(artists) == 0
async def test_get_library_albums_success(provider: BandcampProvider) -> None:
"""Test successful library albums retrieval."""
- # Test that the method exists and doesn't raise an exception
- # This is a complex async generator method, so we just test it can be called
- assert hasattr(provider, "get_library_albums")
- assert callable(provider.get_library_albums)
+ mock_collection = Mock()
+ mock_collection.items = [
+ Mock(item_type="album", item_id=456, band_id=123),
+ ]
+
+ with (
+ patch.object(
+ provider._client, "get_collection_items", new_callable=AsyncMock
+ ) as mock_get_collection,
+ patch.object(provider, "get_album", new_callable=AsyncMock) as mock_get_album,
+ ):
+ mock_get_collection.return_value = mock_collection
+ mock_get_album.return_value = Mock()
+
+ albums = [album async for album in provider.get_library_albums()]
+
+ assert len(albums) == 1
+ mock_get_album.assert_called_once_with("123-456")
async def test_get_library_tracks_success(provider: BandcampProvider) -> None:
"""Test successful library tracks retrieval."""
- # Test that the method exists and doesn't raise an exception
- # This is a complex async generator method, so we just test it can be called
- assert hasattr(provider, "get_library_tracks")
- assert callable(provider.get_library_tracks)
+ mock_track = Mock()
+
+ with (
+ patch.object(provider, "get_library_albums") as mock_get_albums,
+ patch.object(provider, "get_album_tracks", new_callable=AsyncMock) as mock_get_tracks,
+ ):
+ # Make get_library_albums an async generator
+ async def mock_albums_gen() -> AsyncGenerator[Mock, None]:
+ yield Mock(item_id="123-456")
+
+ mock_get_albums.return_value = mock_albums_gen()
+ mock_get_tracks.return_value = [mock_track]
+
+ tracks = [track async for track in provider.get_library_tracks()]
+
+ assert len(tracks) == 1
+ mock_get_tracks.assert_called_once_with("123-456")
+
+
+def test_split_id_malformed_non_numeric() -> None:
+ """Test split_id raises InvalidDataError on non-numeric input."""
+ with pytest.raises(InvalidDataError, match=r"Malformed Bandcamp ID"):
+ split_id("abc-def")
+
+
+def test_split_id_malformed_empty() -> None:
+ """Test split_id raises InvalidDataError on empty string."""
+ with pytest.raises(InvalidDataError, match=r"Malformed Bandcamp ID"):
+ split_id("")
+
+
+async def test_fetch_api_track_login_error(provider: BandcampProvider) -> None:
+ """Test _fetch_api_track converts BandcampMustBeLoggedInError to LoginFailed."""
+ with (
+ patch.object(
+ provider._client,
+ "get_album",
+ side_effect=BandcampMustBeLoggedInError("Must be logged in"),
+ ),
+ pytest.raises(LoginFailed, match=r"login is invalid or expired"),
+ ):
+ await provider._fetch_api_track("123-456-789")