Audible provider: Fixes pagination and small cleanups (#1982)
author Ztripez <reg@otherland.nu>
Tue, 4 Mar 2025 21:47:45 +0000 (22:47 +0100)
committer Marcel van der Veldt <m.vanderveldt@outlook.com>
Wed, 5 Mar 2025 20:51:59 +0000 (21:51 +0100)
music_assistant/providers/audible/__init__.py
music_assistant/providers/audible/audible_helper.py

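diff --git a/music_assistant/providers/audible/__init__.py b/music_assistant/providers/audible/__init__.py
index 03f3a2b58d80d6d37576a15e2e562d22e5a7f086..3f7bd329262c116b4b3ba983d28693e71d3b354e 100644
--- a/music_assistant/providers/audible/__init__.py
+++ b/music_assistant/providers/audible/__init__.py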
@@ -17,13 +17,14 @@ from music_assistant_models.config_entries import (
     ProviderConfig,
 )
 from music_assistant_models.enums import ConfigEntryType, EventType, MediaType, ProviderFeature
-from music_assistant_models.errors import LoginFailed
+from music_assistant_models.errors import LoginFailed, MediaNotFoundError
 
 from music_assistant.models.music_provider import MusicProvider
 from music_assistant.providers.audible.audible_helper import (
     AudibleHelper,
     audible_custom_login,
     audible_get_auth_info,
+    cached_authenticator_from_file,
     check_file_exists,
     remove_file,
 )
@@ -75,11 +76,10 @@ async def get_config_entries(
     locale = cast(str, values.get("locale", "") or "us")
     auth_file = cast(str, values.get(CONF_AUTH_FILE))
 
-    # Check if auth file exists and is valid
     auth_required = True
     if auth_file and await check_file_exists(auth_file):
         try:
-            auth = await asyncio.to_thread(audible.Authenticator.from_file, auth_file)
+            auth = await cached_authenticator_from_file(auth_file)
             auth_required = False
         except Exception:
             auth_required = True
@@ -229,14 +229,26 @@ class Audibleprovider(MusicProvider):
         """Handle asynchronous initialization of the provider."""
         await self._login()
 
+    # Cache for authenticators to avoid repeated file I/O
+    _AUTH_CACHE: dict[str, audible.Authenticator] = {}
+
     async def _login(self) -> None:
         """Authenticate with Audible using the saved authentication file."""
         try:
-            auth = await asyncio.to_thread(audible.Authenticator.from_file, self.auth_file)
+            auth = self._AUTH_CACHE.get(self.instance_id)
+
+            if auth is None:
+                self.logger.debug("Loading authenticator from file")
+                auth = await cached_authenticator_from_file(self.auth_file)
+                self._AUTH_CACHE[self.instance_id] = auth
+            else:
+                self.logger.debug("Using cached authenticator")
 
             if auth.access_token_expired:
+                self.logger.debug("Access token expired, refreshing")
                 await asyncio.to_thread(auth.refresh_access_token)
                 await asyncio.to_thread(auth.to_file, self.auth_file)
+                self._AUTH_CACHE[self.instance_id] = auth
 
             self._client = audible.AsyncClient(auth)
 
@@ -245,6 +257,7 @@ class Audibleprovider(MusicProvider):
                 client=self._client,
                 provider_instance=self.instance_id,
                 provider_domain=self.domain,
+                logger=self.logger,
             )
 
             self.logger.info("Successfully authenticated with Audible.")
@@ -270,14 +283,14 @@ class Audibleprovider(MusicProvider):
 
     async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
         """Get full audiobook details by id."""
-        audiobook = await self.helper.get_audiobook(asin=prov_audiobook_id, use_cache=False)
-        if audiobook is None:
-            raise ValueError(f"Audiobook with id {prov_audiobook_id} not found")
-        return audiobook
+        return await self.helper.get_audiobook(asin=prov_audiobook_id, use_cache=False)
 
     async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
         """Get streamdetails for a audiobook based of asin."""
-        return await self.helper.get_stream(asin=item_id)
+        try:
+            return await self.helper.get_stream(asin=item_id)
+        except ValueError as exc:
+            raise MediaNotFoundError(f"Failed to get stream details for {item_id}") from exc
 
     async def on_played(
         self,
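diff --git a/music_assistant/providers/audible/audible_helper.py b/music_assistant/providers/audible/audible_helper.py
index f7d11086b632f3963aa471aae394849467d53722..fd8a1953c30a1de5380e4a0dd11c8fb07df5e8f9 100644
--- a/music_assistant/providers/audible/audible_helper.py
+++ b/music_assistant/providers/audible/audible_helper.py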
@@ -6,6 +6,7 @@ import asyncio
 import hashlib
 import html
 import json
+import logging
 import os
 import re
 from collections.abc import AsyncGenerator
@@ -17,7 +18,7 @@ import audible
 import audible.register
 from audible import AsyncClient
 from music_assistant_models.enums import ContentType, ImageType, MediaType, StreamType
-from music_assistant_models.errors import LoginFailed
+from music_assistant_models.errors import LoginFailed, MediaNotFoundError
 from music_assistant_models.media_items import (
     Audiobook,
     AudioFormat,
@@ -35,6 +36,21 @@ CACHE_CATEGORY_API = 0
 CACHE_CATEGORY_AUDIOBOOK = 1
 CACHE_CATEGORY_CHAPTERS = 2
 
+# Cache for authenticator objects to avoid repeated file reads
+_AUTH_CACHE: dict[str, audible.Authenticator] = {}
+
+
+async def cached_authenticator_from_file(path: str) -> audible.Authenticator:
+    """Get an authenticator from file with caching to avoid repeated file reads."""
+    logger = logging.getLogger("audible_helper")
+    if path in _AUTH_CACHE:
+        return _AUTH_CACHE[path]
+
+    logger.debug("Loading authenticator from file %s and caching it", path)
+    auth = await asyncio.to_thread(audible.Authenticator.from_file, path)
+    _AUTH_CACHE[path] = auth
+    return auth
+
 
 class AudibleHelper:
     """Helper for parsing and using audible api."""
@@ -45,12 +61,14 @@ class AudibleHelper:
         client: AsyncClient,
         provider_domain: str,
         provider_instance: str,
+        logger: logging.Logger | None = None,
     ):
         """Initialize the Audible Helper."""
         self.mass = mass
         self.client = client
         self.provider_domain = provider_domain
         self.provider_instance = provider_instance
+        self.logger = logger or logging.getLogger("audible_helper")
 
     async def get_library(self) -> AsyncGenerator[Audiobook, None]:
         """Fetch the user's library with pagination."""
@@ -65,8 +83,20 @@ class AudibleHelper:
 
         page = 1
         page_size = 50
+        total_processed = 0
+        max_iterations = 100
+        iteration = 0
+
+        while iteration < max_iterations:
+            iteration += 1
+
+            self.logger.debug(
+                "Audible: Fetching library page %s with page_size %s (processed so far: %s)",
+                page,
+                page_size,
+                total_processed,
+            )
 
-        while True:
             library = await self._call_api(
                 "library",
                 use_cache=False,
@@ -76,30 +106,76 @@ class AudibleHelper:
             )
 
             items = library.get("items", [])
-            if not items:
+            total_items = library.get("total_results", 0)
+
+            self.logger.debug(
+                "Audible: Got %s items (total reported by API: %s)", len(items), total_items
+            )
+
+            if not items or len(items) < page_size:
+                self.logger.debug(
+                    "Audible: No more items or fewer than page size returned, "
+                    "ending pagination (processed %s items)",
+                    total_processed,
+                )
                 break
 
+            items_processed_this_page = 0
             for audiobook_data in items:
+                content_type = audiobook_data.get("content_delivery_type", "")
+                if content_type in ("PodcastParent", "NonAudio"):
+                    self.logger.debug(
+                        "Skipping non-audiobook item: %s (%s)",
+                        audiobook_data.get("title", "Unknown"),
+                        content_type,
+                    )
+                    total_processed += 1
+                    continue
+
                 asin = audiobook_data.get("asin")
                 cached_book = await self.mass.cache.get(
                     key=asin, base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_AUDIOBOOK, default=None
                 )
 
-                if cached_book is not None:
-                    album = await self._parse_audiobook(cached_book)
-                    yield album
-                else:
-                    album = await self._parse_audiobook(audiobook_data)
-                    yield album
-
-            # Check if we've reached the end
-            total_items = library.get("total_results", 0)
-            if page * page_size >= total_items:
-                break
+                try:
+                    if cached_book is not None:
+                        album = await self._parse_audiobook(cached_book)
+                        yield album
+                    else:
+                        album = await self._parse_audiobook(audiobook_data)
+                        yield album
+
+                    total_processed += 1
+                    items_processed_this_page += 1
+                except MediaNotFoundError as exc:
+                    self.logger.warning(f"Skipping invalid audiobook: {exc}")
+                    total_processed += 1
+                    continue
+
+            self.logger.debug(
+                "Audible: Processed %s valid audiobooks on page %s", items_processed_this_page, page
+            )
 
             page += 1
+            self.logger.debug(
+                "Audible: Moving to page %s (processed: %s, total reported: %s)",
+                page,
+                total_processed,
+                total_items,
+            )
 
-    async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook | None:
+        if iteration >= max_iterations:
+            self.logger.warning(
+                "Audible: Reached maximum iteration limit (%s) with %s items processed",
+                max_iterations,
+                total_processed,
+            )
+        else:
+            self.logger.info(
+                "Audible: Successfully retrieved %s audiobooks from library", total_processed
+            )
+
+    async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook:
         """Fetch the audiobook by asin."""
         if use_cache:
             cached_book = await self.mass.cache.get(
@@ -116,45 +192,68 @@ class AudibleHelper:
         )
 
         if response is None:
-            return None
+            raise MediaNotFoundError(f"Audiobook with ASIN {asin} not found")
+
+        item_data = response.get("item")
+        if item_data is None:
+            raise MediaNotFoundError(f"Audiobook data for ASIN {asin} is empty")
+
         await self.mass.cache.set(
             key=asin,
             base_key=CACHE_DOMAIN,
             category=CACHE_CATEGORY_AUDIOBOOK,
-            data=response.get("item"),
+            data=item_data,
         )
-        return await self._parse_audiobook(response.get("item"))
+        return await self._parse_audiobook(item_data)
 
     async def get_stream(self, asin: str) -> StreamDetails:
         """Get stream details for a track (audiobook chapter)."""
-        chapters = await self._fetch_chapters(asin=asin)
+        if not asin:
+            self.logger.error("Invalid ASIN provided to get_stream")
+            raise ValueError("Invalid ASIN provided to get_stream")
 
-        duration = sum(chapter["length_ms"] for chapter in chapters) / 1000
-
-        playback_info = await self.client.post(
-            f"content/{asin}/licenserequest",
-            body={
-                "quality": "High",
-                "response_groups": "content_reference,certificate",
-                "consumption_type": "Streaming",
-                "supported_media_features": {
-                    "codecs": ["mp4a.40.2", "mp4a.40.42"],
-                    "drm_types": [
-                        "Hls",
-                    ],
+        chapters = await self._fetch_chapters(asin=asin)
+        if not chapters:
+            self.logger.warning(f"No chapters found for ASIN {asin}, using default duration")
+            duration = 0
+        else:
+            duration = sum(chapter["length_ms"] for chapter in chapters) / 1000
+
+        try:
+            playback_info = await self.client.post(
+                f"content/{asin}/licenserequest",
+                body={
+                    "quality": "High",
+                    "response_groups": "content_reference,certificate",
+                    "consumption_type": "Streaming",
+                    "supported_media_features": {
+                        "codecs": ["mp4a.40.2", "mp4a.40.42"],
+                        "drm_types": [
+                            "Hls",
+                        ],
+                    },
+                    "spatial": False,
                 },
-                "spatial": False,
-            },
-        )
-        size = (
-            playback_info.get("content_license")
-            .get("content_metadata")
-            .get("content_reference")
-            .get("content_size_in_bytes", 0)
-        )
+            )
+
+            content_license = playback_info.get("content_license", {})
+            if not content_license:
+                self.logger.error(f"No content_license in playback_info for ASIN {asin}")
+                raise ValueError(f"Missing content_license for ASIN {asin}")
+
+            content_metadata = content_license.get("content_metadata", {})
+            content_reference = content_metadata.get("content_reference", {})
+            size = content_reference.get("content_size_in_bytes", 0)
 
-        m3u8_url = playback_info.get("content_license").get("license_response")
-        acr = playback_info.get("content_license").get("acr")
+            m3u8_url = content_license.get("license_response")
+            if not m3u8_url:
+                self.logger.error(f"No license_response (stream URL) for ASIN {asin}")
+                raise ValueError(f"Missing stream URL for ASIN {asin}")
+
+            acr = content_license.get("acr")
+        except Exception as exc:
+            self.logger.error(f"Error getting stream details for ASIN {asin}: {exc}")
+            raise ValueError(f"Failed to get stream details: {exc}") from exc
         return StreamDetails(
             provider=self.provider_instance,
             size=size,
@@ -169,40 +268,128 @@ class AudibleHelper:
             data={"acr": acr},
         )
 
-    async def _fetch_chapters(self, asin: str) -> Any:
+    async def _fetch_chapters(self, asin: str) -> list[dict[str, Any]]:
+        """Fetch chapter data for an audiobook."""
+        if not asin or asin == "error":
+            self.logger.warning(
+                "Invalid ASIN provided to _fetch_chapters, returning empty chapter list"
+            )
+            return []
+
         chapters_data: list[Any] = await self.mass.cache.get(
             base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_CHAPTERS, key=asin, default=[]
         )
+
         if not chapters_data:
-            response = await self._call_api(
-                f"content/{asin}/metadata",
-                response_groups="chapter_info, always-returned, content_reference, content_url",
-                chapter_titles_type="Flat",
-            )
-            chapters_data = response.get("content_metadata").get("chapter_info").get("chapters")
-            await self.mass.cache.set(
-                base_key=CACHE_DOMAIN,
-                category=CACHE_CATEGORY_CHAPTERS,
-                key=asin,
-                data=chapters_data,
-            )
+            try:
+                response = await self._call_api(
+                    f"content/{asin}/metadata",
+                    response_groups="chapter_info, always-returned, content_reference, content_url",
+                    chapter_titles_type="Flat",
+                )
+
+                if not response:
+                    self.logger.warning(f"Failed to get metadata for ASIN {asin}")
+                    return []
+
+                content_metadata = response.get("content_metadata")
+                if not content_metadata:
+                    self.logger.warning(f"No content_metadata for ASIN {asin}")
+                    return []
+
+                chapter_info = content_metadata.get("chapter_info")
+                if not chapter_info:
+                    self.logger.warning(f"No chapter_info for ASIN {asin}")
+                    return []
+
+                chapters_data = chapter_info.get("chapters", [])
+
+                await self.mass.cache.set(
+                    base_key=CACHE_DOMAIN,
+                    category=CACHE_CATEGORY_CHAPTERS,
+                    key=asin,
+                    data=chapters_data,
+                )
+            except Exception as exc:
+                self.logger.error(f"Error fetching chapters for ASIN {asin}: {exc}")
+                chapters_data = []
+
         return chapters_data
 
     async def get_last_postion(self, asin: str) -> int:
         """Fetch last position of asin."""
-        response = await self._call_api("annotations/lastpositions", asins=asin)
-        return int(
-            response.get("asin_last_position_heard_annots")[0]
-            .get("last_position_heard")
-            .get("position_ms", 0)
-        )
+        if not asin or asin == "error":
+            return 0
+
+        try:
+            response = await self._call_api("annotations/lastpositions", asins=asin)
+
+            if not response:
+                self.logger.debug(f"No last position data available for ASIN {asin}")
+                return 0
+
+            annotations = response.get("asin_last_position_heard_annots")
+            if not annotations or not isinstance(annotations, list) or len(annotations) == 0:
+                self.logger.debug(f"No annotations found for ASIN {asin}")
+                return 0
+
+            annotation = annotations[0]
+            if not annotation or not isinstance(annotation, dict):
+                self.logger.debug(f"Invalid annotation for ASIN {asin}")
+                return 0
+
+            last_position = annotation.get("last_position_heard")
+            if not last_position or not isinstance(last_position, dict):
+                self.logger.debug(f"Invalid last_position for ASIN {asin}")
+                return 0
 
-    async def set_last_position(self, asin: str, pos: int) -> Any:
-        """Report last position."""
+            position_ms = last_position.get("position_ms", 0)
+            return int(position_ms)
+
+        except Exception as exc:
+            self.logger.error(f"Error getting last position for ASIN {asin}: {exc}")
+            return 0
+
+    async def set_last_position(self, asin: str, pos: int) -> None:
+        """Report last position to Audible.
+
+        Args:
+            asin: The audiobook ID
+            pos: Position in seconds
+        """
+        if not asin or asin == "error" or pos <= 0:
+            return
+
+        try:
+            position_ms = pos * 1000
+
+            stream_details = await self.get_stream(asin=asin)
+            acr = stream_details.data.get("acr")
+
+            if not acr:
+                self.logger.warning(f"No ACR available for ASIN {asin}, cannot report position")
+                return
+
+            await self.client.put(
+                f"lastpositions/{asin}", body={"acr": acr, "asin": asin, "position_ms": position_ms}
+            )
+
+            self.logger.debug(f"Successfully reported position {position_ms}ms for ASIN {asin}")
+
+        except (KeyError, TypeError) as exc:
+            self.logger.error(
+                f"Error accessing data while reporting position for ASIN {asin}: {exc}"
+            )
+        except TimeoutError as exc:
+            self.logger.error(f"Timeout while reporting position for ASIN {asin}: {exc}")
+        except ConnectionError as exc:
+            self.logger.error(f"Connection error while reporting position for ASIN {asin}: {exc}")
+        except Exception as exc:
+            self.logger.error(f"Unexpected error reporting position for ASIN {asin}: {exc}")
 
     async def _call_api(self, path: str, **kwargs: Any) -> Any:
         response = None
-        use_cache = False
+        use_cache = kwargs.pop("use_cache", False)
         params_str = json.dumps(kwargs, sort_keys=True)
         params_hash = hashlib.md5(params_str.encode()).hexdigest()
         cache_key_with_params = f"{path}:{params_hash}"
@@ -217,15 +404,27 @@ class AudibleHelper:
             )
         return response
 
-    async def _parse_audiobook(self, audiobook_data: dict[str, Any]) -> Audiobook:
+    async def _parse_audiobook(self, audiobook_data: dict[str, Any] | None) -> Audiobook:
+        if audiobook_data is None:
+            self.logger.error("Received None audiobook_data in _parse_audiobook")
+            raise MediaNotFoundError("Audiobook data not found")
+
         asin = audiobook_data.get("asin", "")
         title = audiobook_data.get("title", "")
         authors = []
         narrators = []
-        for narrator in audiobook_data.get("narrators", []):
-            narrators.append(narrator.get("name"))
-        for author in audiobook_data.get("authors", []):
-            authors.append(author.get("name"))
+
+        narrators_list = audiobook_data.get("narrators") or []
+        if isinstance(narrators_list, list):
+            for narrator in narrators_list:
+                if narrator and isinstance(narrator, dict):
+                    narrators.append(narrator.get("name", "Unknown Narrator"))
+
+        authors_list = audiobook_data.get("authors") or []
+        if isinstance(authors_list, list):
+            for author in authors_list:
+                if author and isinstance(author, dict):
+                    authors.append(author.get("name", "Unknown Author"))
         chapters_data = await self._fetch_chapters(asin=asin)
         duration = sum(chapter["length_ms"] for chapter in chapters_data) / 1000
         book = Audiobook(
@@ -277,9 +476,18 @@ class AudibleHelper:
         for index, chapter_data in enumerate(chapters_data):
             start = int(chapter_data.get("start_offset_sec", 0))
             length = int(chapter_data.get("length_ms", 0)) / 1000
+            raw_title = chapter_data.get("title")
+            chapter_title: str
+            if raw_title is None:
+                chapter_title = f"Chapter {index + 1}"
+            elif isinstance(raw_title, str):
+                chapter_title = raw_title
+            else:
+                chapter_title = str(raw_title)
+
             chapters.append(
                 MediaItemChapter(
-                    position=index, name=chapter_data.get("title"), start=start, end=start + length
+                    position=index, name=chapter_title, start=start, end=start + length
                 )
             )
         book.metadata.chapters = chapters
@@ -299,7 +507,6 @@ def _html_to_txt(html_text: str) -> str:
     return txt
 
 
-# Audible Authorization
 async def audible_get_auth_info(locale: str) -> tuple[str, str, str]:
     """
     Generate the login URL and auth info for Audible OAuth flow asynchronously.
@@ -312,13 +519,8 @@ async def audible_get_auth_info(locale: str) -> tuple[str, str, str]:
         - oauth_url (str): The complete OAuth URL for login
         - serial (str): The generated device serial number
     """
-    # Create locale object (not I/O operation)
     locale_obj = audible.localization.Locale(locale)
-
-    # Create code verifier (potential crypto operations)
     code_verifier = await asyncio.to_thread(audible.login.create_code_verifier)
-
-    # Build OAuth URL (potential network operations)
     oauth_url, serial = await asyncio.to_thread(
         audible.login.build_oauth_url,
         country_code=locale_obj.country_code,
@@ -350,7 +552,6 @@ async def audible_custom_login(
     auth = audible.Authenticator()
     auth.locale = audible.localization.Locale(locale)
 
-    # URL parsing (not I/O operation)
     response_url_parsed = urlparse(response_url)
     parsed_qs = parse_qs(response_url_parsed.query)
 
@@ -358,10 +559,7 @@ async def audible_custom_login(
     if not authorization_codes:
         raise LoginFailed("Authorization code not found in the provided URL.")
 
-    # Get the first authorization code from the list
     authorization_code = authorization_codes[0]
-
-    # Register device (network operation)
     registration_data = await asyncio.to_thread(
         audible.register.register,
         authorization_code=authorization_code,