import asyncio
import os
-from collections.abc import AsyncGenerator
+from collections.abc import AsyncGenerator, Sequence
from logging import getLevelName
from typing import TYPE_CHECKING, cast
+from urllib.parse import quote, unquote
from uuid import uuid4
import audible
)
from music_assistant_models.enums import ConfigEntryType, EventType, MediaType, ProviderFeature
from music_assistant_models.errors import LoginFailed, MediaNotFoundError
+from music_assistant_models.media_items import BrowseFolder, ItemMapping
from music_assistant.models.music_provider import MusicProvider
from music_assistant.providers.audible.audible_helper import (
)
if TYPE_CHECKING:
- from music_assistant_models.media_items import Audiobook, MediaItemType
+ from music_assistant_models.media_items import (
+ Audiobook,
+ MediaItemType,
+ Podcast,
+ PodcastEpisode,
+ )
from music_assistant_models.provider import ProviderManifest
from music_assistant_models.streamdetails import StreamDetails
SUPPORTED_FEATURES = {
ProviderFeature.BROWSE,
ProviderFeature.LIBRARY_AUDIOBOOKS,
+ ProviderFeature.LIBRARY_PODCASTS,
}
"""Get full audiobook details by id."""
return await self.helper.get_audiobook(asin=prov_audiobook_id, use_cache=False)
+ async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
+ """Browse this provider's items.
+
+ :param path: The path to browse, (e.g. provider_id://authors).
+ """
+ item_path = path.split("://", 1)[1] if "://" in path else ""
+ parts = item_path.split("/") if item_path else []
+
+ # Root - return main folders
+ if not item_path:
+ return self._browse_root(path)
+
+ # Authors listing
+ if parts[0] == "authors":
+ if len(parts) == 1:
+ return await self._browse_authors(path)
+ # Specific author's books
+ return await self._browse_author_books(unquote(parts[1]))
+
+ # Series listing
+ if parts[0] == "series":
+ if len(parts) == 1:
+ return await self._browse_series(path)
+ # Specific series' books
+ return await self._browse_series_books(unquote(parts[1]))
+
+ # Narrators listing
+ if parts[0] == "narrators":
+ if len(parts) == 1:
+ return await self._browse_narrators(path)
+ return await self._browse_narrator_books(unquote(parts[1]))
+
+ # Genres listing
+ if parts[0] == "genres":
+ if len(parts) == 1:
+ return await self._browse_genres(path)
+ return await self._browse_genre_books(unquote(parts[1]))
+
+ # Publishers listing
+ if parts[0] == "publishers":
+ if len(parts) == 1:
+ return await self._browse_publishers(path)
+ return await self._browse_publisher_books(unquote(parts[1]))
+
+ # Fall back to base implementation for audiobooks/podcasts
+ return await super().browse(path)
+
+ def _browse_root(self, base_path: str) -> list[BrowseFolder]:
+ """Return root browse folders."""
+ return [
+ BrowseFolder(
+ item_id="audiobooks",
+ provider=self.instance_id,
+ path=f"{base_path}audiobooks",
+ name="",
+ translation_key="audiobooks",
+ ),
+ BrowseFolder(
+ item_id="podcasts",
+ provider=self.instance_id,
+ path=f"{base_path}podcasts",
+ name="",
+ translation_key="podcasts",
+ ),
+ BrowseFolder(
+ item_id="authors",
+ provider=self.instance_id,
+ path=f"{base_path}authors",
+ name="Authors",
+ ),
+ BrowseFolder(
+ item_id="series",
+ provider=self.instance_id,
+ path=f"{base_path}series",
+ name="Series",
+ ),
+ BrowseFolder(
+ item_id="narrators",
+ provider=self.instance_id,
+ path=f"{base_path}narrators",
+ name="Narrators",
+ ),
+ BrowseFolder(
+ item_id="genres",
+ provider=self.instance_id,
+ path=f"{base_path}genres",
+ name="Genres",
+ ),
+ BrowseFolder(
+ item_id="publishers",
+ provider=self.instance_id,
+ path=f"{base_path}publishers",
+ name="Publishers",
+ ),
+ ]
+
+ async def _browse_authors(self, base_path: str) -> list[BrowseFolder]:
+ """Return list of all authors."""
+ authors = await self.helper.get_authors()
+ return [
+ BrowseFolder(
+ item_id=asin,
+ provider=self.instance_id,
+ path=f"{base_path}/{quote(asin)}",
+ name=name,
+ )
+ for asin, name in sorted(authors.items(), key=lambda x: x[1])
+ ]
+
+ async def _browse_author_books(self, author_asin: str) -> list[Audiobook]:
+ """Return audiobooks by a specific author."""
+ return await self.helper.get_audiobooks_by_author(author_asin)
+
+ async def _browse_series(self, base_path: str) -> list[BrowseFolder]:
+ """Return list of all series."""
+ series = await self.helper.get_series()
+ return [
+ BrowseFolder(
+ item_id=asin,
+ provider=self.instance_id,
+ path=f"{base_path}/{quote(asin)}",
+ name=title,
+ )
+ for asin, title in sorted(series.items(), key=lambda x: x[1])
+ ]
+
+ async def _browse_series_books(self, series_asin: str) -> list[Audiobook]:
+ """Return audiobooks in a specific series."""
+ return await self.helper.get_audiobooks_by_series(series_asin)
+
+ async def _browse_narrators(self, base_path: str) -> list[BrowseFolder]:
+ """Return list of all narrators."""
+ narrators = await self.helper.get_narrators()
+ return [
+ BrowseFolder(
+ item_id=asin,
+ provider=self.instance_id,
+ path=f"{base_path}/{quote(asin)}",
+ name=name,
+ )
+ for asin, name in sorted(narrators.items(), key=lambda x: x[1])
+ ]
+
+ async def _browse_narrator_books(self, narrator_asin: str) -> list[Audiobook]:
+ """Return audiobooks by a specific narrator."""
+ return await self.helper.get_audiobooks_by_narrator(narrator_asin)
+
+ async def _browse_genres(self, base_path: str) -> list[BrowseFolder]:
+ """Return list of all genres."""
+ genres = await self.helper.get_genres()
+ return [
+ BrowseFolder(
+ item_id=genre,
+ provider=self.instance_id,
+ path=f"{base_path}/{quote(genre)}",
+ name=genre,
+ )
+ for genre in sorted(genres)
+ ]
+
+ async def _browse_genre_books(self, genre: str) -> list[Audiobook]:
+ """Return audiobooks matching a genre."""
+ return await self.helper.get_audiobooks_by_genre(genre)
+
+ async def _browse_publishers(self, base_path: str) -> list[BrowseFolder]:
+ """Return list of all publishers."""
+ publishers = await self.helper.get_publishers()
+ return [
+ BrowseFolder(
+ item_id=publisher,
+ provider=self.instance_id,
+ path=f"{base_path}/{quote(publisher)}",
+ name=publisher,
+ )
+ for publisher in sorted(publishers)
+ ]
+
+ async def _browse_publisher_books(self, publisher: str) -> list[Audiobook]:
+ """Return audiobooks from a specific publisher."""
+ return await self.helper.get_audiobooks_by_publisher(publisher)
+
+ async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
+ """Get all podcasts from the library."""
+ async for podcast in self.helper.get_library_podcasts():
+ yield podcast
+
+ async def get_podcast(self, prov_podcast_id: str) -> Podcast:
+ """Get full podcast details by id."""
+ return await self.helper.get_podcast(asin=prov_podcast_id)
+
+ async def get_podcast_episodes(
+ self, prov_podcast_id: str
+ ) -> AsyncGenerator[PodcastEpisode, None]:
+ """Get all episodes for a podcast."""
+ async for episode in self.helper.get_podcast_episodes(prov_podcast_id):
+ yield episode
+
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ """Get full podcast episode details by id."""
+ return await self.helper.get_podcast_episode(prov_episode_id)
+
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
- """Get streamdetails for a audiobook based of asin."""
+ """Get stream details for an audiobook or podcast episode.
+
+ :param item_id: The ASIN of the audiobook or podcast episode.
+ :param media_type: The type of media (audiobook or podcast episode).
+ """
try:
- return await self.helper.get_stream(asin=item_id)
+ return await self.helper.get_stream(asin=item_id, media_type=media_type)
except ValueError as exc:
raise MediaNotFoundError(f"Failed to get stream details for {item_id}") from exc
media_item is the full media item details of the played/playing track.
"""
- await self.helper.set_last_position(prov_item_id, position)
+ await self.helper.set_last_position(prov_item_id, position, media_type)
async def unload(self, is_removed: bool = False) -> None:
"""
from music_assistant_models.media_items import (
Audiobook,
AudioFormat,
+ ItemMapping,
MediaItemChapter,
MediaItemImage,
+ Podcast,
+ PodcastEpisode,
ProviderMapping,
UniqueList,
)
CACHE_CATEGORY_API = 0
CACHE_CATEGORY_AUDIOBOOK = 1
CACHE_CATEGORY_CHAPTERS = 2
+CACHE_CATEGORY_PODCAST = 3
+CACHE_CATEGORY_PODCAST_EPISODES = 4
+
+# Content delivery types
+AUDIOBOOK_CONTENT_TYPES = ("SinglePartBook", "MultiPartBook")
+PODCAST_CONTENT_TYPES = ("PodcastParent",)
_AUTH_CACHE: dict[str, audible.Authenticator] = {}
self.provider_domain = provider_domain
self.provider_instance = provider_instance
self.logger = logger or logging.getLogger("audible_helper")
+ self._acr_cache: dict[tuple[str, MediaType], str] = {}
- async def _process_audiobook_item(
- self, audiobook_data: dict[str, Any], total_processed: int
- ) -> tuple[Audiobook | None, int]:
- """Process a single audiobook item from the library."""
- content_type = audiobook_data.get("content_delivery_type", "")
- if content_type not in ("SinglePartBook", "MultiPartBook"):
- self.logger.debug(
- "Skipping non-audiobook item: %s (%s)",
- audiobook_data.get("title", "Unknown"),
- content_type,
- )
- return None, total_processed + 1
-
- # Ensure asin is a valid string
- asin = str(audiobook_data.get("asin", ""))
- cached_book = None
- if asin:
- cached_book = await self.mass.cache.get(
- key=asin,
- provider=self.provider_instance,
- category=CACHE_CATEGORY_AUDIOBOOK,
- default=None,
- )
-
- try:
- if cached_book is not None:
- album = self._parse_audiobook(cached_book)
- else:
- album = self._parse_audiobook(audiobook_data)
- return album, total_processed + 1
- except MediaNotFoundError as exc:
- self.logger.warning(f"Skipping invalid audiobook: {exc}")
- return None, total_processed + 1
- except Exception as exc:
- self.logger.warning(
- f"Error processing audiobook {audiobook_data.get('asin', 'unknown')}: {exc}"
- )
- return None, total_processed + 1
-
- async def get_library(self) -> AsyncGenerator[Audiobook, None]:
- """Fetch the user's library with pagination."""
- response_groups = [
- "contributors",
- "media",
- "product_attrs",
- "product_desc",
- "product_details",
- "product_extended_attrs",
- ]
-
+ async def _fetch_library_items(
+ self,
+ response_groups: str,
+ content_types: tuple[str, ...],
+ ) -> AsyncGenerator[dict[str, Any], None]:
+ """Fetch items from the library with pagination."""
page = 1
page_size = 50
total_processed = 0
while iteration < max_iterations:
iteration += 1
self.logger.debug(
- "Audible: Fetching library page %s with page_size %s (processed so far: %s)",
+ "Audible: Fetching library page %s (processed so far: %s)",
page,
- page_size,
total_processed,
)
library = await self._call_api(
"library",
use_cache=False,
- response_groups=",".join(response_groups),
+ response_groups=response_groups,
page=page,
num_results=page_size,
)
items = library.get("items", [])
- total_items = library.get("total_results", 0)
- self.logger.debug(
- "Audible: Got %s items (total reported by API: %s)", len(items), total_items
- )
if not items:
- self.logger.debug(
- "Audible: No more items returned, ending pagination (processed %s items)",
- total_processed,
- )
break
items_processed_this_page = 0
- for audiobook_data in items:
- album, total_processed = await self._process_audiobook_item(
- audiobook_data, total_processed
- )
- if album:
- yield album
- items_processed_this_page += 1
+ for item in items:
+ # Filter by content type if specified
+ if content_types and item.get("content_delivery_type") not in content_types:
+ continue
- self.logger.debug(
- "Audible: Processed %s valid audiobooks on page %s", items_processed_this_page, page
- )
+ yield item
+ items_processed_this_page += 1
+ total_processed += 1
- page += 1
self.logger.debug(
- "Audible: Moving to page %s (processed: %s, total reported: %s)",
- page,
- total_processed,
- total_items,
+ "Audible: Processed %s items on page %s", items_processed_this_page, page
)
+ page += 1
if len(items) < page_size:
- self.logger.debug(
- "Audible: Fewer than page size returned, ending pagination "
- "(processed %s items)",
- total_processed,
- )
break
if iteration >= max_iterations:
max_iterations,
total_processed,
)
- else:
- self.logger.info(
- "Audible: Successfully retrieved %s audiobooks from library", total_processed
+
+ async def _process_audiobook_item(self, audiobook_data: dict[str, Any]) -> Audiobook | None:
+ """Process a single audiobook item from the library."""
+ # Ensure asin is a valid string
+ asin = str(audiobook_data.get("asin", ""))
+ cached_book = None
+ if asin:
+ cached_book = await self.mass.cache.get(
+ key=asin,
+ provider=self.provider_instance,
+ category=CACHE_CATEGORY_AUDIOBOOK,
+ default=None,
)
+ try:
+ if cached_book is not None:
+ return self._parse_audiobook(cached_book)
+ return self._parse_audiobook(audiobook_data)
+ except MediaNotFoundError as exc:
+ self.logger.warning(f"Skipping invalid audiobook: {exc}")
+ return None
+ except Exception as exc:
+ self.logger.warning(
+ f"Error processing audiobook {audiobook_data.get('asin', 'unknown')}: {exc}"
+ )
+ return None
+
+ async def get_library(self) -> AsyncGenerator[Audiobook, None]:
+ """Fetch the user's library with pagination."""
+ response_groups = [
+ "contributors",
+ "media",
+ "product_attrs",
+ "product_desc",
+ "product_details",
+ "product_extended_attrs",
+ ]
+
+ async for item in self._fetch_library_items(
+ ",".join(response_groups), AUDIOBOOK_CONTENT_TYPES
+ ):
+ if album := await self._process_audiobook_item(item):
+ yield album
+
async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook:
"""Fetch the full audiobook by asin with all details including chapters.
# Fetch resume position
book.resume_position_ms = await self.get_last_postion(asin=asin)
- async def get_stream(self, asin: str) -> StreamDetails:
- """Get stream details for a track (audiobook chapter)."""
+ async def get_stream(
+ self, asin: str, media_type: MediaType = MediaType.AUDIOBOOK
+ ) -> StreamDetails:
+ """Get stream details for an audiobook or podcast episode.
+
+ :param asin: The ASIN of the content.
+ :param media_type: The type of media (audiobook or podcast episode).
+ """
if not asin:
self.logger.error("Invalid ASIN provided to get_stream")
raise ValueError("Invalid ASIN provided to get_stream")
- chapters = await self._fetch_chapters(asin=asin)
- if not chapters:
- self.logger.warning(f"No chapters found for ASIN {asin}, using default duration")
- duration = 0
- else:
- try:
- duration = sum(chapter.get("length_ms", 0) for chapter in chapters) / 1000
- except Exception as exc:
- self.logger.warning(f"Error calculating duration for ASIN {asin}: {exc}")
- duration = 0
+ duration = 0
+ # For audiobooks, try to get duration from chapters
+ if media_type == MediaType.AUDIOBOOK:
+ chapters = await self._fetch_chapters(asin=asin)
+ if chapters:
+ try:
+ duration = sum(chapter.get("length_ms", 0) for chapter in chapters) / 1000
+ except Exception as exc:
+ self.logger.warning(f"Error calculating duration for ASIN {asin}: {exc}")
try:
- playback_info = await self.client.post(
- f"content/{asin}/licenserequest",
- body={
- "quality": "High",
- "response_groups": "content_reference,certificate",
- "consumption_type": "Streaming",
- "supported_media_features": {
- "codecs": ["mp4a.40.2", "mp4a.40.42"],
- "drm_types": [
- "Hls",
- ],
+ # Podcasts use Mpeg (non-DRM MP3), audiobooks use HLS
+ if media_type == MediaType.PODCAST_EPISODE:
+ playback_info = await self.client.post(
+ f"content/{asin}/licenserequest",
+ body={
+ "consumption_type": "Streaming",
+ "drm_type": "Mpeg",
+ "quality": "High",
},
- "spatial": False,
- },
- )
+ )
+ else:
+ playback_info = await self.client.post(
+ f"content/{asin}/licenserequest",
+ body={
+ "quality": "High",
+ "response_groups": "content_reference,certificate",
+ "consumption_type": "Streaming",
+ "supported_media_features": {
+ "codecs": ["mp4a.40.2", "mp4a.40.42"],
+ "drm_types": [
+ "Hls",
+ ],
+ },
+ "spatial": False,
+ },
+ )
content_license = playback_info.get("content_license", {})
if not content_license:
content_reference = content_metadata.get("content_reference", {})
size = content_reference.get("content_size_in_bytes", 0)
- m3u8_url = content_license.get("license_response")
- if not m3u8_url:
+ stream_url = content_license.get("license_response")
+ if not stream_url:
self.logger.error(f"No license_response (stream URL) for ASIN {asin}")
raise ValueError(f"Missing stream URL for ASIN {asin}")
acr = content_license.get("acr", "")
+ if acr:
+ self._acr_cache[(asin, media_type)] = acr
+
+ content_type = (
+ ContentType.MP3 if media_type == MediaType.PODCAST_EPISODE else ContentType.AAC
+ )
except Exception as exc:
self.logger.error(f"Error getting stream details for ASIN {asin}: {exc}")
raise ValueError(f"Failed to get stream details: {exc}") from exc
+
return StreamDetails(
provider=self.provider_instance,
size=size,
item_id=f"{asin}",
- audio_format=AudioFormat(content_type=ContentType.AAC),
- media_type=MediaType.AUDIOBOOK,
+ audio_format=AudioFormat(content_type=content_type),
+ media_type=media_type,
stream_type=StreamType.HTTP,
- path=m3u8_url,
+ path=stream_url,
can_seek=True,
allow_seek=True,
duration=duration,
self.logger.error(f"Error getting last position for ASIN {asin}: {exc}")
return 0
- async def set_last_position(self, asin: str, pos: int) -> None:
+ async def set_last_position(
+ self, asin: str, pos: int, media_type: MediaType = MediaType.AUDIOBOOK
+ ) -> None:
"""Report last position to Audible.
- Args:
- asin: The audiobook ID
- pos: Position in seconds
+ :param asin: The content ID (audiobook or podcast episode).
+ :param pos: Position in seconds.
+ :param media_type: The type of media (audiobook or podcast episode).
"""
if not asin or asin == "error" or pos <= 0:
return
try:
position_ms = pos * 1000
- stream_details = await self.get_stream(asin=asin)
- acr = stream_details.data.get("acr")
+ # Try to get ACR from cache first
+ acr = self._acr_cache.get((asin, media_type))
+ if not acr:
+ stream_details = await self.get_stream(asin=asin, media_type=media_type)
+ acr = stream_details.data.get("acr")
if not acr:
self.logger.warning(f"No ACR available for ASIN {asin}, cannot report position")
return book
+ async def _process_podcast_item(self, podcast_data: dict[str, Any]) -> Podcast | None:
+ """Process a single podcast item from the library."""
+ asin = str(podcast_data.get("asin", ""))
+ cached_podcast = None
+ if asin:
+ cached_podcast = await self.mass.cache.get(
+ key=asin,
+ provider=self.provider_instance,
+ category=CACHE_CATEGORY_PODCAST,
+ default=None,
+ )
+
+ try:
+ if cached_podcast is not None:
+ return self._parse_podcast(cached_podcast)
+ return self._parse_podcast(podcast_data)
+ except MediaNotFoundError as exc:
+ self.logger.warning(f"Skipping invalid podcast: {exc}")
+ return None
+ except Exception as exc:
+ self.logger.warning(
+ f"Error processing podcast {podcast_data.get('asin', 'unknown')}: {exc}"
+ )
+ return None
+
+ async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
+ """Fetch podcasts from the user's library with pagination."""
+ response_groups = [
+ "contributors",
+ "media",
+ "product_attrs",
+ "product_desc",
+ "product_details",
+ "product_extended_attrs",
+ ]
+
+ async for item in self._fetch_library_items(
+ ",".join(response_groups), PODCAST_CONTENT_TYPES
+ ):
+ if podcast := await self._process_podcast_item(item):
+ yield podcast
+
+ async def get_podcast(self, asin: str, use_cache: bool = True) -> Podcast:
+ """Fetch full podcast details by ASIN.
+
+ :param asin: The ASIN of the podcast.
+ :param use_cache: Whether to use cached data if available.
+ """
+ if use_cache:
+ cached_podcast = await self.mass.cache.get(
+ key=asin,
+ provider=self.provider_instance,
+ category=CACHE_CATEGORY_PODCAST,
+ default=None,
+ )
+ if cached_podcast is not None:
+ return self._parse_podcast(cached_podcast)
+
+ response = await self._call_api(
+ f"library/{asin}",
+ response_groups="""
+ contributors, media, price, product_attrs, product_desc, product_details,
+ product_extended_attrs, relationships
+ """,
+ )
+
+ if response is None:
+ raise MediaNotFoundError(f"Podcast with ASIN {asin} not found")
+
+ item_data = response.get("item")
+ if item_data is None:
+ raise MediaNotFoundError(f"Podcast data for ASIN {asin} is empty")
+
+ await self.mass.cache.set(
+ key=asin,
+ provider=self.provider_instance,
+ category=CACHE_CATEGORY_PODCAST,
+ data=item_data,
+ )
+ return self._parse_podcast(item_data)
+
+ async def get_podcast_episodes(self, podcast_asin: str) -> AsyncGenerator[PodcastEpisode, None]:
+ """Fetch all episodes for a podcast.
+
+ :param podcast_asin: The ASIN of the parent podcast.
+ """
+ podcast = await self.get_podcast(podcast_asin)
+
+ # Fetch episodes - they're typically in relationships or we need to query children
+ response_groups = [
+ "contributors",
+ "media",
+ "product_attrs",
+ "product_desc",
+ "product_details",
+ "relationships",
+ ]
+
+ page = 1
+ page_size = 50
+ position = 0
+
+ while True:
+ # Query for children of the podcast parent
+ response = await self._call_api(
+ "library",
+ use_cache=False,
+ response_groups=",".join(response_groups),
+ parent_asin=podcast_asin,
+ page=page,
+ num_results=page_size,
+ )
+
+ items = response.get("items", [])
+ if not items:
+ break
+
+ for episode_data in items:
+ try:
+ episode = self._parse_podcast_episode(episode_data, podcast, position)
+ position += 1
+ yield episode
+ except Exception as exc:
+ asin = episode_data.get("asin", "unknown")
+ self.logger.warning(f"Error parsing podcast episode {asin}: {exc}")
+
+ page += 1
+ if len(items) < page_size:
+ break
+
+ async def get_podcast_episode(self, episode_asin: str) -> PodcastEpisode:
+ """Fetch full podcast episode details by ASIN.
+
+ :param episode_asin: The ASIN of the podcast episode.
+ """
+ response = await self._call_api(
+ f"library/{episode_asin}",
+ response_groups="""
+ contributors, media, price, product_attrs, product_desc, product_details,
+ product_extended_attrs, relationships
+ """,
+ )
+
+ if response is None:
+ raise MediaNotFoundError(f"Podcast episode with ASIN {episode_asin} not found")
+
+ item_data = response.get("item")
+ if item_data is None:
+ raise MediaNotFoundError(f"Podcast episode data for ASIN {episode_asin} is empty")
+
+ # Try to get parent podcast info from relationships
+ podcast: Podcast | None = None
+ relationships = item_data.get("relationships", [])
+ for rel in relationships:
+ if rel.get("relationship_type") == "parent":
+ parent_asin = rel.get("asin")
+ if parent_asin:
+ with suppress(MediaNotFoundError):
+ podcast = await self.get_podcast(parent_asin)
+ break
+
+ return self._parse_podcast_episode(item_data, podcast, 0)
+
+ def _parse_podcast(self, podcast_data: dict[str, Any] | None) -> Podcast:
+ """Parse podcast data from API response.
+
+ :param podcast_data: Raw podcast data from the Audible API.
+ """
+ if podcast_data is None:
+ self.logger.error("Received None podcast_data in _parse_podcast")
+ raise MediaNotFoundError("Podcast data not found")
+
+ asin = podcast_data.get("asin", "")
+ title = podcast_data.get("title", "")
+ publisher = podcast_data.get("publisher_name", "")
+
+ # Create podcast object
+ podcast = Podcast(
+ item_id=asin,
+ provider=self.provider_instance,
+ name=title,
+ publisher=publisher,
+ provider_mappings={
+ ProviderMapping(
+ item_id=asin,
+ provider_domain=self.provider_domain,
+ provider_instance=self.provider_instance,
+ )
+ },
+ )
+
+ # Set metadata
+ podcast.metadata.description = _html_to_txt(
+ str(
+ podcast_data.get("publisher_summary", "")
+ or podcast_data.get("extended_product_description", "")
+ )
+ )
+ podcast.metadata.languages = UniqueList([podcast_data.get("language") or ""])
+
+ # Set genres
+ podcast.metadata.genres = {
+ genre.replace("_", " ") for genre in (podcast_data.get("platinum_keywords") or [])
+ }
+
+ # Add images
+ image_path = podcast_data.get("product_images", {}).get("500")
+ podcast.metadata.images = UniqueList(self._create_images(image_path))
+
+ return podcast
+
+ def _parse_podcast_episode(
+ self,
+ episode_data: dict[str, Any] | None,
+ podcast: Podcast | None,
+ position: int,
+ ) -> PodcastEpisode:
+ """Parse podcast episode data from API response.
+
+ :param episode_data: Raw episode data from the Audible API.
+ :param podcast: Parent podcast object (optional).
+ :param position: Position/index of the episode in the podcast.
+ """
+ if episode_data is None:
+ self.logger.error("Received None episode_data in _parse_podcast_episode")
+ raise MediaNotFoundError("Podcast episode data not found")
+
+ asin = episode_data.get("asin", "")
+ title = episode_data.get("title", "")
+
+ # Get duration from runtime_length_min
+ runtime_minutes = episode_data.get("runtime_length_min", 0)
+ duration = runtime_minutes * 60 if runtime_minutes else 0
+
+ # Create podcast reference - use Podcast object or create ItemMapping
+ podcast_ref: Podcast | ItemMapping
+ if podcast is not None:
+ podcast_ref = podcast
+ else:
+ # Try to get parent_asin from relationships for ItemMapping
+ parent_asin = ""
+ relationships = episode_data.get("relationships", [])
+ for rel in relationships:
+ if rel.get("relationship_type") == "parent":
+ parent_asin = rel.get("asin", "")
+ break
+
+ if not parent_asin:
+ self.logger.warning(
+ "No parent_asin found for podcast episode %s; parent podcast is unknown",
+ asin,
+ )
+
+ podcast_ref = ItemMapping(
+ item_id=parent_asin or "",
+ provider=self.provider_instance,
+ name="Unknown Podcast",
+ media_type=MediaType.PODCAST,
+ )
+
+ # Create episode object
+ episode = PodcastEpisode(
+ item_id=asin,
+ provider=self.provider_instance,
+ name=title,
+ duration=duration,
+ position=position,
+ podcast=podcast_ref,
+ provider_mappings={
+ ProviderMapping(
+ item_id=asin,
+ provider_domain=self.provider_domain,
+ provider_instance=self.provider_instance,
+ )
+ },
+ )
+
+ # Set metadata
+ episode.metadata.description = _html_to_txt(
+ str(
+ episode_data.get("publisher_summary", "")
+ or episode_data.get("extended_product_description", "")
+ )
+ )
+
+ # Add images
+ image_path = episode_data.get("product_images", {}).get("500")
+ episode.metadata.images = UniqueList(self._create_images(image_path))
+
+ return episode
+
+ async def get_authors(self) -> dict[str, str]:
+ """Get all unique authors from the library.
+
+ Returns dict mapping author ASIN to author name.
+ """
+ authors: dict[str, str] = {}
+ async for item in self._fetch_library_items(
+ "contributors,product_attrs", AUDIOBOOK_CONTENT_TYPES
+ ):
+ for author in item.get("authors") or []:
+ asin = author.get("asin")
+ name = author.get("name")
+ if asin and name:
+ authors[asin] = name
+ return authors
+
+ async def get_series(self) -> dict[str, str]:
+ """Get all unique series from the library.
+
+ Returns dict mapping series ASIN to series title.
+ """
+ series: dict[str, str] = {}
+ async for item in self._fetch_library_items(
+ "series,product_attrs", AUDIOBOOK_CONTENT_TYPES
+ ):
+ for s in item.get("series") or []:
+ asin = s.get("asin")
+ title = s.get("title")
+ if asin and title:
+ series[asin] = title
+ return series
+
+ async def get_narrators(self) -> dict[str, str]:
+ """Get all unique narrators from the library.
+
+ Returns dict mapping narrator ASIN to narrator name.
+ """
+ narrators: dict[str, str] = {}
+ async for item in self._fetch_library_items(
+ "contributors,product_attrs", AUDIOBOOK_CONTENT_TYPES
+ ):
+ for narrator in item.get("narrators") or []:
+ asin = narrator.get("asin")
+ name = narrator.get("name")
+ if asin and name:
+ narrators[asin] = name
+ return narrators
+
+ async def get_genres(self) -> set[str]:
+ """Get all unique genres from the library."""
+ genres: set[str] = set()
+ async for item in self._fetch_library_items("product_attrs", AUDIOBOOK_CONTENT_TYPES):
+ for keyword in item.get("thesaurus_subject_keywords") or []:
+ genres.add(keyword.replace("_", " ").replace("-", " ").title())
+ return genres
+
+ async def get_publishers(self) -> set[str]:
+ """Get all unique publishers from the library."""
+ publishers: set[str] = set()
+ async for item in self._fetch_library_items("product_attrs", AUDIOBOOK_CONTENT_TYPES):
+ publisher = item.get("publisher_name")
+ if publisher:
+ publishers.add(publisher)
+ return publishers
+
+ async def get_audiobooks_by_author(self, author_asin: str) -> list[Audiobook]:
+ """Get all audiobooks by a specific author, sorted by release date."""
+ audiobooks: list[tuple[str, Audiobook]] = []
+ async for item in self._fetch_library_items(
+ "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
+ ):
+ for author in item.get("authors") or []:
+ if author.get("asin") == author_asin:
+ release_date = item.get("release_date") or "0000-00-00"
+ audiobooks.append((release_date, self._parse_audiobook(item)))
+ break
+ audiobooks.sort(key=lambda x: x[0], reverse=True)
+ return [book for _, book in audiobooks]
+
+ async def get_audiobooks_by_narrator(self, narrator_asin: str) -> list[Audiobook]:
+ """Get all audiobooks by a specific narrator, sorted by release date."""
+ audiobooks: list[tuple[str, Audiobook]] = []
+ async for item in self._fetch_library_items(
+ "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
+ ):
+ for narrator in item.get("narrators") or []:
+ if narrator.get("asin") == narrator_asin:
+ release_date = item.get("release_date") or "0000-00-00"
+ audiobooks.append((release_date, self._parse_audiobook(item)))
+ break
+ audiobooks.sort(key=lambda x: x[0], reverse=True)
+ return [book for _, book in audiobooks]
+
+ async def get_audiobooks_by_genre(self, genre: str) -> list[Audiobook]:
+ """Get all audiobooks matching a genre, sorted by release date."""
+ audiobooks: list[tuple[str, Audiobook]] = []
+ genre_key = genre.lower().replace(" ", "_")
+ genre_key_alt = genre.lower().replace(" ", "-")
+ async for item in self._fetch_library_items(
+ "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
+ ):
+ keywords = item.get("thesaurus_subject_keywords") or []
+ if genre_key in keywords or genre_key_alt in keywords:
+ release_date = item.get("release_date") or "0000-00-00"
+ audiobooks.append((release_date, self._parse_audiobook(item)))
+ audiobooks.sort(key=lambda x: x[0], reverse=True)
+ return [book for _, book in audiobooks]
+
+ async def get_audiobooks_by_publisher(self, publisher: str) -> list[Audiobook]:
+ """Get all audiobooks from a specific publisher, sorted by release date."""
+ audiobooks: list[tuple[str, Audiobook]] = []
+ async for item in self._fetch_library_items(
+ "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
+ ):
+ if item.get("publisher_name") == publisher:
+ release_date = item.get("release_date") or "0000-00-00"
+ audiobooks.append((release_date, self._parse_audiobook(item)))
+ audiobooks.sort(key=lambda x: x[0], reverse=True)
+ return [book for _, book in audiobooks]
+
+ async def get_audiobooks_by_series(self, series_asin: str) -> list[Audiobook]:
+ """Get all audiobooks in a specific series, ordered by sequence."""
+ audiobooks: list[tuple[float, Audiobook]] = []
+ async for item in self._fetch_library_items(
+ "contributors,media,product_attrs,product_desc,series", AUDIOBOOK_CONTENT_TYPES
+ ):
+ for s in item.get("series") or []:
+ if s.get("asin") == series_asin:
+ sequence = s.get("sequence")
+ try:
+ seq_num = float(sequence) if sequence else 999
+ except (ValueError, TypeError):
+ seq_num = 999
+ audiobooks.append((seq_num, self._parse_audiobook(item)))
+ break
+ audiobooks.sort(key=lambda x: x[0])
+ return [book for _, book in audiobooks]
+
async def deregister(self) -> None:
"""Deregister this provider from Audible."""
await asyncio.to_thread(self.client.auth.deregister_device)
async def audible_get_auth_info(locale: str) -> tuple[str, str, str]:
- """
- Generate the login URL and auth info for Audible OAuth flow asynchronously.
-
- Args:
- locale: The locale string (e.g., 'us', 'uk', 'de') to determine region settings
- Returns:
- A tuple containing:
- - code_verifier (str): The OAuth code verifier string
- - oauth_url (str): The complete OAuth URL for login
- - serial (str): The generated device serial number
+ """Generate the login URL and auth info for Audible OAuth flow.
+
+ :param locale: The locale string (e.g., 'us', 'uk', 'de').
+ :return: Tuple of (code_verifier, oauth_url, serial).
"""
locale_obj = audible.localization.Locale(locale)
code_verifier = await asyncio.to_thread(audible.login.create_code_verifier)
--- /dev/null
+"""Test Audible Provider."""
+
+from typing import Any
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from music_assistant_models.enums import MediaType
+from music_assistant_models.media_items import PodcastEpisode
+
+from music_assistant.providers.audible import Audibleprovider
+from music_assistant.providers.audible.audible_helper import AudibleHelper
+
+
+@pytest.fixture
+def mass_mock() -> AsyncMock:
+ """Return a mock MusicAssistant instance."""
+ mass = AsyncMock()
+ mass.http_session = AsyncMock()
+ mass.cache.get = AsyncMock(return_value=None)
+ mass.cache.set = AsyncMock()
+ return mass
+
+
+@pytest.fixture
+def audible_client_mock() -> AsyncMock:
+ """Return a mock Audible AsyncClient."""
+ client = AsyncMock()
+ client.post = AsyncMock()
+ client.put = AsyncMock()
+ return client
+
+
+@pytest.fixture
+def helper(mass_mock: AsyncMock, audible_client_mock: AsyncMock) -> AudibleHelper:
+ """Return an AudibleHelper instance."""
+ return AudibleHelper(
+ mass=mass_mock,
+ client=audible_client_mock,
+ provider_domain="audible",
+ provider_instance="audible_test",
+ )
+
+
+@pytest.fixture
+def provider(mass_mock: AsyncMock) -> Audibleprovider:
+ """Return an Audibleprovider instance."""
+ manifest = MagicMock()
+ manifest.domain = "audible"
+ config = MagicMock()
+
+ def get_value(key: str) -> str | None:
+ if key == "locale":
+ return "us"
+ if key == "auth_file":
+ return "mock_auth_file"
+ return None
+
+ config.get_value.side_effect = get_value
+ config.get_value.return_value = None # Default
+
+ # Patch logger setLevel to avoid ValueError with 'us'
+ with patch("music_assistant.models.provider.logging.Logger.setLevel"):
+ prov = Audibleprovider(mass_mock, manifest, config)
+
+ prov.helper = MagicMock(spec=AudibleHelper)
+ return prov
+
+
+async def test_pagination_get_library(helper: AudibleHelper) -> None:
+ """Test get_library uses pagination correctly."""
+ # To trigger pagination, the first page must have 50 items (page_size)
+ # We generate 50 dummy items for page 1
+ page1_items = [
+ {
+ "asin": f"1_{i}",
+ "title": f"Book 1_{i}",
+ "content_delivery_type": "SinglePartBook",
+ "authors": [],
+ }
+ for i in range(50)
+ ]
+ page2_items = [
+ {
+ "asin": "2_1",
+ "title": "Book 2_1",
+ "content_delivery_type": "SinglePartBook",
+ "authors": [],
+ },
+ ]
+
+ # Mock side_effect for _call_api
+ async def side_effect(_: str, **kwargs: Any) -> dict[str, Any]:
+ if kwargs.get("page") == 1:
+ return {"items": page1_items, "total_results": 51}
+ if kwargs.get("page") == 2:
+ return {"items": page2_items, "total_results": 51}
+ return {"items": [], "total_results": 51}
+
+ with patch.object(helper, "_call_api", side_effect=side_effect) as mock_call:
+ books = []
+ async for book in helper.get_library():
+ books.append(book)
+
+ # 50 from page 1 + 1 from page 2 = 51
+ assert len(books) == 51
+ assert books[0].item_id == "1_0"
+ assert books[50].item_id == "2_1"
+
+ # Verify pagination calls
+ assert mock_call.call_count >= 2
+ calls = mock_call.call_args_list
+ assert calls[0].kwargs["page"] == 1
+ assert calls[1].kwargs["page"] == 2
+
+
+async def test_pagination_browse_helpers(helper: AudibleHelper) -> None:
+ """Test browse helpers (like get_authors) use pagination."""
+ # Mock _call_api to return items across pages
+ # Page 1 must be full (50 items) to trigger next page
+ page1_items = [
+ {
+ "asin": f"1_{i}",
+ "content_delivery_type": "SinglePartBook",
+ "authors": [{"asin": f"A1_{i}", "name": f"Author 1_{i}"}],
+ }
+ for i in range(50)
+ ]
+ page2_items = [
+ {
+ "asin": "2_1",
+ "content_delivery_type": "SinglePartBook",
+ "authors": [{"asin": "A2_1", "name": "Author 2_1"}],
+ },
+ ]
+
+ async def side_effect(_: str, **kwargs: Any) -> dict[str, Any]:
+ if kwargs.get("page") == 1:
+ return {"items": page1_items}
+ if kwargs.get("page") == 2:
+ return {"items": page2_items}
+ return {"items": []}
+
+ with patch.object(helper, "_call_api", side_effect=side_effect):
+ authors = await helper.get_authors()
+
+ # 50 authors from page 1 + 1 from page 2 = 51
+ assert len(authors) == 51
+ assert authors["A1_0"] == "Author 1_0"
+ assert authors["A2_1"] == "Author 2_1"
+
+
+async def test_acr_caching(helper: AudibleHelper, audible_client_mock: AsyncMock) -> None:
+ """Test ACR is cached and used for set_last_position."""
+ asin = "B001"
+
+ # Mock get_stream response
+ audible_client_mock.post.return_value = {
+ "content_license": {
+ "acr": "test_acr_value",
+ "license_response": "http://stream.url",
+ "content_metadata": {"content_reference": {"content_size_in_bytes": 1000}},
+ }
+ }
+
+ # 1. Call get_stream to populate cache
+ await helper.get_stream(asin, MediaType.AUDIOBOOK)
+ assert (asin, MediaType.AUDIOBOOK) in helper._acr_cache
+ assert helper._acr_cache[(asin, MediaType.AUDIOBOOK)] == "test_acr_value"
+
+ # Reset mock to ensure it's not called again if we were to call get_stream
+ # (but we check cache usage in set_last_position)
+ audible_client_mock.post.reset_mock()
+
+ # 2. Call set_last_position -> should use cache and NOT call get_stream
+ # (which calls client.post)
+ # We patch get_stream to verify it's NOT called
+ with patch.object(helper, "get_stream") as mock_get_stream:
+ await helper.set_last_position(asin, 10, MediaType.AUDIOBOOK)
+
+ mock_get_stream.assert_not_called()
+ audible_client_mock.put.assert_called_once()
+ call_args = audible_client_mock.put.call_args[1]
+ assert call_args["body"]["acr"] == "test_acr_value"
+
+
+async def test_set_last_position_without_cache(
+ helper: AudibleHelper, audible_client_mock: AsyncMock
+) -> None:
+ """Test set_last_position fetches ACR if not in cache."""
+ asin = "B002"
+
+ # Mock get_stream internal call
+ with patch.object(helper, "get_stream") as mock_get_stream:
+ mock_get_stream.return_value.data = {"acr": "fetched_acr"}
+
+ await helper.set_last_position(asin, 10, MediaType.AUDIOBOOK)
+
+ mock_get_stream.assert_called_once_with(asin=asin, media_type=MediaType.AUDIOBOOK)
+ audible_client_mock.put.assert_called_once()
+ call_args = audible_client_mock.put.call_args[1]
+ assert call_args["body"]["acr"] == "fetched_acr"
+
+
+async def test_podcast_parent_fallback(helper: AudibleHelper) -> None:
+ """Test podcast episode parsing handles missing parent ASIN."""
+ episode_data = {
+ "asin": "ep1",
+ "title": "Episode 1",
+ "relationships": [], # No parent relationship
+ }
+
+ # Should not raise error, but log warning and use empty/self ASIN for parent
+ episode = helper._parse_podcast_episode(episode_data, None, 0)
+
+ assert isinstance(episode, PodcastEpisode)
+ assert episode.podcast.item_id == ""
+
+
+async def test_browse_decoding(provider: Audibleprovider) -> None:
+ """Test browse path decoding."""
+ # We need to test the provider's browse method, not the helper's.
+ # We mocked the helper in the provider fixture.
+
+ # Mock helper methods to return empty lists/dicts so we just check calls
+ provider.helper.get_audiobooks_by_author = AsyncMock(return_value=[]) # type: ignore[method-assign]
+ provider.helper.get_audiobooks_by_genre = AsyncMock(return_value=[]) # type: ignore[method-assign]
+
+ # Test Author with special chars
+ await provider.browse("audible://authors/Author%20Name")
+ provider.helper.get_audiobooks_by_author.assert_called_with("Author Name")
+
+ # Test Genre with slash (encoded)
+ await provider.browse("audible://genres/Sci-Fi%2FFantasy")
+ provider.helper.get_audiobooks_by_genre.assert_called_with("Sci-Fi/Fantasy")