import hashlib
import html
import json
+import logging
import os
import re
from collections.abc import AsyncGenerator
import audible.register
from audible import AsyncClient
from music_assistant_models.enums import ContentType, ImageType, MediaType, StreamType
-from music_assistant_models.errors import LoginFailed
+from music_assistant_models.errors import LoginFailed, MediaNotFoundError
from music_assistant_models.media_items import (
Audiobook,
AudioFormat,
CACHE_CATEGORY_AUDIOBOOK = 1
CACHE_CATEGORY_CHAPTERS = 2
+# Cache for authenticator objects to avoid repeated file reads
+_AUTH_CACHE: dict[str, audible.Authenticator] = {}
+
+
+async def cached_authenticator_from_file(path: str) -> audible.Authenticator:
+ """Get an authenticator from file with caching to avoid repeated file reads."""
+ logger = logging.getLogger("audible_helper")
+ if path in _AUTH_CACHE:
+ return _AUTH_CACHE[path]
+
+ logger.debug("Loading authenticator from file %s and caching it", path)
+ auth = await asyncio.to_thread(audible.Authenticator.from_file, path)
+ _AUTH_CACHE[path] = auth
+ return auth
+
class AudibleHelper:
"""Helper for parsing and using audible api."""
client: AsyncClient,
provider_domain: str,
provider_instance: str,
+ logger: logging.Logger | None = None,
):
"""Initialize the Audible Helper."""
self.mass = mass
self.client = client
self.provider_domain = provider_domain
self.provider_instance = provider_instance
+ self.logger = logger or logging.getLogger("audible_helper")
async def get_library(self) -> AsyncGenerator[Audiobook, None]:
"""Fetch the user's library with pagination."""
page = 1
page_size = 50
+ total_processed = 0
+ max_iterations = 100
+ iteration = 0
+
+ while iteration < max_iterations:
+ iteration += 1
+
+ self.logger.debug(
+ "Audible: Fetching library page %s with page_size %s (processed so far: %s)",
+ page,
+ page_size,
+ total_processed,
+ )
- while True:
library = await self._call_api(
"library",
use_cache=False,
)
items = library.get("items", [])
- if not items:
+ total_items = library.get("total_results", 0)
+
+ self.logger.debug(
+ "Audible: Got %s items (total reported by API: %s)", len(items), total_items
+ )
+
+ if not items or len(items) < page_size:
+ self.logger.debug(
+ "Audible: No more items or fewer than page size returned, "
+ "ending pagination (processed %s items)",
+ total_processed,
+ )
break
+ items_processed_this_page = 0
for audiobook_data in items:
+ content_type = audiobook_data.get("content_delivery_type", "")
+ if content_type in ("PodcastParent", "NonAudio"):
+ self.logger.debug(
+ "Skipping non-audiobook item: %s (%s)",
+ audiobook_data.get("title", "Unknown"),
+ content_type,
+ )
+ total_processed += 1
+ continue
+
asin = audiobook_data.get("asin")
cached_book = await self.mass.cache.get(
key=asin, base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_AUDIOBOOK, default=None
)
- if cached_book is not None:
- album = await self._parse_audiobook(cached_book)
- yield album
- else:
- album = await self._parse_audiobook(audiobook_data)
- yield album
-
- # Check if we've reached the end
- total_items = library.get("total_results", 0)
- if page * page_size >= total_items:
- break
+ try:
+ if cached_book is not None:
+ album = await self._parse_audiobook(cached_book)
+ yield album
+ else:
+ album = await self._parse_audiobook(audiobook_data)
+ yield album
+
+ total_processed += 1
+ items_processed_this_page += 1
+ except MediaNotFoundError as exc:
+ self.logger.warning(f"Skipping invalid audiobook: {exc}")
+ total_processed += 1
+ continue
+
+ self.logger.debug(
+ "Audible: Processed %s valid audiobooks on page %s", items_processed_this_page, page
+ )
page += 1
+ self.logger.debug(
+ "Audible: Moving to page %s (processed: %s, total reported: %s)",
+ page,
+ total_processed,
+ total_items,
+ )
- async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook | None:
+ if iteration >= max_iterations:
+ self.logger.warning(
+ "Audible: Reached maximum iteration limit (%s) with %s items processed",
+ max_iterations,
+ total_processed,
+ )
+ else:
+ self.logger.info(
+ "Audible: Successfully retrieved %s audiobooks from library", total_processed
+ )
+
+ async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook:
"""Fetch the audiobook by asin."""
if use_cache:
cached_book = await self.mass.cache.get(
)
if response is None:
- return None
+ raise MediaNotFoundError(f"Audiobook with ASIN {asin} not found")
+
+ item_data = response.get("item")
+ if item_data is None:
+ raise MediaNotFoundError(f"Audiobook data for ASIN {asin} is empty")
+
await self.mass.cache.set(
key=asin,
base_key=CACHE_DOMAIN,
category=CACHE_CATEGORY_AUDIOBOOK,
- data=response.get("item"),
+ data=item_data,
)
- return await self._parse_audiobook(response.get("item"))
+ return await self._parse_audiobook(item_data)
async def get_stream(self, asin: str) -> StreamDetails:
"""Get stream details for a track (audiobook chapter)."""
- chapters = await self._fetch_chapters(asin=asin)
+ if not asin:
+ self.logger.error("Invalid ASIN provided to get_stream")
+ raise ValueError("Invalid ASIN provided to get_stream")
- duration = sum(chapter["length_ms"] for chapter in chapters) / 1000
-
- playback_info = await self.client.post(
- f"content/{asin}/licenserequest",
- body={
- "quality": "High",
- "response_groups": "content_reference,certificate",
- "consumption_type": "Streaming",
- "supported_media_features": {
- "codecs": ["mp4a.40.2", "mp4a.40.42"],
- "drm_types": [
- "Hls",
- ],
+ chapters = await self._fetch_chapters(asin=asin)
+ if not chapters:
+ self.logger.warning(f"No chapters found for ASIN {asin}, using default duration")
+ duration = 0
+ else:
+ duration = sum(chapter["length_ms"] for chapter in chapters) / 1000
+
+ try:
+ playback_info = await self.client.post(
+ f"content/{asin}/licenserequest",
+ body={
+ "quality": "High",
+ "response_groups": "content_reference,certificate",
+ "consumption_type": "Streaming",
+ "supported_media_features": {
+ "codecs": ["mp4a.40.2", "mp4a.40.42"],
+ "drm_types": [
+ "Hls",
+ ],
+ },
+ "spatial": False,
},
- "spatial": False,
- },
- )
- size = (
- playback_info.get("content_license")
- .get("content_metadata")
- .get("content_reference")
- .get("content_size_in_bytes", 0)
- )
+ )
+
+ content_license = playback_info.get("content_license", {})
+ if not content_license:
+ self.logger.error(f"No content_license in playback_info for ASIN {asin}")
+ raise ValueError(f"Missing content_license for ASIN {asin}")
+
+ content_metadata = content_license.get("content_metadata", {})
+ content_reference = content_metadata.get("content_reference", {})
+ size = content_reference.get("content_size_in_bytes", 0)
- m3u8_url = playback_info.get("content_license").get("license_response")
- acr = playback_info.get("content_license").get("acr")
+ m3u8_url = content_license.get("license_response")
+ if not m3u8_url:
+ self.logger.error(f"No license_response (stream URL) for ASIN {asin}")
+ raise ValueError(f"Missing stream URL for ASIN {asin}")
+
+ acr = content_license.get("acr")
+ except Exception as exc:
+ self.logger.error(f"Error getting stream details for ASIN {asin}: {exc}")
+ raise ValueError(f"Failed to get stream details: {exc}") from exc
return StreamDetails(
provider=self.provider_instance,
size=size,
data={"acr": acr},
)
- async def _fetch_chapters(self, asin: str) -> Any:
+ async def _fetch_chapters(self, asin: str) -> list[dict[str, Any]]:
+ """Fetch chapter data for an audiobook."""
+ if not asin or asin == "error":
+ self.logger.warning(
+ "Invalid ASIN provided to _fetch_chapters, returning empty chapter list"
+ )
+ return []
+
chapters_data: list[Any] = await self.mass.cache.get(
base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_CHAPTERS, key=asin, default=[]
)
+
if not chapters_data:
- response = await self._call_api(
- f"content/{asin}/metadata",
- response_groups="chapter_info, always-returned, content_reference, content_url",
- chapter_titles_type="Flat",
- )
- chapters_data = response.get("content_metadata").get("chapter_info").get("chapters")
- await self.mass.cache.set(
- base_key=CACHE_DOMAIN,
- category=CACHE_CATEGORY_CHAPTERS,
- key=asin,
- data=chapters_data,
- )
+ try:
+ response = await self._call_api(
+ f"content/{asin}/metadata",
+ response_groups="chapter_info, always-returned, content_reference, content_url",
+ chapter_titles_type="Flat",
+ )
+
+ if not response:
+ self.logger.warning(f"Failed to get metadata for ASIN {asin}")
+ return []
+
+ content_metadata = response.get("content_metadata")
+ if not content_metadata:
+ self.logger.warning(f"No content_metadata for ASIN {asin}")
+ return []
+
+ chapter_info = content_metadata.get("chapter_info")
+ if not chapter_info:
+ self.logger.warning(f"No chapter_info for ASIN {asin}")
+ return []
+
+ chapters_data = chapter_info.get("chapters", [])
+
+ await self.mass.cache.set(
+ base_key=CACHE_DOMAIN,
+ category=CACHE_CATEGORY_CHAPTERS,
+ key=asin,
+ data=chapters_data,
+ )
+ except Exception as exc:
+ self.logger.error(f"Error fetching chapters for ASIN {asin}: {exc}")
+ chapters_data = []
+
return chapters_data
async def get_last_postion(self, asin: str) -> int:
"""Fetch last position of asin."""
- response = await self._call_api("annotations/lastpositions", asins=asin)
- return int(
- response.get("asin_last_position_heard_annots")[0]
- .get("last_position_heard")
- .get("position_ms", 0)
- )
+ if not asin or asin == "error":
+ return 0
+
+ try:
+ response = await self._call_api("annotations/lastpositions", asins=asin)
+
+ if not response:
+ self.logger.debug(f"No last position data available for ASIN {asin}")
+ return 0
+
+ annotations = response.get("asin_last_position_heard_annots")
+ if not annotations or not isinstance(annotations, list) or len(annotations) == 0:
+ self.logger.debug(f"No annotations found for ASIN {asin}")
+ return 0
+
+ annotation = annotations[0]
+ if not annotation or not isinstance(annotation, dict):
+ self.logger.debug(f"Invalid annotation for ASIN {asin}")
+ return 0
+
+ last_position = annotation.get("last_position_heard")
+ if not last_position or not isinstance(last_position, dict):
+ self.logger.debug(f"Invalid last_position for ASIN {asin}")
+ return 0
- async def set_last_position(self, asin: str, pos: int) -> Any:
- """Report last position."""
+ position_ms = last_position.get("position_ms", 0)
+ return int(position_ms)
+
+ except Exception as exc:
+ self.logger.error(f"Error getting last position for ASIN {asin}: {exc}")
+ return 0
+
+ async def set_last_position(self, asin: str, pos: int) -> None:
+ """Report last position to Audible.
+
+ Args:
+ asin: The audiobook ID
+ pos: Position in seconds
+ """
+ if not asin or asin == "error" or pos <= 0:
+ return
+
+ try:
+ position_ms = pos * 1000
+
+ stream_details = await self.get_stream(asin=asin)
+ acr = stream_details.data.get("acr")
+
+ if not acr:
+ self.logger.warning(f"No ACR available for ASIN {asin}, cannot report position")
+ return
+
+ await self.client.put(
+ f"lastpositions/{asin}", body={"acr": acr, "asin": asin, "position_ms": position_ms}
+ )
+
+ self.logger.debug(f"Successfully reported position {position_ms}ms for ASIN {asin}")
+
+ except (KeyError, TypeError) as exc:
+ self.logger.error(
+ f"Error accessing data while reporting position for ASIN {asin}: {exc}"
+ )
+ except TimeoutError as exc:
+ self.logger.error(f"Timeout while reporting position for ASIN {asin}: {exc}")
+ except ConnectionError as exc:
+ self.logger.error(f"Connection error while reporting position for ASIN {asin}: {exc}")
+ except Exception as exc:
+ self.logger.error(f"Unexpected error reporting position for ASIN {asin}: {exc}")
async def _call_api(self, path: str, **kwargs: Any) -> Any:
response = None
- use_cache = False
+ use_cache = kwargs.pop("use_cache", False)
params_str = json.dumps(kwargs, sort_keys=True)
params_hash = hashlib.md5(params_str.encode()).hexdigest()
cache_key_with_params = f"{path}:{params_hash}"
)
return response
- async def _parse_audiobook(self, audiobook_data: dict[str, Any]) -> Audiobook:
+ async def _parse_audiobook(self, audiobook_data: dict[str, Any] | None) -> Audiobook:
+ if audiobook_data is None:
+ self.logger.error("Received None audiobook_data in _parse_audiobook")
+ raise MediaNotFoundError("Audiobook data not found")
+
asin = audiobook_data.get("asin", "")
title = audiobook_data.get("title", "")
authors = []
narrators = []
- for narrator in audiobook_data.get("narrators", []):
- narrators.append(narrator.get("name"))
- for author in audiobook_data.get("authors", []):
- authors.append(author.get("name"))
+
+ narrators_list = audiobook_data.get("narrators") or []
+ if isinstance(narrators_list, list):
+ for narrator in narrators_list:
+ if narrator and isinstance(narrator, dict):
+ narrators.append(narrator.get("name", "Unknown Narrator"))
+
+ authors_list = audiobook_data.get("authors") or []
+ if isinstance(authors_list, list):
+ for author in authors_list:
+ if author and isinstance(author, dict):
+ authors.append(author.get("name", "Unknown Author"))
chapters_data = await self._fetch_chapters(asin=asin)
duration = sum(chapter["length_ms"] for chapter in chapters_data) / 1000
book = Audiobook(
for index, chapter_data in enumerate(chapters_data):
start = int(chapter_data.get("start_offset_sec", 0))
length = int(chapter_data.get("length_ms", 0)) / 1000
+ raw_title = chapter_data.get("title")
+ chapter_title: str
+ if raw_title is None:
+ chapter_title = f"Chapter {index + 1}"
+ elif isinstance(raw_title, str):
+ chapter_title = raw_title
+ else:
+ chapter_title = str(raw_title)
+
chapters.append(
MediaItemChapter(
- position=index, name=chapter_data.get("title"), start=start, end=start + length
+ position=index, name=chapter_title, start=start, end=start + length
)
)
book.metadata.chapters = chapters
return txt
-# Audible Authorization
async def audible_get_auth_info(locale: str) -> tuple[str, str, str]:
"""
Generate the login URL and auth info for Audible OAuth flow asynchronously.
- oauth_url (str): The complete OAuth URL for login
- serial (str): The generated device serial number
"""
- # Create locale object (not I/O operation)
locale_obj = audible.localization.Locale(locale)
-
- # Create code verifier (potential crypto operations)
code_verifier = await asyncio.to_thread(audible.login.create_code_verifier)
-
- # Build OAuth URL (potential network operations)
oauth_url, serial = await asyncio.to_thread(
audible.login.build_oauth_url,
country_code=locale_obj.country_code,
auth = audible.Authenticator()
auth.locale = audible.localization.Locale(locale)
- # URL parsing (not I/O operation)
response_url_parsed = urlparse(response_url)
parsed_qs = parse_qs(response_url_parsed.query)
if not authorization_codes:
raise LoginFailed("Authorization code not found in the provided URL.")
- # Get the first authorization code from the list
authorization_code = authorization_codes[0]
-
- # Register device (network operation)
registration_data = await asyncio.to_thread(
audible.register.register,
authorization_code=authorization_code,