--- /dev/null
+"""Audible provider for Music Assistant, utilizing the audible library."""
+
+from __future__ import annotations
+
+import asyncio
+import os
+import webbrowser
+from collections.abc import AsyncGenerator
+from logging import getLevelName
+from typing import TYPE_CHECKING, cast
+from uuid import uuid4
+
+import audible
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueOption,
+ ConfigValueType,
+ ProviderConfig,
+)
+from music_assistant_models.enums import ConfigEntryType, MediaType, ProviderFeature
+from music_assistant_models.errors import LoginFailed
+
+from music_assistant.models.music_provider import MusicProvider
+from music_assistant.providers.audible.audible_helper import (
+ AudibleHelper,
+ audible_custom_login,
+ audible_get_auth_info,
+ check_file_exists,
+ remove_file,
+)
+
+if TYPE_CHECKING:
+ from music_assistant_models.media_items import Audiobook
+ from music_assistant_models.provider import ProviderManifest
+ from music_assistant_models.streamdetails import StreamDetails
+
+ from music_assistant.mass import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+
+# Constants for config actions
+CONF_ACTION_AUTH = "authenticate"
+CONF_ACTION_VERIFY = "verify_link"
+CONF_ACTION_CLEAR_AUTH = "clear_auth"
+CONF_AUTH_FILE = "auth_file"
+CONF_POST_LOGIN_URL = "post_login_url"
+CONF_CODE_VERIFIER = "code_verifier"
+CONF_SERIAL = "serial"
+CONF_LOGIN_URL = "login_url"
+CONF_LOCALE = "locale"
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return Audibleprovider(mass, manifest, config)
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None, # noqa: ARG001
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ if values is None:
+ values = {}
+
+ locale = cast(str, values.get("locale", "") or "us")
+ auth_file = cast(str, values.get(CONF_AUTH_FILE))
+
+ # Check if auth file exists and is valid
+ auth_required = True
+ if auth_file and await check_file_exists(auth_file):
+ try:
+ auth = await asyncio.to_thread(audible.Authenticator.from_file, auth_file)
+ auth_required = False
+ except Exception:
+ auth_required = True
+ label_text = ""
+ if auth_required:
+ label_text = (
+ "You need to authenticate with Audible. Click the authenticate button below"
+ "to start the authentication process which will open in a new (popup) window,"
+ "so make sure to disable any popup blockers.\n\n"
+ "NOTE: \n"
+ "After successful login you will get a 'page not found' message - this is expected."
+ "Copy the address to the textbox below and press verify."
+ "This will register this provider as a virtual device with Audible."
+ )
+ else:
+ label_text = (
+ "Successfully authenticated with Audible."
+ "\nNote: Changing marketplace needs new authorization"
+ )
+
+ if action == CONF_ACTION_AUTH:
+ if auth_file and await check_file_exists(auth_file):
+ await remove_file(auth_file)
+ values[CONF_AUTH_FILE] = None
+ auth_file = ""
+
+ code_verifier, login_url, serial = await audible_get_auth_info(locale)
+ values[CONF_CODE_VERIFIER] = code_verifier
+ values[CONF_SERIAL] = serial
+ values[CONF_LOGIN_URL] = login_url
+ webbrowser.open_new_tab(login_url)
+
+ if action == CONF_ACTION_VERIFY:
+ code_verifier = str(values.get(CONF_CODE_VERIFIER))
+ serial = str(values.get(CONF_SERIAL))
+ post_login_url = str(values.get(CONF_POST_LOGIN_URL))
+ storage_path = mass.storage_path
+
+ auth = await audible_custom_login(code_verifier, post_login_url, serial, locale)
+ auth_file_path = os.path.join(storage_path, f"audible_auth_{uuid4().hex}.json")
+ await asyncio.to_thread(auth.to_file, auth_file_path)
+ values[CONF_AUTH_FILE] = auth_file_path
+ auth_required = False
+
+ return (
+ ConfigEntry(
+ key="label_text",
+ type=ConfigEntryType.LABEL,
+ label=label_text,
+ ),
+ ConfigEntry(
+ key=CONF_LOCALE,
+ type=ConfigEntryType.STRING,
+ label="Marketplace",
+ hidden=not auth_required,
+ required=True,
+ value=locale,
+ options=(
+ ConfigValueOption("US and all other countries not listed", "us"),
+ ConfigValueOption("Canada", "ca"),
+ ConfigValueOption("UK and Ireland", "uk"),
+ ConfigValueOption("Australia and New Zealand", "au"),
+ ConfigValueOption("France, Belgium, Switzerland", "fr"),
+ ConfigValueOption("Germany, Austria, Switzerland", "de"),
+ ConfigValueOption("Japan", "jp"),
+ ConfigValueOption("Italy", "it"),
+ ConfigValueOption("India", "in"),
+ ConfigValueOption("Spain", "es"),
+ ConfigValueOption("Brazil", "br"),
+ ),
+ default_value="us",
+ ),
+ ConfigEntry(
+ key=CONF_ACTION_AUTH,
+ type=ConfigEntryType.ACTION,
+ label="(Re)Authenticate with Audible",
+ description="This button will redirect you to Audible to authenticate.",
+ action=CONF_ACTION_AUTH,
+ ),
+ ConfigEntry(
+ key=CONF_POST_LOGIN_URL,
+ type=ConfigEntryType.STRING,
+ label="Post Login Url",
+ required=False,
+ value=values.get(CONF_POST_LOGIN_URL),
+ hidden=not auth_required,
+ ),
+ ConfigEntry(
+ key=CONF_ACTION_VERIFY,
+ type=ConfigEntryType.ACTION,
+ label="Verify Audible URL",
+ description="This button will check the url and register this provider.",
+ action=CONF_ACTION_VERIFY,
+ hidden=not auth_required,
+ ),
+ ConfigEntry(
+ key=CONF_CODE_VERIFIER,
+ type=ConfigEntryType.STRING,
+ label="Code Verifier",
+ hidden=True,
+ required=False,
+ value=values.get(CONF_CODE_VERIFIER),
+ ),
+ ConfigEntry(
+ key=CONF_SERIAL,
+ type=ConfigEntryType.STRING,
+ label="Serial",
+ hidden=True,
+ required=False,
+ value=values.get(CONF_SERIAL),
+ ),
+ ConfigEntry(
+ key=CONF_LOGIN_URL,
+ type=ConfigEntryType.STRING,
+ label="Login Url",
+ hidden=True,
+ required=False,
+ value=values.get(CONF_LOGIN_URL),
+ ),
+ ConfigEntry(
+ key=CONF_AUTH_FILE,
+ type=ConfigEntryType.STRING,
+ label="Authentication File",
+ hidden=True,
+ required=True,
+ value=values.get(CONF_AUTH_FILE),
+ ),
+ )
+
+
+class Audibleprovider(MusicProvider):
+ """Implementation of a Audible Audiobook Provider."""
+
+ def __init__(
+ self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+ ) -> None:
+ """Initialize the Audible Audiobook Provider."""
+ super().__init__(mass, manifest, config)
+ self.locale = cast(str, self.config.get_value(CONF_LOCALE) or "us")
+ self.auth_file = cast(str, self.config.get_value(CONF_AUTH_FILE))
+ self._client: audible.AsyncClient | None = None
+ audible.log_helper.set_level(getLevelName(self.logger.level))
+
+ async def handle_async_init(self) -> None:
+ """Handle asynchronous initialization of the provider."""
+ await self._login()
+
+ async def _login(self) -> None:
+ """Authenticate with Audible using the saved authentication file."""
+ try:
+ auth = await asyncio.to_thread(audible.Authenticator.from_file, self.auth_file)
+
+ if auth.access_token_expired:
+ await asyncio.to_thread(auth.refresh_access_token)
+ await asyncio.to_thread(auth.to_file, self.auth_file)
+
+ self._client = audible.AsyncClient(auth)
+
+ self.helper = AudibleHelper(
+ mass=self.mass,
+ client=self._client,
+ provider_instance=self.instance_id,
+ provider_domain=self.domain,
+ )
+
+ self.logger.info("Successfully authenticated with Audible.")
+
+ except Exception as e:
+ self.logger.error(f"Failed to authenticate with Audible: {e}")
+ raise LoginFailed("Failed to authenticate with Audible.")
+
+ @property
+ def supported_features(self) -> set[ProviderFeature]:
+ """Return the features supported by this Provider."""
+ return {ProviderFeature.BROWSE, ProviderFeature.LIBRARY_AUDIOBOOKS}
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """Return True if the provider is a streaming provider."""
+ return True
+
+ async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
+ """Get all audiobooks from the library."""
+ async for audiobook in self.helper.get_library():
+ yield audiobook
+
+ async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
+ """Get full audiobook details by id."""
+ audiobook = await self.helper.get_audiobook(asin=prov_audiobook_id, use_cache=False)
+ if audiobook is None:
+ raise ValueError(f"Audiobook with id {prov_audiobook_id} not found")
+ return audiobook
+
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.AUDIOBOOK
+ ) -> StreamDetails:
+ """Get streamdetails for a audiobook based of asin."""
+ return await self.helper.get_stream(asin=item_id)
+
+ async def on_streamed(
+ self,
+ streamdetails: StreamDetails,
+ seconds_streamed: int,
+ fully_played: bool = False,
+ ) -> None:
+ """Handle callback when an item completed streaming."""
+ await self.helper.set_last_position(streamdetails.item_id, seconds_streamed)
+
+ async def unload(self, is_removed: bool = False) -> None:
+ """
+ Handle unload/close of the provider.
+
+ Called when provider is deregistered (e.g. MA exiting or config reloading).
+ is_removed will be set to True when the provider is removed from the configuration.
+ """
+ if is_removed:
+ await self.helper.deregister()
--- /dev/null
+"""Helper for parsing and using audible api."""
+
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import html
+import json
+import os
+import re
+from collections.abc import AsyncGenerator
+from os import PathLike
+from typing import Any
+from urllib.parse import parse_qs, urlparse
+
+import audible
+import audible.register
+from audible import AsyncClient
+from music_assistant_models.enums import (
+ ContentType,
+ ImageType,
+ MediaType,
+ StreamType,
+)
+from music_assistant_models.errors import LoginFailed
+from music_assistant_models.media_items import (
+ Audiobook,
+ AudioFormat,
+ MediaItemChapter,
+ MediaItemImage,
+ ProviderMapping,
+ UniqueList,
+)
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.mass import MusicAssistant
+
+CACHE_DOMAIN = "audible"
+CACHE_CATEGORY_API = 0
+CACHE_CATEGORY_AUDIOBOOK = 1
+CACHE_CATEGORY_CHAPTERS = 2
+
+
+class AudibleHelper:
+ """Helper for parsing and using audible api."""
+
+ def __init__(
+ self,
+ mass: MusicAssistant,
+ client: AsyncClient,
+ provider_domain: str,
+ provider_instance: str,
+ ):
+ """Initialize the Audible Helper."""
+ self.mass = mass
+ self.client = client
+ self.provider_domain = provider_domain
+ self.provider_instance = provider_instance
+
+ async def get_library(self) -> AsyncGenerator[Audiobook, None]:
+ """Fetch the user's library with pagination."""
+ response_groups = [
+ "contributors",
+ "media",
+ "product_attrs",
+ "product_desc",
+ "product_details",
+ "product_extended_attrs",
+ ]
+
+ page = 1
+ page_size = 50
+
+ while True:
+ library = await self._call_api(
+ "library",
+ response_groups=",".join(response_groups),
+ page=page,
+ num_results=page_size,
+ )
+
+ items = library.get("items", [])
+ if not items:
+ break
+
+ for audiobook_data in items:
+ asin = audiobook_data.get("asin")
+ cached_book = await self.mass.cache.get(
+ key=asin, base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_AUDIOBOOK, default=None
+ )
+
+ if cached_book is not None:
+ album = await self._parse_audiobook(cached_book)
+ yield album
+ else:
+ album = await self._parse_audiobook(audiobook_data)
+ yield album
+
+ # Check if we've reached the end
+ total_items = library.get("total_results", 0)
+ if page * page_size >= total_items:
+ break
+
+ page += 1
+
+ async def get_audiobook(self, asin: str, use_cache: bool = True) -> Audiobook | None:
+ """Fetch the audiobook by asin."""
+ if use_cache:
+ cached_book = await self.mass.cache.get(
+ key=asin, base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_AUDIOBOOK, default=None
+ )
+ if cached_book is not None:
+ return await self._parse_audiobook(cached_book)
+ response = await self._call_api(
+ f"library/{asin}",
+ response_groups="""
+ contributors, media, price, product_attrs, product_desc, product_details,
+ product_extended_attrs,is_finished
+ """,
+ )
+
+ if response is None:
+ return None
+ await self.mass.cache.set(
+ key=asin,
+ base_key=CACHE_DOMAIN,
+ category=CACHE_CATEGORY_AUDIOBOOK,
+ data=response.get("item"),
+ )
+ return await self._parse_audiobook(response.get("item"))
+
+ async def get_stream(self, asin: str) -> StreamDetails:
+ """Get stream details for a track (audiobook chapter)."""
+ chapters = await self._fetch_chapters(asin=asin)
+
+ duration = sum(chapter["length_ms"] for chapter in chapters) / 1000
+
+ playback_info = await self.client.post(
+ f"content/{asin}/licenserequest",
+ body={
+ "quality": "High",
+ "response_groups": "content_reference,certificate",
+ "consumption_type": "Streaming",
+ "supported_media_features": {
+ "codecs": ["mp4a.40.2", "mp4a.40.42"],
+ "drm_types": [
+ "Hls",
+ ],
+ },
+ "spatial": False,
+ },
+ )
+ size = (
+ playback_info.get("content_license")
+ .get("content_metadata")
+ .get("content_reference")
+ .get("content_size_in_bytes", 0)
+ )
+
+ m3u8_url = playback_info.get("content_license").get("license_response")
+ acr = playback_info.get("content_license").get("acr")
+ return StreamDetails(
+ provider=self.provider_instance,
+ size=size,
+ item_id=f"{asin}",
+ audio_format=AudioFormat(content_type=ContentType.AAC),
+ media_type=MediaType.AUDIOBOOK,
+ stream_type=StreamType.HTTP,
+ path=m3u8_url,
+ can_seek=True,
+ duration=duration,
+ data={"acr": acr},
+ )
+
+ async def _fetch_chapters(self, asin: str) -> Any:
+ chapters_data: list[Any] = await self.mass.cache.get(
+ base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_CHAPTERS, key=asin, default=[]
+ )
+ if not chapters_data:
+ response = await self._call_api(
+ f"content/{asin}/metadata",
+ response_groups="chapter_info, always-returned, content_reference, content_url",
+ chapter_titles_type="Flat",
+ )
+ chapters_data = response.get("content_metadata").get("chapter_info").get("chapters")
+ await self.mass.cache.set(
+ base_key=CACHE_DOMAIN,
+ category=CACHE_CATEGORY_CHAPTERS,
+ key=asin,
+ data=chapters_data,
+ )
+ return chapters_data
+
+ async def get_last_postion(self, asin: str) -> int:
+ """Fetch last position of asin."""
+ response = await self._call_api("annotations/lastpositions", asins=asin)
+ return int(
+ response.get("asin_last_position_heard_annots")[0]
+ .get("last_position_heard")
+ .get("position_ms", 0)
+ )
+
+ async def set_last_position(self, asin: str, pos: int) -> Any:
+ """Report last position."""
+
+ async def _call_api(self, path: str, **kwargs: Any) -> Any:
+ params_str = json.dumps(kwargs, sort_keys=True)
+ params_hash = hashlib.md5(params_str.encode()).hexdigest()
+ cache_key_with_params = f"{path}:{params_hash}"
+
+ response = await self.mass.cache.get(
+ key=cache_key_with_params, base_key=CACHE_DOMAIN, category=CACHE_CATEGORY_API
+ )
+ if not response:
+ response = await self.client.get(path, **kwargs)
+ await self.mass.cache.set(
+ key=cache_key_with_params, base_key=CACHE_DOMAIN, data=response
+ )
+ return response
+
+ async def _parse_audiobook(self, audiobook_data: dict[str, Any]) -> Audiobook:
+ asin = audiobook_data.get("asin", "")
+ title = audiobook_data.get("title", "")
+ authors = []
+ narrators = []
+ for narrator in audiobook_data.get("narrators", []):
+ narrators.append(narrator.get("name"))
+ for author in audiobook_data.get("authors", []):
+ authors.append(author.get("name"))
+ chapters_data = await self._fetch_chapters(asin=asin)
+ duration = sum(chapter["length_ms"] for chapter in chapters_data) / 1000
+ book = Audiobook(
+ item_id=asin,
+ provider=self.provider_instance,
+ name=title,
+ duration=duration,
+ provider_mappings={
+ ProviderMapping(
+ item_id=asin,
+ provider_domain=self.provider_domain,
+ provider_instance=self.provider_instance,
+ )
+ },
+ publisher=audiobook_data.get("publisher_name"),
+ authors=UniqueList(authors),
+ narrators=UniqueList(narrators),
+ )
+ book.metadata.copyright = audiobook_data.get("copyright")
+ book.metadata.description = _html_to_txt(
+ str(audiobook_data.get("extended_product_description", ""))
+ )
+ book.metadata.languages = UniqueList([audiobook_data.get("language", "")])
+ book.metadata.release_date = audiobook_data.get("release_date")
+ reviews = audiobook_data.get("editorial_reviews", [])
+ if reviews:
+ book.metadata.review = _html_to_txt(reviews[0])
+ book.metadata.genres = {
+ genre.replace("_", " ") for genre in audiobook_data.get("platinum_keywords", "")
+ }
+ book.metadata.images = UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=audiobook_data.get("product_images", {}).get("500"),
+ provider=self.provider_instance,
+ remotely_accessible=True,
+ ),
+ MediaItemImage(
+ type=ImageType.CLEARART,
+ path=audiobook_data.get("product_images", {}).get("500"),
+ provider=self.provider_instance,
+ remotely_accessible=True,
+ ),
+ ]
+ )
+
+ chapters = []
+ for index, chapter_data in enumerate(chapters_data):
+ start = int(chapter_data.get("start_offset_sec", 0))
+ length = int(chapter_data.get("length_ms", 0)) / 1000
+ chapters.append(
+ MediaItemChapter(
+ position=index, name=chapter_data.get("title"), start=start, end=start + length
+ )
+ )
+ book.metadata.chapters = chapters
+ book.resume_position_ms = await self.get_last_postion(asin=asin)
+ return book
+
+ async def deregister(self) -> None:
+ """Deregister this provider from Audible."""
+ await asyncio.to_thread(self.client.auth.deregister_device)
+
+
+def _html_to_txt(html_text: str) -> str:
+ txt = html.unescape(html_text)
+ tags = re.findall("<[^>]+>", txt)
+ for tag in tags:
+ txt = txt.replace(tag, "")
+ return txt
+
+
+# Audible Authorization
+async def audible_get_auth_info(locale: str) -> tuple[str, str, str]:
+ """
+ Generate the login URL and auth info for Audible OAuth flow asynchronously.
+
+ Args:
+ locale: The locale string (e.g., 'us', 'uk', 'de') to determine region settings
+ Returns:
+ A tuple containing:
+ - code_verifier (str): The OAuth code verifier string
+ - oauth_url (str): The complete OAuth URL for login
+ - serial (str): The generated device serial number
+ """
+ # Create locale object (not I/O operation)
+ locale_obj = audible.localization.Locale(locale)
+
+ # Create code verifier (potential crypto operations)
+ code_verifier = await asyncio.to_thread(audible.login.create_code_verifier)
+
+ # Build OAuth URL (potential network operations)
+ oauth_url, serial = await asyncio.to_thread(
+ audible.login.build_oauth_url,
+ country_code=locale_obj.country_code,
+ domain=locale_obj.domain,
+ market_place_id=locale_obj.market_place_id,
+ code_verifier=code_verifier,
+ with_username=False,
+ )
+
+ return code_verifier.decode(), oauth_url, serial
+
+
+async def audible_custom_login(
+ code_verifier: str, response_url: str, serial: str, locale: str
+) -> audible.Authenticator:
+ """
+ Complete the authentication using the code_verifier, response_url, and serial asynchronously.
+
+ Args:
+ code_verifier: The code verifier string used in OAuth flow
+ response_url: The response URL containing the authorization code
+ serial: The device serial number
+ locale: The locale string
+ Returns:
+ Audible Authenticator object
+ Raises:
+ LoginFailed: If authorization code is not found in the URL
+ """
+ auth = audible.Authenticator()
+ auth.locale = audible.localization.Locale(locale)
+
+ # URL parsing (not I/O operation)
+ response_url_parsed = urlparse(response_url)
+ parsed_qs = parse_qs(response_url_parsed.query)
+
+ authorization_codes = parsed_qs.get("openid.oa2.authorization_code")
+ if not authorization_codes:
+ raise LoginFailed("Authorization code not found in the provided URL.")
+
+ # Get the first authorization code from the list
+ authorization_code = authorization_codes[0]
+
+ # Register device (network operation)
+ registration_data = await asyncio.to_thread(
+ audible.register.register,
+ authorization_code=authorization_code,
+ code_verifier=code_verifier.encode(),
+ domain=auth.locale.domain,
+ serial=serial,
+ )
+ auth._update_attrs(**registration_data)
+ return auth
+
+
+async def check_file_exists(path: str | PathLike[str]) -> bool:
+ """Async file exists check."""
+ return await asyncio.to_thread(os.path.exists, path)
+
+
+async def remove_file(path: str | PathLike[str]) -> None:
+ """Async file delete."""
+ await asyncio.to_thread(os.remove, path)