From: Ztripez Date: Tue, 4 Mar 2025 21:47:45 +0000 (+0100) Subject: Audible provider: Fixes pagination and small cleanups (#1982) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=4484df7d669540b5b42eebf8a81159b263cdc60d;p=music-assistant-server.git Audible provider: Fixes pagination and small cleanups (#1982) --- diff --git a/music_assistant/providers/audible/__init__.py b/music_assistant/providers/audible/__init__.py index 03f3a2b5..3f7bd329 100644 --- a/music_assistant/providers/audible/__init__.py +++ b/music_assistant/providers/audible/__init__.py @@ -17,13 +17,14 @@ from music_assistant_models.config_entries import ( ProviderConfig, ) from music_assistant_models.enums import ConfigEntryType, EventType, MediaType, ProviderFeature -from music_assistant_models.errors import LoginFailed +from music_assistant_models.errors import LoginFailed, MediaNotFoundError from music_assistant.models.music_provider import MusicProvider from music_assistant.providers.audible.audible_helper import ( AudibleHelper, audible_custom_login, audible_get_auth_info, + cached_authenticator_from_file, check_file_exists, remove_file, ) @@ -75,11 +76,10 @@ async def get_config_entries( locale = cast(str, values.get("locale", "") or "us") auth_file = cast(str, values.get(CONF_AUTH_FILE)) - # Check if auth file exists and is valid auth_required = True if auth_file and await check_file_exists(auth_file): try: - auth = await asyncio.to_thread(audible.Authenticator.from_file, auth_file) + auth = await cached_authenticator_from_file(auth_file) auth_required = False except Exception: auth_required = True @@ -229,14 +229,26 @@ class Audibleprovider(MusicProvider): """Handle asynchronous initialization of the provider.""" await self._login() + # Cache for authenticators to avoid repeated file I/O + _AUTH_CACHE: dict[str, audible.Authenticator] = {} + async def _login(self) -> None: """Authenticate with Audible using the saved authentication file.""" try: - auth = await asyncio.to_thread(audible.Authenticator.from_file, self.auth_file) + auth = self._AUTH_CACHE.get(self.instance_id) + + if auth is None: + self.logger.debug("Loading authenticator from file") + auth = await cached_authenticator_from_file(self.auth_file) + self._AUTH_CACHE[self.instance_id] = auth + else: + self.logger.debug("Using cached authenticator") if auth.access_token_expired: + self.logger.debug("Access token expired, refreshing") await asyncio.to_thread(auth.refresh_access_token) await asyncio.to_thread(auth.to_file, self.auth_file) + self._AUTH_CACHE[self.instance_id] = auth self._client = audible.AsyncClient(auth) @@ -245,6 +257,7 @@ class Audibleprovider(MusicProvider): client=self._client, provider_instance=self.instance_id, provider_domain=self.domain, + logger=self.logger, ) self.logger.info("Successfully authenticated with Audible.") @@ -270,14 +283,14 @@ class Audibleprovider(MusicProvider): async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook: """Get full audiobook details by id.""" - audiobook = await self.helper.get_audiobook(asin=prov_audiobook_id, use_cache=False) - if audiobook is None: - raise ValueError(f"Audiobook with id {prov_audiobook_id} not found") - return audiobook + return await self.helper.get_audiobook(asin=prov_audiobook_id, use_cache=False) async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: """Get streamdetails for a audiobook based of asin.""" - return await self.helper.get_stream(asin=item_id) + try: + return await self.helper.get_stream(asin=item_id) + except ValueError as exc: + raise MediaNotFoundError(f"Failed to get stream details for {item_id}") from exc async def on_played( self, diff --git a/music_assistant/providers/audible/audible_helper.py b/music_assistant/providers/audible/audible_helper.py index f7d11086..fd8a1953 100644 --- a/music_assistant/providers/audible/audible_helper.py +++ b/music_assistant/providers/audible/audible_helper.py @@ -6,6 +6,7 @@ import asyncio import hashlib import html import json +import logging import os import re from collections.abc import AsyncGenerator @@ -17,7 +18,7 @@ import audible import audible.register from audible import AsyncClient from music_assistant_models.enums import ContentType, ImageType, MediaType, StreamType -from music_assistant_models.errors import LoginFailed +from music_assistant_models.errors import LoginFailed, MediaNotFoundError from music_assistant_models.media_items import ( Audiobook, AudioFormat, @@ -35,6 +36,21 @@ CACHE_CATEGORY_API = 0 CACHE_CATEGORY_AUDIOBOOK = 1 CACHE_CATEGORY_CHAPTERS = 2 +# Cache for authenticator objects to avoid repeated file reads +_AUTH_CACHE: dict[str, audible.Authenticator] = {} + + +async def cached_authenticator_from_file(path: str) -> audible.Authenticator: + """Get an authenticator from file with caching to avoid repeated file reads.""" + logger = logging.getLogger("audible_helper") + if path in _AUTH_CACHE: + return _AUTH_CACHE[path] + + logger.debug("Loading authenticator from file %s and caching it", path) + auth = await asyncio.to_thread(audible.Authenticator.from_file, path) + _AUTH_CACHE[path] = auth + return auth + class AudibleHelper: """Helper for parsing and using audible api.""" @@ -45,12 +61,14 @@ class AudibleHelper: client: AsyncClient, provider_domain: str, provider_instance: str, + logger: logging.Logger | None = None, ): """Initialize the Audible Helper.""" self.mass = mass self.client = client self.provider_domain = provider_domain self.provider_instance = provider_instance + self.logger = logger or logging.getLogger("audible_helper") async def get_library(self) -> AsyncGenerator[Audiobook, None]: """Fetch the user's library with pagination.""" @@ -65,8 +83,20 @@ class AudibleHelper: page = 1 page_size = 50 + total_processed = 0 + max_iterations = 100 + iteration = 0 + + while iteration < max_iterations: + iteration += 1 + + self.logger.debug( + "Audible: Fetching library page %s with page_size %s (processed so far: %s)", + page, + page_size, + total_processed, + ) - while True: library = await self._call_api( "library", use_cache=False, @@ -76,30 +106,76 @@ class AudibleHelper: ) items = library.get("items", []) - if not items: + total_items = library.get("total_results", 0) + + self.logger.debug( + "Audible: Got %s items (total reported by API: %s)", len(items), total_items + ) + + if not items or len(items) < page_size: + self.logger.debug( + "Audible: No more items or fewer than page size returned, " + "ending pagination (processed %s items)", + total_processed, + ) break + items_processed_this_page = 0 for audiobook_data in items: + content_type = audiobook_data.get("content_delivery_type", "") + if content_type in ("PodcastParent", "NonAudio"): + self.logger.debug( + "Skipping non-audiobook item: %s (%s)", + audiobook_data.get("title", "Unknown"), + content_type, + ) + total_processed += 1 + continue + asin = audiobook_data.get("asin") cached_book = await self.mass.cache.get( key=asin, base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_AUDIOBOOK, default=None ) - if cached_book is not None: - album = await self._parse_audiobook(cached_book) - yield album - else: - album = await self._parse_audiobook(audiobook_data) - yield album - - # Check if we've reached the end - total_items = library.get("total_results", 0) - if page * page_size >= total_items: - break + try: + if cached_book is not None: + album = await self._parse_audiobook(cached_book) + yield album + else: + album = await self._parse_audiobook(audiobook_data) + yield album + + total_processed += 1 + items_processed_this_page += 1 + except MediaNotFoundError as exc: + self.logger.warning(f"Skipping invalid audiobook: {exc}") + total_processed += 1 + continue + + self.logger.debug( + "Audible: Processed %s valid audiobooks on page %s", items_processed_this_page, page + ) page += 1 + self.logger.debug( + "Audible: Moving to page %s (processed: %s, total reported: %s)", + page, + total_processed, + total_items, + ) - async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook | None: + if iteration >= max_iterations: + self.logger.warning( + "Audible: Reached maximum iteration limit (%s) with %s items processed", + max_iterations, + total_processed, + ) + else: + self.logger.info( + "Audible: Successfully retrieved %s audiobooks from library", total_processed + ) + + async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook: """Fetch the audiobook by asin.""" if use_cache: cached_book = await self.mass.cache.get( @@ -116,45 +192,68 @@ class AudibleHelper: ) if response is None: - return None + raise MediaNotFoundError(f"Audiobook with ASIN {asin} not found") + + item_data = response.get("item") + if item_data is None: + raise MediaNotFoundError(f"Audiobook data for ASIN {asin} is empty") + await self.mass.cache.set( key=asin, base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_AUDIOBOOK, - data=response.get("item"), + data=item_data, ) - return await self._parse_audiobook(response.get("item")) + return await self._parse_audiobook(item_data) async def get_stream(self, asin: str) -> StreamDetails: """Get stream details for a track (audiobook chapter).""" - chapters = await self._fetch_chapters(asin=asin) + if not asin: + self.logger.error("Invalid ASIN provided to get_stream") + raise ValueError("Invalid ASIN provided to get_stream") - duration = sum(chapter["length_ms"] for chapter in chapters) / 1000 - - playback_info = await self.client.post( - f"content/{asin}/licenserequest", - body={ - "quality": "High", - "response_groups": "content_reference,certificate", - "consumption_type": "Streaming", - "supported_media_features": { - "codecs": ["mp4a.40.2", "mp4a.40.42"], - "drm_types": [ - "Hls", - ], + chapters = await self._fetch_chapters(asin=asin) + if not chapters: + self.logger.warning(f"No chapters found for ASIN {asin}, using default duration") + duration = 0 + else: + duration = sum(chapter["length_ms"] for chapter in chapters) / 1000 + + try: + playback_info = await self.client.post( + f"content/{asin}/licenserequest", + body={ + "quality": "High", + "response_groups": "content_reference,certificate", + "consumption_type": "Streaming", + "supported_media_features": { + "codecs": ["mp4a.40.2", "mp4a.40.42"], + "drm_types": [ + "Hls", + ], + }, + "spatial": False, }, - "spatial": False, - }, - ) - size = ( - playback_info.get("content_license") - .get("content_metadata") - .get("content_reference") - .get("content_size_in_bytes", 0) - ) + ) + + content_license = playback_info.get("content_license", {}) + if not content_license: + self.logger.error(f"No content_license in playback_info for ASIN {asin}") + raise ValueError(f"Missing content_license for ASIN {asin}") + + content_metadata = content_license.get("content_metadata", {}) + content_reference = content_metadata.get("content_reference", {}) + size = content_reference.get("content_size_in_bytes", 0) - m3u8_url = playback_info.get("content_license").get("license_response") - acr = playback_info.get("content_license").get("acr") + m3u8_url = content_license.get("license_response") + if not m3u8_url: + self.logger.error(f"No license_response (stream URL) for ASIN {asin}") + raise ValueError(f"Missing stream URL for ASIN {asin}") + + acr = content_license.get("acr") + except Exception as exc: + self.logger.error(f"Error getting stream details for ASIN {asin}: {exc}") + raise ValueError(f"Failed to get stream details: {exc}") from exc return StreamDetails( provider=self.provider_instance, size=size, @@ -169,40 +268,128 @@ class AudibleHelper: data={"acr": acr}, ) - async def _fetch_chapters(self, asin: str) -> Any: + async def _fetch_chapters(self, asin: str) -> list[dict[str, Any]]: + """Fetch chapter data for an audiobook.""" + if not asin or asin == "error": + self.logger.warning( + "Invalid ASIN provided to _fetch_chapters, returning empty chapter list" + ) + return [] + chapters_data: list[Any] = await self.mass.cache.get( base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_CHAPTERS, key=asin, default=[] ) + if not chapters_data: - response = await self._call_api( - f"content/{asin}/metadata", - response_groups="chapter_info, always-returned, content_reference, content_url", - chapter_titles_type="Flat", - ) - chapters_data = response.get("content_metadata").get("chapter_info").get("chapters") - await self.mass.cache.set( - base_key=CACHE_DOMAIN, - category=CACHE_CATEGORY_CHAPTERS, - key=asin, - data=chapters_data, - ) + try: + response = await self._call_api( + f"content/{asin}/metadata", + response_groups="chapter_info, always-returned, content_reference, content_url", + chapter_titles_type="Flat", + ) + + if not response: + self.logger.warning(f"Failed to get metadata for ASIN {asin}") + return [] + + content_metadata = response.get("content_metadata") + if not content_metadata: + self.logger.warning(f"No content_metadata for ASIN {asin}") + return [] + + chapter_info = content_metadata.get("chapter_info") + if not chapter_info: + self.logger.warning(f"No chapter_info for ASIN {asin}") + return [] + + chapters_data = chapter_info.get("chapters", []) + + await self.mass.cache.set( + base_key=CACHE_DOMAIN, + category=CACHE_CATEGORY_CHAPTERS, + key=asin, + data=chapters_data, + ) + except Exception as exc: + self.logger.error(f"Error fetching chapters for ASIN {asin}: {exc}") + chapters_data = [] + return chapters_data async def get_last_postion(self, asin: str) -> int: """Fetch last position of asin.""" - response = await self._call_api("annotations/lastpositions", asins=asin) - return int( - response.get("asin_last_position_heard_annots")[0] - .get("last_position_heard") - .get("position_ms", 0) - ) + if not asin or asin == "error": + return 0 + + try: + response = await self._call_api("annotations/lastpositions", asins=asin) + + if not response: + self.logger.debug(f"No last position data available for ASIN {asin}") + return 0 + + annotations = response.get("asin_last_position_heard_annots") + if not annotations or not isinstance(annotations, list) or len(annotations) == 0: + self.logger.debug(f"No annotations found for ASIN {asin}") + return 0 + + annotation = annotations[0] + if not annotation or not isinstance(annotation, dict): + self.logger.debug(f"Invalid annotation for ASIN {asin}") + return 0 + + last_position = annotation.get("last_position_heard") + if not last_position or not isinstance(last_position, dict): + self.logger.debug(f"Invalid last_position for ASIN {asin}") + return 0 - async def set_last_position(self, asin: str, pos: int) -> Any: - """Report last position.""" + position_ms = last_position.get("position_ms", 0) + return int(position_ms) + + except Exception as exc: + self.logger.error(f"Error getting last position for ASIN {asin}: {exc}") + return 0 + + async def set_last_position(self, asin: str, pos: int) -> None: + """Report last position to Audible. + + Args: + asin: The audiobook ID + pos: Position in seconds + """ + if not asin or asin == "error" or pos <= 0: + return + + try: + position_ms = pos * 1000 + + stream_details = await self.get_stream(asin=asin) + acr = stream_details.data.get("acr") + + if not acr: + self.logger.warning(f"No ACR available for ASIN {asin}, cannot report position") + return + + await self.client.put( + f"lastpositions/{asin}", body={"acr": acr, "asin": asin, "position_ms": position_ms} + ) + + self.logger.debug(f"Successfully reported position {position_ms}ms for ASIN {asin}") + + except (KeyError, TypeError) as exc: + self.logger.error( + f"Error accessing data while reporting position for ASIN {asin}: {exc}" + ) + except TimeoutError as exc: + self.logger.error(f"Timeout while reporting position for ASIN {asin}: {exc}") + except ConnectionError as exc: + self.logger.error(f"Connection error while reporting position for ASIN {asin}: {exc}") + except Exception as exc: + self.logger.error(f"Unexpected error reporting position for ASIN {asin}: {exc}") async def _call_api(self, path: str, **kwargs: Any) -> Any: response = None - use_cache = False + use_cache = kwargs.pop("use_cache", False) params_str = json.dumps(kwargs, sort_keys=True) params_hash = hashlib.md5(params_str.encode()).hexdigest() cache_key_with_params = f"{path}:{params_hash}" @@ -217,15 +404,27 @@ class AudibleHelper: ) return response - async def _parse_audiobook(self, audiobook_data: dict[str, Any]) -> Audiobook: + async def _parse_audiobook(self, audiobook_data: dict[str, Any] | None) -> Audiobook: + if audiobook_data is None: + self.logger.error("Received None audiobook_data in _parse_audiobook") + raise MediaNotFoundError("Audiobook data not found") + asin = audiobook_data.get("asin", "") title = audiobook_data.get("title", "") authors = [] narrators = [] - for narrator in audiobook_data.get("narrators", []): - narrators.append(narrator.get("name")) - for author in audiobook_data.get("authors", []): - authors.append(author.get("name")) + + narrators_list = audiobook_data.get("narrators") or [] + if isinstance(narrators_list, list): + for narrator in narrators_list: + if narrator and isinstance(narrator, dict): + narrators.append(narrator.get("name", "Unknown Narrator")) + + authors_list = audiobook_data.get("authors") or [] + if isinstance(authors_list, list): + for author in authors_list: + if author and isinstance(author, dict): + authors.append(author.get("name", "Unknown Author")) chapters_data = await self._fetch_chapters(asin=asin) duration = sum(chapter["length_ms"] for chapter in chapters_data) / 1000 book = Audiobook( @@ -277,9 +476,18 @@ class AudibleHelper: for index, chapter_data in enumerate(chapters_data): start = int(chapter_data.get("start_offset_sec", 0)) length = int(chapter_data.get("length_ms", 0)) / 1000 + raw_title = chapter_data.get("title") + chapter_title: str + if raw_title is None: + chapter_title = f"Chapter {index + 1}" + elif isinstance(raw_title, str): + chapter_title = raw_title + else: + chapter_title = str(raw_title) + chapters.append( MediaItemChapter( - position=index, name=chapter_data.get("title"), start=start, end=start + length + position=index, name=chapter_title, start=start, end=start + length ) ) book.metadata.chapters = chapters @@ -299,7 +507,6 @@ def _html_to_txt(html_text: str) -> str: return txt -# Audible Authorization async def audible_get_auth_info(locale: str) -> tuple[str, str, str]: """ Generate the login URL and auth info for Audible OAuth flow asynchronously. @@ -312,13 +519,8 @@ async def audible_get_auth_info(locale: str) -> tuple[str, str, str]: - oauth_url (str): The complete OAuth URL for login - serial (str): The generated device serial number """ - # Create locale object (not I/O operation) locale_obj = audible.localization.Locale(locale) - - # Create code verifier (potential crypto operations) code_verifier = await asyncio.to_thread(audible.login.create_code_verifier) - - # Build OAuth URL (potential network operations) oauth_url, serial = await asyncio.to_thread( audible.login.build_oauth_url, country_code=locale_obj.country_code, @@ -350,7 +552,6 @@ async def audible_custom_login( auth = audible.Authenticator() auth.locale = audible.localization.Locale(locale) - # URL parsing (not I/O operation) response_url_parsed = urlparse(response_url) parsed_qs = parse_qs(response_url_parsed.query) @@ -358,10 +559,7 @@ async def audible_custom_login( if not authorization_codes: raise LoginFailed("Authorization code not found in the provided URL.") - # Get the first authorization code from the list authorization_code = authorization_codes[0] - - # Register device (network operation) registration_data = await asyncio.to_thread( audible.register.register, authorization_code=authorization_code,