From e41978398521286a9b0e9c3e41a0a4537f7d4d99 Mon Sep 17 00:00:00 2001 From: OzGav Date: Fri, 3 Oct 2025 22:28:50 +1000 Subject: [PATCH] Add Phishin provider (#2421) --- music_assistant/providers/phishin/__init__.py | 43 + .../providers/phishin/constants.py | 46 + music_assistant/providers/phishin/helpers.py | 558 ++++++++++ music_assistant/providers/phishin/icon.svg | 46 + .../providers/phishin/icon_monochrome.svg | 46 + .../providers/phishin/manifest.json | 11 + music_assistant/providers/phishin/provider.py | 960 ++++++++++++++++++ 7 files changed, 1710 insertions(+) create mode 100644 music_assistant/providers/phishin/__init__.py create mode 100644 music_assistant/providers/phishin/constants.py create mode 100644 music_assistant/providers/phishin/helpers.py create mode 100644 music_assistant/providers/phishin/icon.svg create mode 100644 music_assistant/providers/phishin/icon_monochrome.svg create mode 100644 music_assistant/providers/phishin/manifest.json create mode 100644 music_assistant/providers/phishin/provider.py diff --git a/music_assistant/providers/phishin/__init__.py b/music_assistant/providers/phishin/__init__.py new file mode 100644 index 00000000..96e986d1 --- /dev/null +++ b/music_assistant/providers/phishin/__init__.py @@ -0,0 +1,43 @@ +"""Phish.in Music Provider for Music Assistant.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from music_assistant_models.enums import ProviderFeature + +from .provider import PhishInProvider + +if TYPE_CHECKING: + from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig + from music_assistant_models.provider import ProviderManifest + + from music_assistant.mass import MusicAssistant + from music_assistant.models import ProviderInstanceType + +SUPPORTED_FEATURES = { + ProviderFeature.BROWSE, + ProviderFeature.SEARCH, + ProviderFeature.LIBRARY_ARTISTS, + ProviderFeature.LIBRARY_PLAYLISTS, + ProviderFeature.SIMILAR_TRACKS, + ProviderFeature.ARTIST_ALBUMS, + ProviderFeature.ARTIST_TOPTRACKS, +} + + +async def setup( + mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig +) -> ProviderInstanceType: + """Initialize provider(instance) with given configuration.""" + return PhishInProvider(mass, manifest, config, SUPPORTED_FEATURES) + + +async def get_config_entries( + mass: MusicAssistant, # noqa: ARG001 + instance_id: str | None = None, # noqa: ARG001 + action: str | None = None, # noqa: ARG001 + values: dict[str, ConfigValueType] | None = None, # noqa: ARG001 +) -> tuple[ConfigEntry, ...]: + """Return Config entries to setup this provider.""" + return () diff --git a/music_assistant/providers/phishin/constants.py b/music_assistant/providers/phishin/constants.py new file mode 100644 index 00000000..db7a4057 --- /dev/null +++ b/music_assistant/providers/phishin/constants.py @@ -0,0 +1,46 @@ +"""Constants for Phish.in provider.""" + +from typing import Final + +# API Configuration +API_BASE_URL: Final[str] = "https://phish.in/api/v2" +REQUEST_TIMEOUT: Final[int] = 30 +DEFAULT_LIMIT: Final[int] = 100 +MAX_SEARCH_RESULTS: Final[int] = 50 + +# Provider metadata +PROVIDER_DOMAIN: Final[str] = "phishin" +PROVIDER_NAME: Final[str] = "Phish.in" + +# Phish artist information +PHISH_ARTIST_NAME: Final[str] = "Phish" +PHISH_ARTIST_ID: Final[str] = "phish" +PHISH_MUSICBRAINZ_ID: Final[str] = "e01646f2-2a04-450d-8bf2-0d993082e058" +PHISH_DISCOGS_ID: Final[str] = "252354" +PHISH_TADB_ID: Final[str] = "112677" + +# Fallback image for albums without artwork +FALLBACK_ALBUM_IMAGE: Final[str] = ( + "https://raw.githubusercontent.com/music-assistant/music-assistant.io/refs/heads/main/docs/assets/icons/phish-logo.png" +) + +# API endpoints +ENDPOINTS = { + "shows": "/shows", + "show_by_date": "/shows/{date}", + "shows_day_of_year": "/shows/day_of_year/{date}", + "random_show": "/shows/random", + "songs": "/songs", + "song_by_slug": "/songs/{slug}", + "tracks": "/tracks", + "track_by_id": "/tracks/{id}", + "tours": "/tours", + "tour_by_slug": "/tours/{slug}", + "venues": "/venues", + "venue_by_slug": "/venues/{slug}", + "years": "/years", + "search": "/search/{term}", + "tags": "/tags", + "playlists": "/playlists", + "playlist_by_slug": "/playlists/{slug}", +} diff --git a/music_assistant/providers/phishin/helpers.py b/music_assistant/providers/phishin/helpers.py new file mode 100644 index 00000000..138e509a --- /dev/null +++ b/music_assistant/providers/phishin/helpers.py @@ -0,0 +1,558 @@ +"""Helper functions for Phish.in provider.""" + +from __future__ import annotations + +import contextlib +from collections.abc import Callable +from typing import TYPE_CHECKING, Any + +import aiohttp +from music_assistant_models.enums import AlbumType, ContentType, ExternalID, ImageType, MediaType +from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError +from music_assistant_models.media_items import ( + Album, + Artist, + AudioFormat, + ItemMapping, + MediaItemImage, + MediaItemMetadata, + Playlist, + ProviderMapping, + Track, +) +from music_assistant_models.unique_list import UniqueList + +from .constants import ( + API_BASE_URL, + FALLBACK_ALBUM_IMAGE, + PHISH_ARTIST_ID, + PHISH_ARTIST_NAME, + PHISH_DISCOGS_ID, + PHISH_MUSICBRAINZ_ID, + PHISH_TADB_ID, + REQUEST_TIMEOUT, +) + +if TYPE_CHECKING: + from music_assistant.models.music_provider import MusicProvider + + +async def api_request( + provider: MusicProvider, + endpoint: str, + params: dict[str, Any] | None = None, +) -> Any: + """Make an API request to Phish.in.""" + url = f"{API_BASE_URL}{endpoint}" + + try: + async with provider.mass.http_session.get( + url, + params=params, + timeout=aiohttp.ClientTimeout(total=REQUEST_TIMEOUT), + ) as response: + if response.status == 404: + raise MediaNotFoundError(f"Resource not found: {url}") + response.raise_for_status() + return await response.json() + except MediaNotFoundError: + raise + except aiohttp.ClientError as err: + provider.logger.error("API request failed for %s: %s", url, err) + raise ProviderUnavailableError(f"Phish.in API unavailable: {err}") from err + + +def show_to_album(provider: MusicProvider, show_data: dict[str, Any]) -> Album: + """Convert a Phish.in show to a Music Assistant Album.""" + show_date = show_data.get("date", "") + venue_data = show_data.get("venue", {}) + venue_name = venue_data.get("name", "Unknown Venue") + location = venue_data.get("location", "") + + album_name = f"{show_date} - {venue_name}" + if location: + album_name += f", {location}" + + # Create metadata with image + album_cover_url = show_data.get("album_cover_url") or FALLBACK_ALBUM_IMAGE + metadata = MediaItemMetadata( + images=UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=album_cover_url, + provider=provider.instance_id, + remotely_accessible=True, + ) + ] + ) + ) + + # Parse year from date string (YYYY-MM-DD format) + year = None + if show_date and "-" in show_date: + with contextlib.suppress(ValueError, IndexError): + year = int(show_date.split("-")[0]) + + # Create details string for provider mapping + details_parts = [f"venue:{venue_name}"] + if location: + details_parts.append(f"location:{location}") + if show_data.get("duration"): + details_parts.append(f"duration:{show_data.get('duration')}") + + audio_status = show_data.get("audio_status", "missing") + details_parts.append(f"audio_status:{audio_status}") + + if show_data.get("tour_name"): + details_parts.append(f"tour:{show_data.get('tour_name')}") + + # Create ItemMapping for Phish artist + phish_artist = ItemMapping( + item_id=PHISH_ARTIST_ID, + provider=provider.instance_id, + name=PHISH_ARTIST_NAME, + media_type=MediaType.ARTIST, + available=True, + ) + + return Album( + item_id=show_date, + provider=provider.instance_id, + name=album_name, + artists=UniqueList([phish_artist]), + year=year, + album_type=AlbumType.LIVE, + metadata=metadata, + provider_mappings={ + ProviderMapping( + item_id=show_date, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + available=audio_status in ["complete", "partial"], + audio_format=AudioFormat(content_type=ContentType.MP3), + details="|".join(details_parts), + ) + }, + ) + + +async def get_phish_artist(provider: MusicProvider) -> Artist: + """Get the main Phish artist object.""" + artist = Artist( + item_id=PHISH_ARTIST_ID, + provider=provider.instance_id, + name=PHISH_ARTIST_NAME, + provider_mappings={ + ProviderMapping( + item_id=PHISH_ARTIST_ID, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + available=True, + ) + }, + ) + + # Add external IDs for metadata enrichment + artist.add_external_id(ExternalID.MB_ARTIST, PHISH_MUSICBRAINZ_ID) + artist.add_external_id(ExternalID.DISCOGS, PHISH_DISCOGS_ID) + artist.add_external_id(ExternalID.TADB, PHISH_TADB_ID) + + return artist + + +def _extract_version_from_title(full_title: str) -> tuple[str, str]: + """Extract song title and version from full title with performance indicators. + + Returns: + Tuple of (clean_song_title, version_string) + """ + song_title = full_title + version = None + performance_indicators = ["set", "soundcheck", "check", "encore"] + + # Check for prefix: "(Check) Song Name" + if full_title.startswith("(") and ") " in full_title: + end_paren = full_title.index(") ") + prefix = full_title[1:end_paren] + if any(indicator in prefix.lower() for indicator in performance_indicators): + version = prefix + song_title = full_title[end_paren + 2 :] + + # Check for suffix: "Song Name (Soundcheck)" + if " (" in song_title and song_title.endswith(")"): + base_title, suffix = song_title.rsplit(" (", 1) + suffix = suffix.rstrip(")") + if any(indicator in suffix.lower() for indicator in performance_indicators): + version = f"{version}, {suffix}" if version else suffix + song_title = base_title + + return song_title, version or "" + + +def _create_album_mapping( + provider: MusicProvider, + show_date: str, + show_data: dict[str, Any] | None, +) -> ItemMapping | None: + """Create album ItemMapping with image for a track.""" + if not show_date: + return None + + venue_name = show_data.get("venue", {}).get("name", "") if show_data else "" + + # Create the image for the album mapping + album_image = None + if show_data: + image_url = show_data.get("album_cover_url") or FALLBACK_ALBUM_IMAGE + album_image = MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.instance_id, + remotely_accessible=True, + ) + + return ItemMapping( + item_id=show_date, + provider=provider.instance_id, + name=f"{show_date} - {venue_name}" if venue_name else show_date, + media_type=MediaType.ALBUM, + available=True, + image=album_image, + ) + + +def _build_track_details( + track_data: dict[str, Any], + song_data: dict[str, Any], + show_date: str, + set_name: str, + venue_name: str, +) -> str: + """Build details string for provider mapping.""" + details_parts = [f"song_slug:{song_data.get('slug', '')}"] + + if set_name: + details_parts.append(f"set_name:{set_name}") + if show_date: + details_parts.append(f"show_date:{show_date}") + if venue_name: + details_parts.append(f"venue:{venue_name}") + if track_data.get("tags"): + tag_names = [tag.get("name", "") for tag in track_data.get("tags", [])] + details_parts.append(f"tags:{','.join(tag_names)}") + if track_data.get("likes_count"): + details_parts.append(f"likes_count:{track_data.get('likes_count', 0)}") + + return "|".join(details_parts) + + +def track_to_ma_track( + provider: MusicProvider, + track_data: dict[str, Any], + show_data: dict[str, Any] | None = None, +) -> Track: + """Convert a Phish.in track to a Music Assistant Track.""" + track_id = str(track_data.get("id", "")) + + # Extract song info and version + songs = track_data.get("songs", []) + song_data = songs[0] if songs else {} + full_title = track_data.get("title", "Unknown Song") + song_title, version = _extract_version_from_title(full_title) + + # Extract basic track info + duration_ms = track_data.get("duration") + duration = int(duration_ms / 1000) if duration_ms else 0 + position = track_data.get("position") + track_number = int(position) if position is not None else 0 + set_name = track_data.get("set_name", "") + + # Get show information + if show_data is None: + show_data = track_data.get("show", {}) + show_date = show_data.get("date", "") + venue_name = show_data.get("venue", {}).get("name", "") + + # Create artist mapping + phish_artist = ItemMapping( + item_id=PHISH_ARTIST_ID, + provider=provider.instance_id, + name=PHISH_ARTIST_NAME, + media_type=MediaType.ARTIST, + available=True, + ) + + # Create album mapping with image + album_mapping = _create_album_mapping(provider, show_date, show_data) + + # Build details string + details = _build_track_details(track_data, song_data, show_date, set_name, venue_name) + + # Create metadata with image + metadata = MediaItemMetadata() + if show_data: + image_url = show_data.get("album_cover_url") + if image_url: + metadata = MediaItemMetadata( + images=UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=image_url, + provider=provider.instance_id, + remotely_accessible=True, + ) + ] + ) + ) + + return Track( + item_id=track_id, + provider=provider.instance_id, + name=song_title, + version=version, + artists=UniqueList([phish_artist]), + album=album_mapping, + duration=duration, + track_number=track_number, + metadata=metadata, + provider_mappings={ + ProviderMapping( + item_id=track_id, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + available=bool(track_data.get("mp3_url")), + audio_format=AudioFormat(content_type=ContentType.MP3), + url=track_data.get("mp3_url"), + details=details, + ) + }, + ) + + +def playlist_to_ma_playlist(provider: MusicProvider, playlist_data: dict[str, Any]) -> Playlist: + """Convert phish.in playlist data to Music Assistant Playlist.""" + playlist_id = str(playlist_data["id"]) + + metadata = MediaItemMetadata( + description=playlist_data.get("description"), + images=UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=FALLBACK_ALBUM_IMAGE, + provider=provider.instance_id, + remotely_accessible=True, + ) + ] + ), + ) + + return Playlist( + item_id=playlist_id, + provider=provider.instance_id, + name=playlist_data.get("name", ""), + owner=playlist_data.get("username", ""), + is_editable=False, + metadata=metadata, + provider_mappings={ + ProviderMapping( + item_id=playlist_id, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + available=True, + ) + }, + ) + + +def get_main_artist_mapping(provider: MusicProvider) -> ProviderMapping: + """Get artist mapping for Phish.""" + return ProviderMapping( + item_id=PHISH_ARTIST_ID, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + available=True, + ) + + +def get_album_mapping(provider: MusicProvider, show_date: str) -> ProviderMapping: + """Get album mapping for a show date.""" + return ProviderMapping( + item_id=show_date, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + available=True, + ) + + +def parse_search_results( + provider: MusicProvider, + search_data: dict[str, Any], + media_types: list[MediaType], + search_query: str, +) -> tuple[list[Artist], list[Album], list[Track], list[Playlist]]: + """Parse search results into MA media items.""" + search_term = search_query.lower() + + def contains_search_term(text: str | None) -> bool: + return search_term in text.lower() if text else False + + def strip_performance_indicators(title: str) -> str: + """Strip performance indicators like (Set1), (Soundcheck), etc. from title.""" + song_title = title + performance_indicators = ["set", "soundcheck", "check", "encore"] + + # Check for prefix: "(Check) Song" + if song_title.startswith("(") and ") " in song_title: + end_paren = song_title.index(") ") + prefix = song_title[1:end_paren] + if any(indicator in prefix.lower() for indicator in performance_indicators): + song_title = song_title[end_paren + 2 :] + + # Check for suffix: "Song (Set1)" + if " (" in song_title and song_title.endswith(")"): + base_title, suffix = song_title.rsplit(" (", 1) + suffix = suffix.rstrip(")") + if any(indicator in suffix.lower() for indicator in performance_indicators): + song_title = base_title + + return song_title + + artists: list[Artist] = _parse_artists(provider, media_types) + albums: list[Album] = _parse_albums(provider, search_data, media_types, contains_search_term) + tracks: list[Track] = _parse_tracks( + provider, search_data, media_types, contains_search_term, strip_performance_indicators + ) + playlists: list[Playlist] = _parse_playlists( + provider, search_data, media_types, contains_search_term + ) + + return artists, albums, tracks, playlists + + +def _parse_artists(provider: MusicProvider, media_types: list[MediaType]) -> list[Artist]: + """Parse artists from search results.""" + artists: list[Artist] = [] + if MediaType.ARTIST in media_types: + metadata = MediaItemMetadata( + images=UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=FALLBACK_ALBUM_IMAGE, + provider=provider.instance_id, + remotely_accessible=True, + ) + ] + ) + ) + + phish_artist_full = Artist( + item_id=PHISH_ARTIST_ID, + provider=provider.instance_id, + name=PHISH_ARTIST_NAME, + metadata=metadata, + provider_mappings={ + ProviderMapping( + item_id=PHISH_ARTIST_ID, + provider_domain=provider.domain, + provider_instance=provider.instance_id, + available=True, + ) + }, + ) + artists.append(phish_artist_full) + + return artists + + +def _parse_albums( + provider: MusicProvider, + search_data: dict[str, Any], + media_types: list[MediaType], + contains_search_term: Callable[[str | None], bool], +) -> list[Album]: + """Parse albums from search results.""" + albums: list[Album] = [] + if MediaType.ALBUM not in media_types: + return albums + + # Add exact show if present + if search_data.get("exact_show"): + show = search_data["exact_show"] + venue_name = show.get("venue_name", "") + if contains_search_term(venue_name): + albums.append(show_to_album(provider, show)) + + # Add other shows + for show in search_data.get("other_shows", []): + venue_name = show.get("venue_name", "") + if contains_search_term(venue_name): + albums.append(show_to_album(provider, show)) + + # Add venue shows (from additional API calls) + for show in search_data.get("venue_shows", []): + venue_name = show.get("venue_name", "") + if contains_search_term(venue_name): + albums.append(show_to_album(provider, show)) + + return albums + + +def _parse_tracks( + provider: MusicProvider, + search_data: dict[str, Any], + media_types: list[MediaType], + contains_search_term: Callable[[str | None], bool], + strip_performance_indicators: Callable[[str], str], +) -> list[Track]: + """Parse tracks from search results.""" + tracks: list[Track] = [] + if MediaType.TRACK not in media_types: + return tracks + + for track_data in search_data.get("tracks", []): + full_title = track_data.get("title", "") + # Strip performance indicators to get base song name for matching + clean_title = strip_performance_indicators(full_title) + + if contains_search_term(clean_title): + # Extract show data from track data for image + show_data = { + "date": track_data.get("show_date"), + "album_cover_url": track_data.get("show_album_cover_url"), + "venue": {"name": track_data.get("venue_name")}, + } + tracks.append(track_to_ma_track(provider, track_data, show_data)) + + # Deduplicate by album - only return one track per show + seen_albums = set() + unique_tracks = [] + for track in tracks: + album_id = track.album.item_id if track.album else None + if album_id and album_id not in seen_albums: + seen_albums.add(album_id) + unique_tracks.append(track) + elif not album_id: + unique_tracks.append(track) + + return unique_tracks + + +def _parse_playlists( + provider: MusicProvider, + search_data: dict[str, Any], + media_types: list[MediaType], + contains_search_term: Callable[[str | None], bool], +) -> list[Playlist]: + """Parse playlists from search results.""" + playlists: list[Playlist] = [] + if MediaType.PLAYLIST in media_types: + for playlist_data in search_data.get("playlists", []): + playlist_name = playlist_data.get("name", "") + if contains_search_term(playlist_name): + playlists.append(playlist_to_ma_playlist(provider, playlist_data)) + + return playlists diff --git a/music_assistant/providers/phishin/icon.svg b/music_assistant/providers/phishin/icon.svg new file mode 100644 index 00000000..786e5c8f --- /dev/null +++ b/music_assistant/providers/phishin/icon.svg @@ -0,0 +1,46 @@ + + + + diff --git a/music_assistant/providers/phishin/icon_monochrome.svg b/music_assistant/providers/phishin/icon_monochrome.svg new file mode 100644 index 00000000..c8290f36 --- /dev/null +++ b/music_assistant/providers/phishin/icon_monochrome.svg @@ -0,0 +1,46 @@ + + + + diff --git a/music_assistant/providers/phishin/manifest.json b/music_assistant/providers/phishin/manifest.json new file mode 100644 index 00000000..7ded36c9 --- /dev/null +++ b/music_assistant/providers/phishin/manifest.json @@ -0,0 +1,11 @@ +{ + "domain": "phishin", + "name": "Phish.in", + "description": "Stream live audience recordings and complete setlists from Phish’s archives.", + "documentation": "https://music-assistant.io/music-providers/phishin/", + "type": "music", + "requirements": [], + "codeowners": "@ozgav", + "multi_instance": false, + "stage": "beta" +} diff --git a/music_assistant/providers/phishin/provider.py b/music_assistant/providers/phishin/provider.py new file mode 100644 index 00000000..73c3423b --- /dev/null +++ b/music_assistant/providers/phishin/provider.py @@ -0,0 +1,960 @@ +"""Phish.in Music Provider for Music Assistant.""" + +from __future__ import annotations + +from collections.abc import AsyncGenerator +from datetime import datetime +from typing import TYPE_CHECKING, Any + +from music_assistant_models.enums import ( + ContentType, + ImageType, + MediaType, + StreamType, +) +from music_assistant_models.errors import MediaNotFoundError, ProviderUnavailableError +from music_assistant_models.media_items import ( + Album, + Artist, + AudioFormat, + BrowseFolder, + ItemMapping, + MediaItemImage, + MediaItemMetadata, + Playlist, + ProviderMapping, + SearchResults, + Track, +) +from music_assistant_models.streamdetails import StreamDetails +from music_assistant_models.unique_list import UniqueList + +from music_assistant.controllers.cache import use_cache +from music_assistant.models.music_provider import MusicProvider + +from .constants import ( + ENDPOINTS, + FALLBACK_ALBUM_IMAGE, + MAX_SEARCH_RESULTS, + PHISH_ARTIST_ID, +) +from .helpers import ( + api_request, + get_phish_artist, + parse_search_results, + show_to_album, + track_to_ma_track, +) + +if TYPE_CHECKING: + from collections.abc import Sequence + + from music_assistant_models.media_items import MediaItemType + + +class PhishInProvider(MusicProvider): + """Phish.in music provider.""" + + @property + def is_streaming_provider(self) -> bool: + """Return True if the provider is a streaming provider.""" + return True + + async def search( + self, + search_query: str, + media_types: list[MediaType], + limit: int = MAX_SEARCH_RESULTS, + ) -> SearchResults: + """Perform search on Phish.in.""" + # Handle "Artist - Track" format by extracting just the track name + if " - " in search_query: + parts = search_query.split(" - ", 1) + if parts[0].strip().lower() in ["phish", "the phish"]: + search_query = parts[1].strip() + + if len(search_query.strip()) < 3: + return SearchResults() + + try: + endpoint = ENDPOINTS["search"].format(term=search_query) + search_data = await api_request( + self, endpoint, params={"audio_status": "complete_or_partial"} + ) + + # If we got song matches, fetch all performances of those songs + if MediaType.TRACK in media_types and search_data.get("songs"): + all_track_results = [] + for song in search_data.get("songs", [])[:3]: # Limit to first 3 songs + song_slug = song.get("slug") + if song_slug: + tracks_data = await api_request( + self, + "/tracks", + params={ + "song_slug": song_slug, + "audio_status": "complete_or_partial", + "per_page": limit, + "sort": "likes_count:desc", + }, + ) + all_track_results.extend(tracks_data.get("tracks", [])) + + # Replace with comprehensive song_slug results + if all_track_results: + search_data["tracks"] = all_track_results[:limit] + + # Handle venue album searches + if MediaType.ALBUM in media_types and search_data.get("venues"): + venue_shows: list[dict[str, Any]] = [] + for venue in search_data.get("venues", []): + venue_slug = venue["slug"] + page = 1 + while len(venue_shows) < limit: + shows_data = await api_request( + self, "/shows", params={"venue_slug": venue_slug, "page": page} + ) + shows_on_page = shows_data.get("shows", []) + if not shows_on_page: + break + remaining_slots = limit - len(venue_shows) + venue_shows.extend(shows_on_page[:remaining_slots]) + current_page = shows_data.get("current_page", 1) + total_pages = shows_data.get("total_pages", 1) + if current_page >= total_pages or len(venue_shows) >= limit: + break + page += 1 + if venue_shows: + search_data["venue_shows"] = venue_shows + + artists, albums, tracks, playlists = parse_search_results( + self, search_data, media_types, search_query.lower() + ) + + return SearchResults( + artists=artists[:limit] if MediaType.ARTIST in media_types else [], + albums=albums[:limit] if MediaType.ALBUM in media_types else [], + tracks=tracks[:limit] if MediaType.TRACK in media_types else [], + playlists=playlists[:limit] if MediaType.PLAYLIST in media_types else [], + ) + except MediaNotFoundError: + raise + except Exception as err: + self.logger.error("Search failed for query '%s': %s", search_query, err) + raise ProviderUnavailableError(f"Search error: {err}") from err + + async def get_library_artists(self) -> AsyncGenerator[Artist, None]: + """Retrieve library artists from the provider.""" + yield await get_phish_artist(self) + + async def get_artist(self, prov_artist_id: str) -> Artist: + """Get full artist details by id.""" + if prov_artist_id == PHISH_ARTIST_ID: + return await get_phish_artist(self) + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") + + @use_cache(expiration=86400) # 24 hours - albums (ie. shows) could update daily + async def get_artist_albums(self, prov_artist_id: str) -> list[Album]: + """Get a list of all albums for the given artist.""" + if prov_artist_id != PHISH_ARTIST_ID: + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") + + albums = [] + page = 1 + per_page = 750 # Phish.in limit is 1000 but this caused asyncio warnings + + try: + while True: + shows_data = await api_request( + self, + ENDPOINTS["shows"], + params={ + "page": page, + "per_page": per_page, + "audio_status": "complete_or_partial", + }, + ) + + shows = shows_data.get("shows", []) + if not shows: + break + + for show in shows: + if show.get("audio_status") in ["complete", "partial"]: + albums.append(show_to_album(self, show)) + + if len(shows) < per_page: + break + + page += 1 + + return albums + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get artist albums: %s", err) + raise ProviderUnavailableError(f"Artist albums error: {err}") from err + + @use_cache(expiration=2592000) # 30 days - Top tracks won't change that often as its voted on + async def get_artist_toptracks(self, prov_artist_id: str) -> list[Track]: + """Get a list of most popular tracks for the given artist.""" + if prov_artist_id != PHISH_ARTIST_ID: + raise MediaNotFoundError(f"Artist {prov_artist_id} not found") + + try: + all_tracks: list[Track] = [] + page = 1 + max_pages = 5 # 2500 tracks max for UI performance + + while len(all_tracks) < (max_pages * 500) and page <= max_pages: + tracks_data = await api_request( + self, + ENDPOINTS["tracks"], + params={ + "page": page, + "per_page": 500, + "sort": "likes_count:desc", + "audio_status": "complete_or_partial", + }, + ) + + tracks_on_page = tracks_data.get("tracks", []) + if not tracks_on_page: + break + + for track_data in tracks_on_page: + show_data = { + "date": track_data.get("show_date"), + "album_cover_url": track_data.get("show_album_cover_url"), + "venue": {"name": track_data.get("venue_name")}, + } + track = track_to_ma_track(self, track_data, show_data) + all_tracks.append(track) + + if len(tracks_on_page) < 50: + break + + page += 1 + + return all_tracks + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get artist top tracks: %s", err) + raise ProviderUnavailableError(f"Top tracks error: {err}") from err + + @use_cache(expiration=2592000) # 30 days - Show details from specific dates never change + async def get_album(self, prov_album_id: str) -> Album: + """Get full album details by id (show date).""" + try: + endpoint = ENDPOINTS["show_by_date"].format(date=prov_album_id) + show_data = await api_request(self, endpoint) + + if not show_data: + raise MediaNotFoundError(f"Show {prov_album_id} not found") + + return show_to_album(self, show_data) + + except MediaNotFoundError: + raise + except Exception as err: + self.logger.error("Failed to get album %s: %s", prov_album_id, err) + raise ProviderUnavailableError(f"Album error: {err}") from err + + @use_cache(expiration=2592000) # 30 days - Individual tracks never change once recorded + async def get_track(self, prov_track_id: str) -> Track: + """Get full track details by id.""" + try: + endpoint = ENDPOINTS["track_by_id"].format(id=prov_track_id) + track_data = await api_request(self, endpoint) + + if not track_data: + raise MediaNotFoundError(f"Track {prov_track_id} not found") + + # Extract show data from the track response + show_data = track_data.get("show") + + return track_to_ma_track(self, track_data, show_data) + + except MediaNotFoundError: + raise + except Exception as err: + self.logger.error("Failed to get track %s: %s", prov_track_id, err) + raise ProviderUnavailableError(f"Track error: {err}") from err + + @use_cache(expiration=2592000) # 30 days - Track listings for historical shows never change + async def get_album_tracks(self, prov_album_id: str) -> list[Track]: + """Get album tracks for given album id (show date).""" + try: + endpoint = ENDPOINTS["show_by_date"].format(date=prov_album_id) + show_data = await api_request(self, endpoint) + + if not show_data: + raise MediaNotFoundError(f"Show {prov_album_id} not found") + + tracks = [] + for track_data in show_data.get("tracks", []): + track = track_to_ma_track(self, track_data, show_data) + tracks.append(track) + + return tracks + + except MediaNotFoundError: + raise + except Exception as err: + self.logger.error("Failed to get album tracks for %s: %s", prov_album_id, err) + raise ProviderUnavailableError(f"Album tracks error: {err}") from err + + async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: + """Get streamdetails for a track.""" + if media_type != MediaType.TRACK: + raise MediaNotFoundError(f"Streaming not supported for {media_type}") + + try: + track = await self.get_track(item_id) + + mp3_url = None + for mapping in track.provider_mappings: + if mapping.provider_instance == self.instance_id and mapping.url: + mp3_url = mapping.url + break + + if not mp3_url: + raise MediaNotFoundError(f"No audio URL found for track {item_id}") + + return StreamDetails( + provider=self.instance_id, + item_id=item_id, + audio_format=AudioFormat( + content_type=ContentType.MP3, + sample_rate=44100, + bit_depth=16, + channels=2, + ), + media_type=MediaType.TRACK, + stream_type=StreamType.HTTP, + path=mp3_url, + allow_seek=True, + can_seek=True, + ) + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get stream details for %s: %s", item_id, err) + raise ProviderUnavailableError(f"Stream error: {err}") from err + + @use_cache(expiration=86400) # 24 hours - Current year gets new shows added throughout the year + async def _get_years_data(self) -> Any: + """Get years data with caching.""" + return await api_request(self, ENDPOINTS["years"]) + + @use_cache(expiration=86400) # 24 hours - recent shows could update daily + async def _get_recent_shows(self) -> Any: + """Get recent shows with caching.""" + return await api_request( + self, + ENDPOINTS["shows"], + params={"per_page": 20, "sort": "date:desc", "audio_status": "complete_or_partial"}, + ) + + async def get_library_playlists(self) -> AsyncGenerator[Playlist, None]: + """Retrieve library playlists from the provider.""" + try: + playlists_data = await api_request( + self, ENDPOINTS["playlists"], params={"per_page": 100, "sort": "likes_count:desc"} + ) + + for playlist_data in playlists_data.get("playlists", []): + track_count = playlist_data.get("tracks_count", 0) + if track_count > 0: + playlist_id = str(playlist_data.get("id")) + + metadata = MediaItemMetadata( + images=UniqueList( + [ + MediaItemImage( + type=ImageType.THUMB, + path=FALLBACK_ALBUM_IMAGE, + provider=self.instance_id, + remotely_accessible=True, + ) + ] + ) + ) + yield Playlist( + item_id=playlist_id, + provider=self.instance_id, + name=playlist_data.get("name", ""), + owner=playlist_data.get("username", ""), + is_editable=False, + metadata=metadata, + provider_mappings={ + ProviderMapping( + item_id=playlist_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + available=True, + ) + }, + ) + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get library playlists: %s", err) + raise ProviderUnavailableError(f"Library playlists error: {err}") from err + + @use_cache(expiration=86400) # 24 hours - Playlist metadata might be updated by users + async def get_playlist(self, prov_playlist_id: str) -> Playlist: + """Get full playlist details by id.""" + try: + playlists_data = await api_request(self, ENDPOINTS["playlists"]) + playlist_slug = None + playlist_info = None + + for playlist in playlists_data.get("playlists", []): + if str(playlist.get("id")) == prov_playlist_id: + playlist_slug = playlist.get("slug") + playlist_info = playlist + break + + if not playlist_slug or not playlist_info: + raise MediaNotFoundError(f"Playlist {prov_playlist_id} not found") + + return Playlist( + item_id=prov_playlist_id, + provider=self.instance_id, + name=playlist_info.get("name", ""), + owner=playlist_info.get("username", ""), + is_editable=False, + provider_mappings={ + ProviderMapping( + item_id=prov_playlist_id, + provider_domain=self.domain, + provider_instance=self.instance_id, + available=True, + ) + }, + ) + + except MediaNotFoundError: + raise + except Exception as err: + self.logger.error("Failed to get playlist %s: %s", prov_playlist_id, err) + raise ProviderUnavailableError(f"Playlist error: {err}") from err + + async def get_playlist_tracks(self, prov_playlist_id: str, page: int = 0) -> list[Track]: + """Get playlist tracks for given playlist id.""" + if page > 0: + return [] + try: + playlists_data = await api_request(self, ENDPOINTS["playlists"]) + playlist_slug = None + + for playlist in playlists_data.get("playlists", []): + if str(playlist.get("id")) == prov_playlist_id: + playlist_slug = playlist.get("slug") + break + + if not playlist_slug: + return [] + + playlist_data = await api_request( + self, ENDPOINTS["playlist_by_slug"].format(slug=playlist_slug) + ) + + all_tracks = [] + for entry in playlist_data.get("entries", []): + track_data = entry.get("track") + if track_data and track_data.get("mp3_url"): + track = track_to_ma_track(self, track_data) + all_tracks.append(track) + + return all_tracks + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get playlist tracks for %s: %s", prov_playlist_id, err) + raise ProviderUnavailableError(f"Playlist tracks error: {err}") from err + + async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: + """Browse this provider's items.""" + path_parts = [] if "://" not in path else path.split("://")[1].split("/") + subpath = path_parts[0] if path_parts else "" + subsubpath = "/".join(path_parts[1:]) if len(path_parts) > 1 else "" + + if not subpath: + return self._browse_root(path) + + if subpath == "playlists": + playlists = [] + async for playlist in self.get_library_playlists(): + playlists.append(playlist) + if len(playlists) >= 50: + break + return playlists + elif subpath == "years": + return await self._browse_years(path, subsubpath) + elif subpath == "recent": + return await self._browse_recent() + elif subpath == "random": + return await self._browse_random() + elif subpath == "today": + return await self._browse_today() + elif subpath == "venues": + return await self._browse_venues(path, subsubpath) + elif subpath == "tags": + return await self._browse_tags(path, subsubpath) + elif subpath == "top_shows": + return await self._browse_top_shows() + elif subpath == "top_tracks": + return await self._browse_top_tracks() + + return [] + + def _browse_root(self, path: str) -> list[BrowseFolder]: + """Root level browse options.""" + return [ + BrowseFolder( + item_id="years", + provider=self.domain, + path=path + "years", + name="Browse by Year", + ), + BrowseFolder( + item_id="today", + provider=self.domain, + path=path + "today", + name="This Day in Phish History", + ), + BrowseFolder( + item_id="recent", + provider=self.domain, + path=path + "recent", + name="Recent Shows", + ), + BrowseFolder( + item_id="venues", + provider=self.domain, + path=path + "venues", + name="Browse by Venue", + ), + BrowseFolder( + item_id="tags", + provider=self.domain, + path=path + "tags", + name="Browse by Tag", + ), + BrowseFolder( + item_id="playlists", + provider=self.domain, + path=path + "playlists", + name="User Playlists", + ), + BrowseFolder( + item_id="top_shows", + provider=self.domain, + path=path + "top_shows", + name="Top 46 Shows", + ), + BrowseFolder( + item_id="top_tracks", + provider=self.domain, + path=path + "top_tracks", + name="Top 46 Tracks", + ), + BrowseFolder( + item_id="random", + provider=self.domain, + path=path + "random", + name="Random Show", + ), + ] + + async def _browse_years(self, path: str, subsubpath: str) -> list[BrowseFolder | Album]: + """Browse shows by year/period.""" + if not subsubpath: + try: + years_data = await self._get_years_data() + folders: list[BrowseFolder | Album] = [] + + for year_data in years_data: + period = year_data.get("period") + show_count = year_data.get("shows_count", 0) + if period and show_count > 0: + folders.append( + BrowseFolder( + item_id=f"period_{period}", + provider=self.domain, + path=f"phishin://years/{period}", + name=f"{period} ({show_count} shows)", + ) + ) + + return sorted(folders, key=lambda x: x.name, reverse=True) + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to browse years: %s", err) + raise ProviderUnavailableError(f"Browse years error: {err}") from err + else: + return await self._get_shows_for_period(subsubpath) + + async def _browse_recent(self) -> list[Album]: + """Get recent shows.""" + try: + shows_data = await self._get_recent_shows() + albums: list[Album] = [] + + for show in shows_data.get("shows", []): + if show.get("audio_status") in ["complete", "partial"]: + album = show_to_album(self, show) + albums.append(album) + + return albums + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to browse recent shows: %s", err) + raise ProviderUnavailableError(f"Browse recent error: {err}") from err + + async def _browse_random(self) -> list[Album]: + """Get a random show.""" + try: + show_data = await api_request(self, ENDPOINTS["random_show"]) + if show_data and show_data.get("audio_status") in ["complete", "partial"]: + album = show_to_album(self, show_data) + return [album] + return [] + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get random show: %s", err) + raise ProviderUnavailableError(f"Random show error: {err}") from err + + @use_cache(expiration=21600) # 6 hours - today's shows are historical but queried daily + async def _browse_today(self) -> list[Album]: + """Get shows that happened on this day in history.""" + try: + today = datetime.now() + target_date = today.strftime("%Y-%m-%d") + + shows_data = await api_request( + self, + ENDPOINTS["shows_day_of_year"].format(date=target_date), + params={"audio_status": "complete_or_partial", "sort": "date:desc"}, + ) + + albums: list[Album] = [] + shows = shows_data.get("shows", []) + + for show in shows: + if show and show.get("audio_status") in ["complete", "partial"]: + album = show_to_album(self, show) + albums.append(album) + + return albums + + except MediaNotFoundError: + self.logger.info("No shows found for %s", today.strftime("%B %d")) + return [] + except ProviderUnavailableError: + raise + except Exception as err: + self.logger.error("Failed to get today's shows: %s", err) + raise ProviderUnavailableError(f"Today's shows error: {err}") from err + + @use_cache(expiration=604800) # 7 days - venue list changes rarely + async def _browse_venues(self, path: str, subsubpath: str) -> list[BrowseFolder | Album]: + """Browse shows by venue.""" + if not subsubpath: + try: + venues_data = await api_request( + self, ENDPOINTS["venues"], params={"per_page": 100, "sort": "shows_count:desc"} + ) + + folders: list[BrowseFolder | Album] = [] + for venue in venues_data.get("venues", []): + audio_count = venue.get("shows_with_audio_count", 0) + if audio_count > 0: + folders.append( + BrowseFolder( + item_id=f"venue_{venue.get('slug')}", + provider=self.domain, + path=f"phishin://venues/{venue.get('slug')}", + name=f"{venue.get('name')} ({audio_count} shows)", + ) + ) + + return folders[:50] + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to browse venues: %s", err) + raise ProviderUnavailableError(f"Browse venues error: {err}") from err + else: + return await self._get_shows_for_venue(subsubpath) + + @use_cache(expiration=604800) # 7 days - tags list changes rarely + async def _browse_tags(self, path: str, subsubpath: str) -> list[BrowseFolder | Album | Track]: + """Browse shows and tracks by tag.""" + if not subsubpath: + try: + tags_data = await api_request(self, ENDPOINTS["tags"]) + + folders: list[BrowseFolder | Album | Track] = [] + for tag in tags_data: + track_count = tag.get("tracks_count", 0) + show_count = tag.get("shows_count", 0) + if track_count > 0 or show_count > 0: + count_str = ( + f"{show_count} shows, {track_count} tracks" + if show_count > 0 + else f"{track_count} tracks" + ) + folders.append( + BrowseFolder( + item_id=f"tag_{tag.get('slug')}", + provider=self.domain, + path=f"phishin://tags/{tag.get('slug')}", + name=f"{tag.get('name')} ({count_str})", + ) + ) + + return sorted(folders, key=lambda x: x.name) + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to browse tags: %s", err) + raise ProviderUnavailableError(f"Browse tags error: {err}") from err + + elif "/" not in subsubpath: + tag_slug = subsubpath + try: + tags_data = await api_request(self, ENDPOINTS["tags"]) + tag_info: dict[str, Any] = next( + (tag for tag in tags_data if tag.get("slug") == tag_slug), {} + ) + tag_name = tag_info.get("name", tag_slug) + show_count = tag_info.get("shows_count", 0) + track_count = tag_info.get("tracks_count", 0) + + subfolders: list[BrowseFolder | Album | Track] = [] + + if show_count > 0: + subfolders.append( + BrowseFolder( + item_id=f"tag_shows_{tag_slug}", + provider=self.domain, + path=f"phishin://tags/{tag_slug}/shows", + name=f"Shows with {tag_name} ({show_count})", + ) + ) + + if track_count > 0: + subfolders.append( + BrowseFolder( + item_id=f"tag_tracks_{tag_slug}", + provider=self.domain, + path=f"phishin://tags/{tag_slug}/tracks", + name=f"All {tag_name} Tracks ({track_count})", + ) + ) + + return subfolders + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get tag subfolders: %s", err) + raise ProviderUnavailableError(f"Tag subfolders error: {err}") from err + else: + tag_slug, content_type = subsubpath.split("/", 1) + if content_type == "shows": + return await self._get_shows_for_tag(tag_slug) + elif content_type == "tracks": + return await self._get_tracks_for_tag(tag_slug) + else: + return [] + + @use_cache(expiration=86400) # 24 hours - Tag associations could change as new shows are tagged + async def _get_tracks_for_tag(self, tag_slug: str) -> list[BrowseFolder | Album | Track]: + """Get tracks for a specific tag.""" + try: + tracks_data = await api_request( + self, + ENDPOINTS["tracks"], + params={ + "tag_slug": tag_slug, + "per_page": 100, + "audio_status": "complete_or_partial", + "sort": "likes_count:desc", + }, + ) + + tracks: list[BrowseFolder | Album | Track] = [] + for track_data in tracks_data.get("tracks", []): + if track_data.get("mp3_url"): + track = track_to_ma_track(self, track_data) + tracks.append(track) + + return tracks + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get tracks for tag %s: %s", tag_slug, err) + raise ProviderUnavailableError(f"Tag tracks error: {err}") from err + + async def _browse_top_shows(self) -> list[Album]: + """Get top 46 most liked shows.""" + try: + shows_data = await api_request( + self, + ENDPOINTS["shows"], + params={ + "per_page": 46, + "sort": "likes_count:desc", + "audio_status": "complete_or_partial", + }, + ) + + albums: list[Album] = [] + for show in shows_data.get("shows", []): + if show.get("audio_status") in ["complete", "partial"]: + album = show_to_album(self, show) + albums.append(album) + + return albums + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get top shows: %s", err) + raise ProviderUnavailableError(f"Top shows error: {err}") from err + + async def _browse_top_tracks(self) -> list[Track]: + """Get top 46 most liked tracks.""" + try: + tracks_data = await api_request( + self, + ENDPOINTS["tracks"], + params={ + "per_page": 46, + "sort": "likes_count:desc", + "audio_status": "complete_or_partial", + }, + ) + + tracks: list[Track] = [] + for track_data in tracks_data.get("tracks", []): + if track_data.get("mp3_url"): + track = track_to_ma_track(self, track_data) + tracks.append(track) + + return tracks + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get top tracks: %s", err) + raise ProviderUnavailableError(f"Top tracks error: {err}") from err + + @use_cache(expiration=86400) # 24 hours - Shows can be added to the current year + async def _get_shows_for_period(self, period: str) -> list[BrowseFolder | Album]: + """Get shows for a specific year or period.""" + try: + if "-" in period and len(period.split("-")) == 2: + params = { + "year_range": period, + "per_page": 100, + "audio_status": "complete_or_partial", + } + else: + params = { + "year": period, + "per_page": 100, + "audio_status": "complete_or_partial", + } + + shows_data = await api_request(self, ENDPOINTS["shows"], params=params) + + albums: list[BrowseFolder | Album] = [] + for show in shows_data.get("shows", []): + if show.get("audio_status") in ["complete", "partial"]: + album = show_to_album(self, show) + albums.append(album) + + return sorted(albums, key=lambda x: x.name) + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to browse period %s: %s", period, err) + raise ProviderUnavailableError(f"Browse period error: {err}") from err + + @use_cache(expiration=86400) # 24 hours - Venues might get new shows added + async def _get_shows_for_venue(self, venue_slug: str) -> list[BrowseFolder | Album]: + """Get shows for a specific venue.""" + try: + shows_data = await api_request( + self, + ENDPOINTS["shows"], + params={ + "venue_slug": venue_slug, + "per_page": 100, + "audio_status": "complete_or_partial", + "sort": "date:desc", + }, + ) + + albums: list[BrowseFolder | Album] = [] + for show in shows_data.get("shows", []): + if show.get("audio_status") in ["complete", "partial"]: + album = show_to_album(self, show) + albums.append(album) + + return albums + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get shows for venue %s: %s", venue_slug, err) + raise ProviderUnavailableError(f"Venue shows error: {err}") from err + + @use_cache(expiration=86400) # 24 hours - Tag associations could change as new shows are tagged + async def _get_shows_for_tag(self, tag_slug: str) -> list[BrowseFolder | Album | Track]: + """Get shows for a specific tag.""" + try: + shows_data = await api_request( + self, + ENDPOINTS["shows"], + params={ + "tag_slug": tag_slug, + "per_page": 100, + "audio_status": "complete_or_partial", + "sort": "date:desc", + }, + ) + + albums: list[BrowseFolder | Album | Track] = [] + for show in shows_data.get("shows", []): + if show.get("audio_status") in ["complete", "partial"]: + album = show_to_album(self, show) + albums.append(album) + + return albums + + except (MediaNotFoundError, ProviderUnavailableError): + raise + except Exception as err: + self.logger.error("Failed to get shows for tag %s: %s", tag_slug, err) + raise ProviderUnavailableError(f"Tag shows error: {err}") from err -- 2.34.1