self.provider_instance = provider_instance
self.logger = logger or logging.getLogger("audible_helper")
+ async def _process_audiobook_item(
+ self, audiobook_data: dict[str, Any], total_processed: int
+ ) -> tuple[Audiobook | None, int]:
+ """Process a single audiobook item from the library."""
+ content_type = audiobook_data.get("content_delivery_type", "")
+ if content_type in ("PodcastParent", "NonAudio"):
+ self.logger.debug(
+ "Skipping non-audiobook item: %s (%s)",
+ audiobook_data.get("title", "Unknown"),
+ content_type,
+ )
+ return None, total_processed + 1
+
+ # Ensure asin is a valid string
+ asin = str(audiobook_data.get("asin", ""))
+ cached_book = None
+ if asin:
+ cached_book = await self.mass.cache.get(
+ key=asin, base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_AUDIOBOOK, default=None
+ )
+
+ try:
+ if cached_book is not None:
+ album = await self._parse_audiobook(cached_book)
+ else:
+ album = await self._parse_audiobook(audiobook_data)
+ return album, total_processed + 1
+ except MediaNotFoundError as exc:
+ self.logger.warning(f"Skipping invalid audiobook: {exc}")
+ return None, total_processed + 1
+ except Exception as exc:
+ self.logger.warning(
+ f"Error processing audiobook {audiobook_data.get('asin', 'unknown')}: {exc}"
+ )
+ return None, total_processed + 1
+
async def get_library(self) -> AsyncGenerator[Audiobook, None]:
"""Fetch the user's library with pagination."""
response_groups = [
while iteration < max_iterations:
iteration += 1
-
self.logger.debug(
"Audible: Fetching library page %s with page_size %s (processed so far: %s)",
page,
items = library.get("items", [])
total_items = library.get("total_results", 0)
-
self.logger.debug(
"Audible: Got %s items (total reported by API: %s)", len(items), total_items
)
items_processed_this_page = 0
for audiobook_data in items:
- content_type = audiobook_data.get("content_delivery_type", "")
- if content_type in ("PodcastParent", "NonAudio"):
- self.logger.debug(
- "Skipping non-audiobook item: %s (%s)",
- audiobook_data.get("title", "Unknown"),
- content_type,
- )
- total_processed += 1
- continue
-
- asin = audiobook_data.get("asin")
- cached_book = await self.mass.cache.get(
- key=asin, base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_AUDIOBOOK, default=None
+ album, total_processed = await self._process_audiobook_item(
+ audiobook_data, total_processed
)
-
- try:
- if cached_book is not None:
- album = await self._parse_audiobook(cached_book)
- yield album
- else:
- album = await self._parse_audiobook(audiobook_data)
- yield album
-
- total_processed += 1
+ if album:
+ yield album
items_processed_this_page += 1
- except MediaNotFoundError as exc:
- self.logger.warning(f"Skipping invalid audiobook: {exc}")
- total_processed += 1
- continue
self.logger.debug(
"Audible: Processed %s valid audiobooks on page %s", items_processed_this_page, page
if len(items) < page_size:
self.logger.debug(
- "Audible: Fewer than page size returned, "
- "ending pagination (processed %s items)",
+ "Audible: Fewer than page size returned, ending pagination "
+ "(processed %s items)",
total_processed,
)
break
self.logger.warning(f"No chapters found for ASIN {asin}, using default duration")
duration = 0
else:
- duration = sum(chapter["length_ms"] for chapter in chapters) / 1000
+ try:
+ duration = sum(chapter.get("length_ms", 0) for chapter in chapters) / 1000
+ except Exception as exc:
+ self.logger.warning(f"Error calculating duration for ASIN {asin}: {exc}")
+ duration = 0
try:
playback_info = await self.client.post(
self.logger.error(f"No license_response (stream URL) for ASIN {asin}")
raise ValueError(f"Missing stream URL for ASIN {asin}")
- acr = content_license.get("acr")
+ acr = content_license.get("acr", "")
except Exception as exc:
self.logger.error(f"Error getting stream details for ASIN {asin}: {exc}")
raise ValueError(f"Failed to get stream details: {exc}") from exc
self.logger.warning(f"No chapter_info for ASIN {asin}")
return []
- chapters_data = chapter_info.get("chapters", [])
+ chapters_data = chapter_info.get("chapters") or []
await self.mass.cache.set(
base_key=CACHE_DOMAIN,
)
return response
+ def _parse_contributors(
+ self, contributors_list: list[dict[str, Any]] | None, default_name: str
+ ) -> list[str]:
+ """Parse contributors (authors, narrators) from API response."""
+ result: list[str] = []
+ contributors: list[dict[str, Any]] = contributors_list or []
+ if isinstance(contributors, list):
+ for contributor in contributors:
+ if contributor and isinstance(contributor, dict):
+ result.append(contributor.get("name", default_name))
+ return result
+
+ def _create_images(self, image_path: str | None) -> list[MediaItemImage]:
+ """Create image objects if image path exists."""
+ images: list[MediaItemImage] = []
+ if image_path:
+ images.append(
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=image_path,
+ provider=self.provider_instance,
+ remotely_accessible=True,
+ )
+ )
+ images.append(
+ MediaItemImage(
+ type=ImageType.CLEARART,
+ path=image_path,
+ provider=self.provider_instance,
+ remotely_accessible=True,
+ )
+ )
+ return images
+
+ def _parse_chapter_data(self, chapter_data: dict[str, Any], index: int) -> MediaItemChapter:
+ """Parse chapter data into MediaItemChapter object."""
+ try:
+ start = int(chapter_data.get("start_offset_sec", 0))
+ except (TypeError, ValueError):
+ start = 0
+
+ try:
+ length = int(chapter_data.get("length_ms", 0)) / 1000
+ except (TypeError, ValueError):
+ length = 0
+
+ raw_title = chapter_data.get("title")
+ chapter_title: str
+ if raw_title is None:
+ chapter_title = f"Chapter {index + 1}"
+ elif isinstance(raw_title, str):
+ chapter_title = raw_title
+ else:
+ chapter_title = str(raw_title)
+
+ return MediaItemChapter(position=index, name=chapter_title, start=start, end=start + length)
+
async def _parse_audiobook(self, audiobook_data: dict[str, Any] | None) -> Audiobook:
+ """Parse audiobook data from API response."""
if audiobook_data is None:
self.logger.error("Received None audiobook_data in _parse_audiobook")
raise MediaNotFoundError("Audiobook data not found")
asin = audiobook_data.get("asin", "")
title = audiobook_data.get("title", "")
- authors = []
- narrators = []
-
- narrators_list = audiobook_data.get("narrators") or []
- if isinstance(narrators_list, list):
- for narrator in narrators_list:
- if narrator and isinstance(narrator, dict):
- narrators.append(narrator.get("name", "Unknown Narrator"))
-
- authors_list = audiobook_data.get("authors") or []
- if isinstance(authors_list, list):
- for author in authors_list:
- if author and isinstance(author, dict):
- authors.append(author.get("name", "Unknown Author"))
+
+ # Parse authors and narrators
+ narrators = self._parse_contributors(audiobook_data.get("narrators"), "Unknown Narrator")
+ authors = self._parse_contributors(audiobook_data.get("authors"), "Unknown Author")
+
+ # Get chapters and calculate duration
chapters_data = await self._fetch_chapters(asin=asin)
- duration = sum(chapter["length_ms"] for chapter in chapters_data) / 1000
+ try:
+ duration = sum(chapter.get("length_ms", 0) for chapter in chapters_data) / 1000
+ except Exception as exc:
+ self.logger.warning(f"Error calculating duration for audiobook {asin}: {exc}")
+ duration = 0
+
+ # Create audiobook object
book = Audiobook(
item_id=asin,
provider=self.provider_instance,
authors=UniqueList(authors),
narrators=UniqueList(narrators),
)
+
+ # Set metadata
book.metadata.copyright = audiobook_data.get("copyright")
book.metadata.description = _html_to_txt(
str(audiobook_data.get("extended_product_description", ""))
)
- book.metadata.languages = UniqueList([audiobook_data.get("language", "")])
+ book.metadata.languages = UniqueList([audiobook_data.get("language") or ""])
book.metadata.release_date = audiobook_data.get("release_date")
+
+ # Set review if available
reviews = audiobook_data.get("editorial_reviews", [])
- if reviews:
- book.metadata.review = _html_to_txt(reviews[0])
+ if reviews and reviews[0]:
+ book.metadata.review = _html_to_txt(str(reviews[0]))
+
+ # Set genres
book.metadata.genres = {
- genre.replace("_", " ") for genre in audiobook_data.get("platinum_keywords", "")
+ genre.replace("_", " ") for genre in (audiobook_data.get("platinum_keywords") or [])
}
- book.metadata.images = UniqueList(
- [
- MediaItemImage(
- type=ImageType.THUMB,
- path=audiobook_data.get("product_images", {}).get("500"),
- provider=self.provider_instance,
- remotely_accessible=True,
- ),
- MediaItemImage(
- type=ImageType.CLEARART,
- path=audiobook_data.get("product_images", {}).get("500"),
- provider=self.provider_instance,
- remotely_accessible=True,
- ),
- ]
- )
- chapters = []
- for index, chapter_data in enumerate(chapters_data):
- start = int(chapter_data.get("start_offset_sec", 0))
- length = int(chapter_data.get("length_ms", 0)) / 1000
- raw_title = chapter_data.get("title")
- chapter_title: str
- if raw_title is None:
- chapter_title = f"Chapter {index + 1}"
- elif isinstance(raw_title, str):
- chapter_title = raw_title
- else:
- chapter_title = str(raw_title)
+ # Add images
+ image_path = audiobook_data.get("product_images", {}).get("500")
+ book.metadata.images = UniqueList(self._create_images(image_path))
- chapters.append(
- MediaItemChapter(
- position=index, name=chapter_title, start=start, end=start + length
- )
- )
+ # Parse chapters
+ chapters: list[MediaItemChapter] = [
+ self._parse_chapter_data(chapter, idx) for idx, chapter in enumerate(chapters_data)
+ ]
book.metadata.chapters = chapters
+
+ # Get resume position
book.resume_position_ms = await self.get_last_postion(asin=asin)
return book