audible_get_auth_info,
cached_authenticator_from_file,
check_file_exists,
+ refresh_access_token_compat,
remove_file,
)
post_login_url = str(values.get(CONF_POST_LOGIN_URL))
storage_path = mass.storage_path
- auth = await audible_custom_login(code_verifier, post_login_url, serial, locale)
- auth_file_path = os.path.join(storage_path, f"audible_auth_{uuid4().hex}.json")
- await asyncio.to_thread(auth.to_file, auth_file_path)
- values[CONF_AUTH_FILE] = auth_file_path
- auth_required = False
+ try:
+ auth = await audible_custom_login(code_verifier, post_login_url, serial, locale)
+
+ # Verify signing auth was obtained (critical for stability)
+ if not (auth.adp_token and auth.device_private_key):
+ raise LoginFailed(
+ "Registration succeeded but signing keys were not obtained. "
+ "This may cause authentication issues. Please try again."
+ )
+
+ auth_file_path = os.path.join(storage_path, f"audible_auth_{uuid4().hex}.json")
+ await asyncio.to_thread(auth.to_file, auth_file_path)
+ values[CONF_AUTH_FILE] = auth_file_path
+ auth_required = False
+ except LoginFailed:
+ raise
+ except Exception as e:
+ raise LoginFailed(f"Verification failed: {e}") from e
return (
ConfigEntry(
else:
self.logger.debug("Using cached authenticator")
+ # Check if we have signing auth (preferred, stable - not affected by API changes)
+ has_signing_auth = auth.adp_token and auth.device_private_key
+ if has_signing_auth:
+ self.logger.debug("Using signing auth (stable RSA-signed requests)")
+ else:
+ self.logger.debug("Signing auth not available, using bearer auth")
+
+ # Handle token refresh if needed
if auth.access_token_expired:
self.logger.debug("Access token expired, refreshing")
- await asyncio.to_thread(auth.refresh_access_token)
- await asyncio.to_thread(auth.to_file, self.auth_file)
- self._AUTH_CACHE[self.instance_id] = auth
+ try:
+ # Use compatible refresh that handles new API token format
+ if auth.refresh_token and auth.locale:
+ refresh_data = await refresh_access_token_compat(
+ refresh_token=auth.refresh_token,
+ domain=auth.locale.domain,
+ http_session=self.mass.http_session,
+ with_username=auth.with_username or False,
+ )
+ auth._update_attrs(**refresh_data)
+ await asyncio.to_thread(auth.to_file, self.auth_file)
+ self._AUTH_CACHE[self.instance_id] = auth
+ self.logger.debug("Token refreshed successfully")
+ else:
+ self.logger.warning("Cannot refresh: missing refresh_token or locale")
+ except Exception as refresh_error:
+ self.logger.warning(f"Token refresh failed: {refresh_error}")
+ if not has_signing_auth:
+ # Only fail if we don't have signing auth as fallback
+ raise LoginFailed(
+ "Token refresh failed and signing auth not available. "
+ "Please re-authenticate with Audible."
+ ) from refresh_error
+ # Continue with signing auth
self._client = audible.AsyncClient(auth)
self.logger.info("Successfully authenticated with Audible.")
+ except LoginFailed:
+ raise
except Exception as e:
self.logger.error(f"Failed to authenticate with Audible: {e}")
- raise LoginFailed("Failed to authenticate with Audible.")
+ raise LoginFailed(f"Failed to authenticate with Audible: {e}") from e
@property
def is_streaming_provider(self) -> bool:
import re
from collections.abc import AsyncGenerator
from contextlib import suppress
-from datetime import UTC, datetime
+from datetime import UTC, datetime, timedelta
from os import PathLike
-from typing import Any
+from typing import TYPE_CHECKING, Any
from urllib.parse import parse_qs, urlparse
import audible
import audible.register
from audible import AsyncClient
+
+if TYPE_CHECKING:
+ from aiohttp import ClientSession
from music_assistant_models.enums import ContentType, ImageType, MediaType, StreamType
from music_assistant_models.errors import LoginFailed, MediaNotFoundError
from music_assistant_models.media_items import (
_AUTH_CACHE: dict[str, audible.Authenticator] = {}
+async def refresh_access_token_compat(
+ refresh_token: str, domain: str, http_session: ClientSession, with_username: bool = False
+) -> dict[str, Any]:
+ """Refresh tokens with compatibility for new Audible API format.
+
+ The Audible API changed from returning 'access_token' to 'actor_access_token'.
+ This function handles both formats for backward compatibility.
+
+ :param refresh_token: The refresh token obtained after device registration.
+ :param domain: The top level domain (e.g., com, de).
+ :param http_session: The HTTP client session to use for requests.
+ :param with_username: If True, use audible domain instead of amazon.
+ :return: Dict with access_token and expires timestamp.
+ """
+ logger = logging.getLogger("audible_helper")
+
+ body = {
+ "app_name": "Audible",
+ "app_version": "3.56.2",
+ "source_token": refresh_token,
+ "requested_token_type": "access_token",
+ "source_token_type": "refresh_token",
+ }
+
+ target_domain = "audible" if with_username else "amazon"
+ url = f"https://api.{target_domain}.{domain}/auth/token"
+
+ async with http_session.post(url, data=body) as resp:
+ resp.raise_for_status()
+ resp_dict = await resp.json()
+
+ expires_in_sec = int(resp_dict.get("expires_in", 3600))
+ expires = (datetime.now(UTC) + timedelta(seconds=expires_in_sec)).timestamp()
+
+ # Handle new format (actor_access_token) or fall back to legacy (access_token)
+ access_token = resp_dict.get("actor_access_token") or resp_dict.get("access_token")
+
+ if not access_token:
+ logger.error("Token refresh response missing both actor_access_token and access_token")
+ raise LoginFailed("Token refresh failed: no access token in response")
+
+ logger.debug(
+ "Token refreshed successfully using %s format",
+ "new (actor)" if "actor_access_token" in resp_dict else "legacy",
+ )
+
+ return {"access_token": access_token, "expires": expires}
+
+
async def cached_authenticator_from_file(path: str) -> audible.Authenticator:
- """Get an authenticator from file with caching to avoid repeated file reads."""
+ """Get an authenticator from file with caching and signing auth validation.
+
+ :param path: Path to the authenticator JSON file.
+ :return: The cached or loaded Authenticator instance.
+ """
logger = logging.getLogger("audible_helper")
if path in _AUTH_CACHE:
return _AUTH_CACHE[path]
logger.debug("Loading authenticator from file %s and caching it", path)
auth = await asyncio.to_thread(audible.Authenticator.from_file, path)
+
+ # Verify signing auth is available (not affected by API changes)
+ if auth.adp_token and auth.device_private_key:
+ logger.debug("Signing auth available - using stable RSA-signed requests")
+ else:
+ logger.warning(
+ "Signing auth not available - only bearer auth will work. "
+ "Consider re-authenticating for more stable auth."
+ )
+
_AUTH_CACHE[path] = auth
return auth
async def audible_custom_login(
code_verifier: str, response_url: str, serial: str, locale: str
) -> audible.Authenticator:
+ """Complete the authentication using the code_verifier, response_url, and serial.
+
+ :param code_verifier: The code verifier string used in OAuth flow.
+ :param response_url: The response URL containing the authorization code.
+ :param serial: The device serial number.
+ :param locale: The locale string.
+ :return: Audible Authenticator object.
+ :raises LoginFailed: If authorization code is not found in the URL.
"""
- Complete the authentication using the code_verifier, response_url, and serial asynchronously.
-
- Args:
- code_verifier: The code verifier string used in OAuth flow
- response_url: The response URL containing the authorization code
- serial: The device serial number
- locale: The locale string
- Returns:
- Audible Authenticator object
- Raises:
- LoginFailed: If authorization code is not found in the URL
- """
+ logger = logging.getLogger("audible_helper")
auth = audible.Authenticator()
auth.locale = audible.localization.Locale(locale)
response_url_parsed = urlparse(response_url)
parsed_qs = parse_qs(response_url_parsed.query)
- authorization_codes = parsed_qs.get("openid.oa2.authorization_code")
- if not authorization_codes:
- raise LoginFailed("Authorization code not found in the provided URL.")
+ # Try multiple parameter names for authorization code
+ # Audible may use different parameter names depending on the flow
+ authorization_code = None
+ for param_name in ["openid.oa2.authorization_code", "authorization_code", "code"]:
+ if codes := parsed_qs.get(param_name):
+ authorization_code = codes[0]
+ logger.debug("Found authorization code in parameter: %s", param_name)
+ break
+
+ if not authorization_code:
+ available_params = list(parsed_qs.keys())
+ raise LoginFailed(
+ f"Authorization code not found in URL. "
+ f"Expected 'openid.oa2.authorization_code' but found parameters: {available_params}"
+ )
- authorization_code = authorization_codes[0]
registration_data = await asyncio.to_thread(
audible.register.register,
authorization_code=authorization_code,
serial=serial,
)
auth._update_attrs(**registration_data)
+
+ # Log what auth methods are available after registration
+ if auth.adp_token and auth.device_private_key:
+ logger.info("Registration successful with signing auth (stable)")
+ else:
+ logger.warning("Registration successful but signing auth not available")
+
return auth