--- /dev/null
+"""iTunes Podcast search support for MusicAssistant."""
+
+from __future__ import annotations
+
+from collections.abc import AsyncGenerator
+from io import BytesIO
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+import aiofiles
+import orjson
+import podcastparser
+from music_assistant_models.config_entries import ConfigEntry, ConfigValueOption
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ ContentType,
+ ImageType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
+from music_assistant_models.errors import MediaNotFoundError
+from music_assistant_models.media_items import (
+ AudioFormat,
+ MediaItemImage,
+ Podcast,
+ PodcastEpisode,
+ ProviderMapping,
+ SearchResults,
+ UniqueList,
+)
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.helpers.throttle_retry import ThrottlerManager, throttle_with_retries
+from music_assistant.models.music_provider import MusicProvider
+from music_assistant.providers.itunes_podcasts.parsers import parse_podcast, parse_podcast_episode
+from music_assistant.providers.itunes_podcasts.schema import ITunesSearchResults
+
+if TYPE_CHECKING:
+ from music_assistant_models.config_entries import ConfigValueType, ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant.mass import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+
+CONF_LOCALE = "locale"
+CONF_EXPLICIT = "explicit"
+CONF_NUM_EPISODES = "num_episodes"
+
+CACHE_CATEGORY_PODCASTS = 0
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return ITunesPodcastsProvider(mass, manifest, config)
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+ json_path = Path(__file__).parent / "itunes_country_codes.json"
+ async with aiofiles.open(json_path) as f:
+ country_codes = orjson.loads(await f.read())
+
+ language_options = [ConfigValueOption(val, key.lower()) for key, val in country_codes.items()]
+ return (
+ ConfigEntry(
+ key=CONF_LOCALE,
+ type=ConfigEntryType.STRING,
+ label="Country",
+ required=True,
+ options=language_options,
+ ),
+ ConfigEntry(
+ key=CONF_NUM_EPISODES,
+ type=ConfigEntryType.INTEGER,
+ label="Maximum number of episodes. 0 for unlimited.",
+ required=False,
+ description="Maximum number of episodes. 0 for unlimited.",
+ default_value=0,
+ ),
+ ConfigEntry(
+ key=CONF_EXPLICIT,
+ type=ConfigEntryType.BOOLEAN,
+ label="Include explicit results",
+ required=False,
+ description="Whether or not to include explicit content results in search.",
+ default_value=True,
+ ),
+ )
+
+
+class ITunesPodcastsProvider(MusicProvider):
+ """ITunesPodcastsProvider."""
+
+ throttler: ThrottlerManager
+
+ @property
+ def supported_features(self) -> set[ProviderFeature]:
+ """Return the features supported by this Provider."""
+ return {
+ ProviderFeature.SEARCH,
+ }
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """Return True if the provider is a streaming provider."""
+ # For streaming providers return True here but for local file based providers return False.
+ return True
+
+ async def handle_async_init(self) -> None:
+ """Handle async initialization of the provider."""
+ self.max_episodes = int(str(self.config.get_value(CONF_NUM_EPISODES)))
+ # 20 requests per minute, be a bit below
+ self.throttler = ThrottlerManager(rate_limit=18, period=60)
+
+ async def search(
+ self, search_query: str, media_types: list[MediaType], limit: int = 10
+ ) -> SearchResults:
+ """Perform search on musicprovider."""
+ result = SearchResults()
+ if MediaType.PODCAST not in media_types:
+ return result
+
+ if limit < 1:
+ limit = 1
+ elif limit > 200:
+ limit = 200
+ country = str(self.config.get_value(CONF_LOCALE))
+ explicit = "Yes" if bool(self.config.get_value(CONF_EXPLICIT)) else "No"
+ params: dict[str, str | int] = {
+ "media": "podcast",
+ "entity": "podcast",
+ "country": country,
+ "attribute": "titleTerm",
+ "explicit": explicit,
+ "limit": limit,
+ "term": search_query,
+ }
+ url = "https://itunes.apple.com/search?"
+ result.podcasts = await self._perform_search(url, params)
+
+ return result
+
+ @throttle_with_retries
+ async def _perform_search(self, url: str, params: dict[str, str | int]) -> list[Podcast]:
+ response = await self.mass.http_session.get(url, params=params)
+ json_response = b""
+ if response.status == 200:
+ json_response = await response.read()
+ if not json_response:
+ return []
+ podcast_list: list[Podcast] = []
+ results = ITunesSearchResults.from_json(json_response).results
+ for result in results:
+ if result.feed_url is None or result.track_name is None:
+ continue
+ podcast = Podcast(
+ name=result.track_name,
+ item_id=result.feed_url,
+ publisher=result.artist_name,
+ provider=self.lookup_key,
+ provider_mappings={
+ ProviderMapping(
+ item_id=result.feed_url,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ image_list = []
+ for artwork_url in [
+ result.artwork_url_600,
+ result.artwork_url_100,
+ result.artwork_url_60,
+ result.artwork_url_30,
+ ]:
+ if artwork_url is not None:
+ image_list.append(
+ MediaItemImage(
+ type=ImageType.THUMB, path=artwork_url, provider=self.lookup_key
+ )
+ )
+ podcast.metadata.images = UniqueList(image_list)
+ podcast_list.append(podcast)
+ return podcast_list
+
+ async def get_podcast(self, prov_podcast_id: str) -> Podcast:
+ """Get podcast."""
+ parsed = await self._get_cached_podcast(prov_podcast_id)
+
+ return parse_podcast(
+ feed_url=prov_podcast_id,
+ parsed_feed=parsed,
+ lookup_key=self.lookup_key,
+ domain=self.domain,
+ instance_id=self.instance_id,
+ )
+
+ async def get_podcast_episodes(
+ self, prov_podcast_id: str
+ ) -> AsyncGenerator[PodcastEpisode, None]:
+ """Get podcast episodes."""
+ podcast = await self._get_cached_podcast(prov_podcast_id)
+ podcast_cover = podcast.get("cover_url")
+ episodes = podcast.get("episodes", [])
+ for cnt, episode in enumerate(episodes):
+ yield parse_podcast_episode(
+ episode=episode,
+ prov_podcast_id=prov_podcast_id,
+ episode_cnt=cnt,
+ podcast_cover=podcast_cover,
+ domain=self.domain,
+ lookup_key=self.lookup_key,
+ instance_id=self.instance_id,
+ )
+
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ """Get single podcast episode."""
+ prov_podcast_id, guid_or_stream_url = prov_episode_id.split(" ")
+ podcast = await self._get_cached_podcast(prov_podcast_id)
+ podcast_cover = podcast.get("cover_url")
+ episodes = podcast.get("episodes", [])
+ for cnt, episode in enumerate(episodes):
+ episode_enclosures = episode.get("enclosures", [])
+ if len(episode_enclosures) < 1:
+ raise RuntimeError
+ stream_url = episode_enclosures[0].get("url", None)
+ if guid_or_stream_url == episode.get("guid", stream_url):
+ return parse_podcast_episode(
+ episode=episode,
+ prov_podcast_id=prov_podcast_id,
+ episode_cnt=cnt,
+ podcast_cover=podcast_cover,
+ domain=self.domain,
+ lookup_key=self.lookup_key,
+ instance_id=self.instance_id,
+ )
+
+ raise MediaNotFoundError("Episode not found")
+
+ async def _get_episode_stream_url(self, podcast_id: str, guid_or_stream_url: str) -> str | None:
+ podcast = await self._get_cached_podcast(podcast_id)
+ episodes = podcast.get("episodes", [])
+ for cnt, episode in enumerate(episodes):
+ episode_enclosures = episode.get("enclosures", [])
+ if len(episode_enclosures) < 1:
+ raise MediaNotFoundError
+ stream_url: str | None = episode_enclosures[0].get("url", None)
+ if guid_or_stream_url == episode.get("guid", stream_url):
+ return stream_url
+ return None
+
+ async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
+ """Get streamdetails for item."""
+ podcast_id, guid_or_stream_url = item_id.split(" ")
+ stream_url = await self._get_episode_stream_url(podcast_id, guid_or_stream_url)
+ if stream_url is None:
+ raise MediaNotFoundError
+ return StreamDetails(
+ provider=self.lookup_key,
+ item_id=item_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(stream_url),
+ ),
+ media_type=MediaType.PODCAST_EPISODE,
+ stream_type=StreamType.HTTP,
+ path=stream_url,
+ can_seek=True,
+ allow_seek=True,
+ )
+
+ async def _get_cached_podcast(self, prov_podcast_id: str) -> dict[str, Any]:
+ parsed_podcast = await self.mass.cache.get(
+ key=prov_podcast_id,
+ base_key=self.lookup_key,
+ category=CACHE_CATEGORY_PODCASTS,
+ default=None,
+ )
+ if parsed_podcast is None:
+ # see music-assistant/server@6aae82e
+ response = await self.mass.http_session.get(
+ prov_podcast_id, headers={"User-Agent": "Mozilla/5.0"}
+ )
+ if response.status != 200:
+ raise RuntimeError
+ feed_data = await response.read()
+ feed_stream = BytesIO(feed_data)
+ parsed_podcast = podcastparser.parse(
+ prov_podcast_id, feed_stream, max_episodes=self.max_episodes
+ )
+ await self._cache_set_podcast(feed_url=prov_podcast_id, parsed_podcast=parsed_podcast)
+
+ # this is a dictionary from podcastparser
+ return parsed_podcast # type: ignore[no-any-return]
+
+ async def _cache_set_podcast(self, feed_url: str, parsed_podcast: dict[str, Any]) -> None:
+ await self.mass.cache.set(
+ key=feed_url,
+ base_key=self.lookup_key,
+ category=CACHE_CATEGORY_PODCASTS,
+ data=parsed_podcast,
+ expiration=60 * 60 * 24, # 1 day
+ )
--- /dev/null
+{
+ "AF": "Afghanistan",
+ "AL": "Albania",
+ "DZ": "Algeria",
+ "AD": "Andorra",
+ "AO": "Angola",
+ "AI": "Anguilla",
+ "AG": "Antigua and Barbuda",
+ "AR": "Argentina",
+ "AM": "Armenia",
+ "AU": "Australia",
+ "AT": "Austria",
+ "AZ": "Azerbaijan",
+ "BS": "Bahamas (the)",
+ "BH": "Bahrain",
+ "BD": "Bangladesh",
+ "BB": "Barbados",
+ "BY": "Belarus",
+ "BE": "Belgium",
+ "BZ": "Belize",
+ "BJ": "Benin",
+ "BM": "Bermuda",
+ "BT": "Bhutan",
+ "BO": "Bolivia (Plurinational State of)",
+ "BA": "Bosnia and Herzegovina",
+ "BW": "Botswana",
+ "BR": "Brazil",
+ "BN": "Brunei Darussalam",
+ "BG": "Bulgaria",
+ "BF": "Burkina Faso",
+ "CV": "Cabo Verde",
+ "KH": "Cambodia",
+ "CM": "Cameroon",
+ "CA": "Canada",
+ "KY": "Cayman Islands (the)",
+ "CF": "Central African Republic (the)",
+ "TD": "Chad",
+ "CL": "Chile",
+ "CN": "China",
+ "CO": "Colombia",
+ "CD": "Congo (the Democratic Republic of the)",
+ "CG": "Congo (the)",
+ "CR": "Costa Rica",
+ "HR": "Croatia",
+ "CY": "Cyprus",
+ "CZ": "Czechia",
+ "CI": "Côte d'Ivoire",
+ "DK": "Denmark",
+ "DM": "Dominica",
+ "DO": "Dominican Republic (the)",
+ "EC": "Ecuador",
+ "EG": "Egypt",
+ "SV": "El Salvador",
+ "EE": "Estonia",
+ "SZ": "Eswatini",
+ "ET": "Ethiopia",
+ "FJ": "Fiji",
+ "FI": "Finland",
+ "FR": "France",
+ "GA": "Gabon",
+ "GM": "Gambia (the)",
+ "GE": "Georgia",
+ "DE": "Germany",
+ "GH": "Ghana",
+ "GR": "Greece",
+ "GD": "Grenada",
+ "GT": "Guatemala",
+ "GN": "Guinea",
+ "GW": "Guinea-Bissau",
+ "GY": "Guyana",
+ "HN": "Honduras",
+ "HK": "Hong Kong",
+ "HU": "Hungary",
+ "IS": "Iceland",
+ "IN": "India",
+ "ID": "Indonesia",
+ "IQ": "Iraq",
+ "IE": "Ireland",
+ "IL": "Israel",
+ "IT": "Italy",
+ "JM": "Jamaica",
+ "JP": "Japan",
+ "JO": "Jordan",
+ "KZ": "Kazakhstan",
+ "KE": "Kenya",
+ "KR": "Korea (the Republic of)",
+ "XK": "Kosovo",
+ "KW": "Kuwait",
+ "KG": "Kyrgyzstan",
+ "LA": "Lao People's Democratic Republic (the)",
+ "LV": "Latvia",
+ "LB": "Lebanon",
+ "LR": "Liberia",
+ "LY": "Libya",
+ "LI": "Liechtenstein",
+ "LT": "Lithuania",
+ "LU": "Luxembourg",
+ "MO": "Macao",
+ "MG": "Madagascar",
+ "MW": "Malawi",
+ "MY": "Malaysia",
+ "MV": "Maldives",
+ "ML": "Mali",
+ "MT": "Malta",
+ "MR": "Mauritania",
+ "MU": "Mauritius",
+ "MX": "Mexico",
+ "FM": "Micronesia (Federated States of)",
+ "MD": "Moldova (the Republic of)",
+ "MC": "Monaco",
+ "MN": "Mongolia",
+ "ME": "Montenegro",
+ "MS": "Montserrat",
+ "MA": "Morocco",
+ "MZ": "Mozambique",
+ "MM": "Myanmar",
+ "NA": "Namibia",
+ "NR": "Nauru",
+ "NP": "Nepal",
+ "NL": "Netherlands (the)",
+ "NZ": "New Zealand",
+ "NI": "Nicaragua",
+ "NE": "Niger (the)",
+ "NG": "Nigeria",
+ "NO": "Norway",
+ "OM": "Oman",
+ "PK": "Pakistan",
+ "PW": "Palau",
+ "PS": "Palestine, State of",
+ "PA": "Panama",
+ "PG": "Papua New Guinea",
+ "PY": "Paraguay",
+ "PE": "Peru",
+ "PH": "Philippines (the)",
+ "PL": "Poland",
+ "PT": "Portugal",
+ "QA": "Qatar",
+ "MK": "Republic of North Macedonia",
+ "RO": "Romania",
+ "RU": "Russian Federation (the)",
+ "RW": "Rwanda",
+ "KN": "Saint Kitts and Nevis",
+ "LC": "Saint Lucia",
+ "VC": "Saint Vincent and the Grenadines",
+ "WS": "Samoa",
+ "ST": "Sao Tome and Principe",
+ "SA": "Saudi Arabia",
+ "SN": "Senegal",
+ "RS": "Serbia",
+ "SC": "Seychelles",
+ "SL": "Sierra Leone",
+ "SG": "Singapore",
+ "SK": "Slovakia",
+ "SI": "Slovenia",
+ "SB": "Solomon Islands",
+ "ZA": "South Africa",
+ "ES": "Spain",
+ "LK": "Sri Lanka",
+ "SR": "Suriname",
+ "SE": "Sweden",
+ "CH": "Switzerland",
+ "TW": "Taiwan (Province of China)",
+ "TJ": "Tajikistan",
+ "TZ": "Tanzania, United Republic of",
+ "TH": "Thailand",
+ "TO": "Tonga",
+ "TT": "Trinidad and Tobago",
+ "TN": "Tunisia",
+ "TR": "Turkey",
+ "TM": "Turkmenistan",
+ "TC": "Turks and Caicos Islands (the)",
+ "UG": "Uganda",
+ "UA": "Ukraine",
+ "AE": "United Arab Emirates (the)",
+ "GB": "United Kingdom of Great Britain and Northern Ireland (the)",
+ "US": "United States of America (the)",
+ "UY": "Uruguay",
+ "UZ": "Uzbekistan",
+ "VU": "Vanuatu",
+ "VE": "Venezuela (Bolivarian Republic of)",
+ "VN": "Viet Nam",
+ "VG": "Virgin Islands (British)",
+ "YE": "Yemen",
+ "ZM": "Zambia",
+ "ZW": "Zimbabwe"
+}
--- /dev/null
+"""Podcastfeed -> Mass."""
+
+from typing import Any
+
+from music_assistant_models.enums import ContentType, ImageType, MediaType
+from music_assistant_models.media_items import (
+ AudioFormat,
+ ItemMapping,
+ MediaItemChapter,
+ MediaItemImage,
+ Podcast,
+ PodcastEpisode,
+ ProviderMapping,
+ UniqueList,
+)
+
+
+def parse_podcast(
+ *, feed_url: str, parsed_feed: dict[str, Any], lookup_key: str, domain: str, instance_id: str
+) -> Podcast:
+ """Podcast -> Mass Podcast."""
+ publisher = parsed_feed.get("author") or parsed_feed.get("itunes_author", "NO_AUTHOR")
+ mass_podcast = Podcast(
+ item_id=feed_url,
+ name=parsed_feed.get("title", "NO_TITLE"),
+ publisher=publisher,
+ provider=lookup_key,
+ uri=parsed_feed.get("link"),
+ provider_mappings={
+ ProviderMapping(
+ item_id=feed_url,
+ provider_domain=domain,
+ provider_instance=instance_id,
+ )
+ },
+ )
+ genres: list[str] = []
+ if _genres := parsed_feed.get("itunes_categories"):
+ for _sub_genre in _genres:
+ if isinstance(_sub_genre, list):
+ genres.extend(x for x in _sub_genre if isinstance(x, str))
+ elif isinstance(_sub_genre, str):
+ genres.append(_sub_genre)
+
+ mass_podcast.metadata.genres = set(genres)
+ mass_podcast.metadata.description = parsed_feed.get("description", "")
+ mass_podcast.metadata.explicit = parsed_feed.get("explicit", False)
+ language = parsed_feed.get("language")
+ if language is not None:
+ mass_podcast.metadata.languages = UniqueList([language])
+ episodes = parsed_feed.get("episodes", [])
+ mass_podcast.total_episodes = len(episodes)
+ podcast_cover = parsed_feed.get("cover_url")
+ if podcast_cover is not None:
+ mass_podcast.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=podcast_cover,
+ provider=lookup_key,
+ remotely_accessible=True,
+ )
+ ]
+ )
+ return mass_podcast
+
+
+def parse_podcast_episode(
+ *,
+ episode: dict[str, Any],
+ prov_podcast_id: str,
+ episode_cnt: int,
+ podcast_cover: str | None = None,
+ lookup_key: str,
+ domain: str,
+ instance_id: str,
+) -> PodcastEpisode:
+ """Podcast Episode -> Mass Podcast Episode."""
+ episode_duration = episode.get("total_time", 0.0)
+ episode_title = episode.get("title", "NO_EPISODE_TITLE")
+ episode_cover = episode.get("episode_art_url", podcast_cover)
+ episode_published = episode.get("published")
+ episode_enclosures = episode.get("enclosures", [])
+ if len(episode_enclosures) < 1:
+ raise RuntimeError
+ stream_url = episode_enclosures[0].get("url", None)
+ # not all feeds have a guid, but a guid is preferred as identification
+ guid_or_stream_url = episode.get("guid", stream_url)
+
+ episode_id = f"{prov_podcast_id} {guid_or_stream_url}"
+ mass_episode = PodcastEpisode(
+ item_id=episode_id,
+ provider=lookup_key,
+ name=episode_title,
+ duration=int(episode_duration),
+ position=episode_cnt,
+ podcast=ItemMapping(
+ item_id=prov_podcast_id,
+ provider=lookup_key,
+ name=episode_title,
+ media_type=MediaType.PODCAST,
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=episode_id,
+ provider_domain=domain,
+ provider_instance=instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.try_parse(stream_url),
+ ),
+ url=stream_url,
+ )
+ },
+ )
+ mass_episode.metadata.release_date = episode_published
+
+ # chapter
+ if chapters := episode.get("chapters"):
+ _chapters = []
+ for cnt, chapter in chapters:
+ if not isinstance(chapter, dict):
+ continue
+ title = chapter.get("title")
+ start = chapter.get("start")
+ if title and start:
+ _chapters.append(MediaItemChapter(position=cnt + 1, name=title, start=start))
+
+ # cover image
+ if episode_cover is not None:
+ mass_episode.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=episode_cover,
+ provider=lookup_key,
+ remotely_accessible=True,
+ )
+ ]
+ )
+
+ return mass_episode
--- /dev/null
+"""Schema for iTunes Podcast Search.
+
+Only what is needed.
+"""
+
+from dataclasses import dataclass, field
+
+from mashumaro import field_options
+from mashumaro.config import BaseConfig
+from mashumaro.mixins.json import DataClassJSONMixin
+
+
+class _BaseModel(DataClassJSONMixin):
+ """Model shared between schema definitions."""
+
+ class Config(BaseConfig):
+ """Base configuration."""
+
+ forbid_extra_keys = False
+ serialize_by_alias = True
+
+
+@dataclass(kw_only=True)
+class PodcastSearchResult(_BaseModel):
+ """PodcastSearchResult."""
+
+ collection_id: int | None = field(metadata=field_options(alias="collectionId"), default=None)
+ kind: str | None = None
+ artist_name: str | None = field(metadata=field_options(alias="artistName"), default=None)
+ collection_name: str | None = field(
+ metadata=field_options(alias="collectionName"), default=None
+ )
+ collection_censored_name: str | None = field(
+ metadata=field_options(alias="collectionCensoredName"), default=None
+ )
+ track_name: str | None = field(metadata=field_options(alias="trackName"), default=None)
+ track_censored_name: str | None = field(
+ metadata=field_options(alias="trackCensoredName"), default=None
+ )
+ feed_url: str | None = field(metadata=field_options(alias="feedUrl"), default=None)
+ artwork_url_30: str | None = field(metadata=field_options(alias="artworkUrl30"), default=None)
+ artwork_url_60: str | None = field(metadata=field_options(alias="artworkUrl60"), default=None)
+ artwork_url_100: str | None = field(metadata=field_options(alias="artworkUrl100"), default=None)
+ artwork_url_600: str | None = field(metadata=field_options(alias="artworkUrl600"), default=None)
+ release_date: str | None = field(metadata=field_options(alias="releaseDate"), default=None)
+ track_count: int = field(metadata=field_options(alias="trackCount"), default=0)
+ primary_genre_name: str | None = field(
+ metadata=field_options(alias="primaryGenreName"), default=None
+ )
+ genres: list[str] = field(default_factory=list)
+
+
+@dataclass(kw_only=True)
+class ITunesSearchResults(_BaseModel):
+ """SearchResults."""
+
+ result_count: int = field(metadata=field_options(alias="resultCount"), default=0)
+ results: list[PodcastSearchResult] = field(default_factory=list)