from __future__ import annotations
+import functools
import itertools
-from collections.abc import AsyncGenerator, Sequence
-from typing import TYPE_CHECKING
+import time
+from collections.abc import AsyncGenerator, Callable, Coroutine, Sequence
+from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar, cast
import aioaudiobookshelf as aioabs
from aioaudiobookshelf.client.items import LibraryItemExpandedBook as AbsLibraryItemExpandedBook
from aioaudiobookshelf.client.items import (
LibraryItemExpandedPodcast as AbsLibraryItemExpandedPodcast,
)
+from aioaudiobookshelf.client.session_configuration import asyncio
from aioaudiobookshelf.exceptions import LoginError as AbsLoginError
+from aioaudiobookshelf.exceptions import RefreshTokenExpiredError
from aioaudiobookshelf.schema.author import AuthorExpanded
from aioaudiobookshelf.schema.calls_authors import (
AuthorWithItemsAndSeries as AbsAuthorWithItemsAndSeries,
ProviderFeature,
StreamType,
)
-from music_assistant_models.errors import LoginFailed, MediaNotFoundError
+from music_assistant_models.errors import AudioError, LoginFailed, MediaNotFoundError
from music_assistant_models.media_items import (
Audiobook,
AudioFormat,
ABS_SHELF_ID_ICONS,
CACHE_CATEGORY_LIBRARIES,
CACHE_KEY_LIBRARIES,
+ CONF_API_TOKEN,
CONF_HIDE_EMPTY_PODCASTS,
+ CONF_OLD_TOKEN,
CONF_PASSWORD,
- CONF_TOKEN,
CONF_URL,
CONF_USERNAME,
CONF_VERIFY_SSL,
type=ConfigEntryType.LABEL,
label="Please provide the address of your Audiobookshelf instance. To authenticate "
"you have two options: "
- "a) Provide username AND password. Leave token empty."
- "b) Provide ONLY the token.",
+ "a) Provide username AND password. Leave the API key empty. "
+ "b) Provide ONLY an API key.",
),
ConfigEntry(
key=CONF_URL,
description="The password to authenticate to the remote server.",
),
ConfigEntry(
- key=CONF_TOKEN,
+ key=CONF_API_TOKEN,
type=ConfigEntryType.SECURE_STRING,
- label="Token _instead_ of user/ password.",
+ label="API key _instead_ of user/ password. (ABS version >= 2.26)",
required=False,
description="Instead of using a username and password, "
- "you may provide the user's token."
- "\nThe token can be seen in Audiobookshelf as an admin user in Settings -> Users.",
+ "you may provide an API key (ABS version >= 2.26). "
+ "Please consult the docs.",
+ ),
+ ConfigEntry(
+ key=CONF_OLD_TOKEN,
+ type=ConfigEntryType.SECURE_STRING,
+ label="old token",
+ required=False,
+ hidden=True,
),
ConfigEntry(
key=CONF_VERIFY_SSL,
)
+R = TypeVar("R")
+P = ParamSpec("P")
+
+
class Audiobookshelf(MusicProvider):
"""Audiobookshelf MusicProvider."""
+ @staticmethod
+ def handle_refresh_token(
+ method: Callable[P, Coroutine[Any, Any, R]],
+ ) -> Callable[P, Coroutine[Any, Any, R]]:
+ """Decorate a method to handle an expired refresh token by relogin."""
+
+ @functools.wraps(method)
+ async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+ self = cast("Audiobookshelf", args[0])
+ try:
+ return await method(*args, **kwargs)
+ except RefreshTokenExpiredError:
+ self.logger.debug("Refresh token expired. Trying to renew.")
+ await self.reauthenticate()
+ return await method(*args, **kwargs)
+
+ return wrapper
+
@property
def supported_features(self) -> set[ProviderFeature]:
"""Features supported by this Provider."""
base_url = str(self.config.get_value(CONF_URL))
username = str(self.config.get_value(CONF_USERNAME))
password = str(self.config.get_value(CONF_PASSWORD))
- token = self.config.get_value(CONF_TOKEN)
+ token_old = self.config.get_value(CONF_OLD_TOKEN)
+ token_api = self.config.get_value(CONF_API_TOKEN)
verify_ssl = bool(self.config.get_value(CONF_VERIFY_SSL))
session_config = aioabs.SessionConfiguration(
session=self.mass.http_session,
pagination_items_per_page=30, # audible provider goes with 50 for pagination
)
try:
- if token is not None:
- session_config.token = str(token)
+ if token_api is not None or token_old is not None:
+ _token = token_api if token_api is not None else token_old
+ session_config.token = str(_token)
(
self._client,
self._client_socket,
except AbsLoginError as exc:
raise LoginFailed(f"Login to abs instance at {base_url} failed.") from exc
+ if token_old is not None and token_api is None:
+ # Log Message that the old token won't work
+ _version = self._client.server_settings.version.split(".")
+ if len(_version) >= 2:
+ try:
+ major, minor = int(_version[0]), int(_version[1])
+ except ValueError:
+ major = minor = 0
+ if major >= 2 and minor >= 26:
+ self.logger.warning(
+ """
+
+######## Audiobookshelf API key change #############################################################
+
+Audiobookshelf introduced a new API key system in version 2.26 (JWT).
+You are still using a token configured with a previous version of Audiobookshelf,
+but you are running version %s. This will stop working in a future Audiobookshelf release.
+Please create a non-expiring API Key instead, and update your configuration accordingly.
+Refer to the documentation of Audiobookshelf, https://www.audiobookshelf.org/guides/api-keys/
+and of Music Assistant https://www.music-assistant.io/music-providers/audiobookshelf/
+for more details.
+
+""",
+ self._client.server_settings.version,
+ )
+
self.cache_base_key = self.instance_id
cached_libraries = await self.mass.cache.get(
on_user_item_progress_updated=self._socket_abs_user_item_progress_updated,
)
+ self._client_socket.set_refresh_token_expired_callback(
+ on_refresh_token_expired=self._socket_abs_refresh_token_expired
+ )
+
# progress guard
self.progress_guard = ProgressGuard()
user = await self._client.get_my_user()
await self._set_playlog_from_user(user)
+ # safe guard reauthentication
+ self.reauthenticate_lock = asyncio.Lock()
+ self.reauthenticate_last = 0.0
+
+ @handle_refresh_token
async def unload(self, is_removed: bool = False) -> None:
"""
Handle unload/close of the provider.
# For streaming providers return True here but for local file based providers return False.
return False
+ @handle_refresh_token
async def sync_library(self, media_type: MediaType) -> None:
"""Obtain audiobook library ids and podcast library ids."""
libraries = await self._client.get_all_libraries()
continue
yield mass_podcast
+ @handle_refresh_token
async def _get_abs_expanded_podcast(
self, prov_podcast_id: str
) -> AbsLibraryItemExpandedPodcast:
return abs_podcast
+ @handle_refresh_token
async def get_podcast(self, prov_podcast_id: str) -> Podcast:
"""Get single podcast."""
abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
yield mass_episode
episode_cnt += 1
+ @handle_refresh_token
async def get_podcast_episode(
self, prov_episode_id: str, add_progress: bool = True
) -> PodcastEpisode:
)
yield mass_audiobook
+ @handle_refresh_token
async def _get_abs_expanded_audiobook(
self, prov_audiobook_id: str
) -> AbsLibraryItemExpandedBook:
return abs_audiobook
+ @handle_refresh_token
async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
"""Get a single audiobook.
async def _get_stream_details_audiobook(
self, abs_audiobook: AbsLibraryItemExpandedBook
) -> StreamDetails:
- """Streamdetails audiobook."""
+ """Streamdetails audiobook.
+
+ We always use a custom stream type, also for single file, such
+ that we can handle an ffmpeg error and refresh our tokens.
+ """
tracks = abs_audiobook.media.tracks
- token = self._client.token
- base_url = str(self.config.get_value(CONF_URL))
if len(tracks) == 0:
raise MediaNotFoundError("Stream not found")
if abs_audiobook.media.tracks[0].metadata is not None:
content_type = ContentType.try_parse(abs_audiobook.media.tracks[0].metadata.ext)
- if len(tracks) > 1:
- self.logger.debug("Using playback for multiple file audiobook.")
-
- return StreamDetails(
- provider=self.instance_id,
- item_id=abs_audiobook.id_,
- audio_format=AudioFormat(content_type=content_type),
- media_type=MediaType.AUDIOBOOK,
- stream_type=StreamType.CUSTOM,
- duration=int(abs_audiobook.media.duration),
- data=tracks,
- can_seek=True,
- allow_seek=True,
- )
-
- self.logger.debug(
- f'Using direct playback for audiobook "{abs_audiobook.media.metadata.title}".'
- )
- media_url = abs_audiobook.media.tracks[0].content_url
- stream_url = f"{base_url}{media_url}?token={token}"
-
return StreamDetails(
provider=self.lookup_key,
item_id=abs_audiobook.id_,
- audio_format=AudioFormat(
- content_type=content_type,
- ),
+ audio_format=AudioFormat(content_type=content_type),
media_type=MediaType.AUDIOBOOK,
- stream_type=StreamType.HTTP,
- path=stream_url,
+ stream_type=StreamType.CUSTOM,
+ duration=int(abs_audiobook.media.duration),
+ data=tracks,
can_seek=True,
allow_seek=True,
)
streamdetails: The stream to be used
seek_position: The seeking position in seconds
"""
- tracks, position = self._get_track_from_position(streamdetails.data, seek_position)
- if not tracks:
- raise MediaNotFoundError(f"Track not found at seek position {seek_position}.")
- self.logger.debug(
- f"Skipped {len(streamdetails.data) - len(tracks)} tracks while seeking to position {seek_position}." # noqa: E501
- )
- base_url = str(self.config.get_value(CONF_URL))
- track_urls = []
- for track in tracks:
- stream_url = f"{base_url}{track.content_url}?token={self._client.token}"
- track_urls.append(stream_url)
-
- async for chunk in get_multi_file_stream(
- mass=self.mass,
- streamdetails=StreamDetails(
- provider=self.instance_id,
- item_id=streamdetails.item_id,
- audio_format=streamdetails.audio_format,
- media_type=MediaType.AUDIOBOOK,
- stream_type=StreamType.MULTI_FILE,
- duration=streamdetails.duration,
- data=track_urls,
- can_seek=True,
- allow_seek=True,
- ),
- seek_position=position,
- ):
- yield chunk
+ async def _get_audio_stream() -> AsyncGenerator[bytes, None]:
+ tracks, position = self._get_track_from_position(streamdetails.data, seek_position)
+ if not tracks:
+ raise MediaNotFoundError(f"Track not found at seek position {seek_position}.")
+
+ self.logger.debug(
+ f"Skipped {len(streamdetails.data) - len(tracks)} tracks"
+ " while seeking to position {seek_position}."
+ )
+ base_url = str(self.config.get_value(CONF_URL))
+ track_urls = []
+ for track in tracks:
+ stream_url = f"{base_url}{track.content_url}?token={self._client.token}"
+ track_urls.append(stream_url)
+
+ async for chunk in get_multi_file_stream(
+ mass=self.mass,
+ streamdetails=StreamDetails(
+ provider=self.lookup_key,
+ item_id=streamdetails.item_id,
+ audio_format=streamdetails.audio_format,
+ media_type=MediaType.AUDIOBOOK,
+ stream_type=StreamType.MULTI_FILE,
+ duration=streamdetails.duration,
+ data=track_urls,
+ can_seek=True,
+ allow_seek=True,
+ ),
+ seek_position=position,
+ raise_ffmpeg_exception=True,
+ ):
+ yield chunk
+
+ # Should our token expire, we try to refresh them and continue streaming once.
+ _refreshed = False
+ while True:
+ try:
+ async for chunk in _get_audio_stream():
+ _refreshed = False
+ yield chunk
+ break
+ except AudioError as err:
+ if not _refreshed:
+ self.logger.debug("FFmpeg raised an error. Trying to refresh token.")
+ try:
+ await self._client.session_config.refresh()
+ except RefreshTokenExpiredError:
+ await self.reauthenticate()
+ _refreshed = True
+ else:
+ self.logger.error(err)
+ break
async def _get_stream_details_episode(self, podcast_id: str) -> StreamDetails:
- """Streamdetails of a podcast episode."""
+ """Streamdetails of a podcast episode.
+
+ There are no multi-file podcasts in abs, but we use a custom
+ stream to handle possible ffmpeg errors.
+ """
abs_podcast_id, abs_episode_id = podcast_id.split(" ")
abs_episode = None
if abs_episode is None:
raise MediaNotFoundError("Stream not found")
self.logger.debug(f'Using direct playback for podcast episode "{abs_episode.title}".')
- token = self._client.token
- base_url = str(self.config.get_value(CONF_URL))
- media_url = abs_episode.audio_track.content_url
- full_url = f"{base_url}{media_url}?token={token}"
content_type = ContentType.UNKNOWN
if abs_episode.audio_track.metadata is not None:
content_type = ContentType.try_parse(abs_episode.audio_track.metadata.ext)
content_type=content_type,
),
media_type=MediaType.PODCAST_EPISODE,
- stream_type=StreamType.HTTP,
- path=full_url,
+ stream_type=StreamType.CUSTOM,
can_seek=True,
allow_seek=True,
+ data=[abs_episode.audio_track],
)
+ @handle_refresh_token
async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
"""Return finished:bool, position_ms: int."""
progress: None | MediaProgress = None
return False, 0
+ @handle_refresh_token
async def recommendations(self) -> list[RecommendationFolder]:
"""Get recommendations."""
# We have to avoid "flooding" the home page, which becomes especially troublesome if users
items_collected.append(items)
items_by_shelf_id[shelf.id_] = items_collected
+ @handle_refresh_token
async def on_played(
self,
media_type: MediaType,
is_finished=fully_played,
)
+ @handle_refresh_token
async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
"""Browse for audiobookshelf.
return
await self._update_playlog_episode(progress)
+ async def _socket_abs_refresh_token_expired(self) -> None:
+ await self.reauthenticate()
+
+ async def reauthenticate(self) -> None:
+ """Reauthorize the abs session config if refresh token expired."""
+ # some safe guarding should that function be called simultaneously
+ if self.reauthenticate_lock.locked() or time.time() - self.reauthenticate_last < 5:
+ while True:
+ if not self.reauthenticate_lock.locked():
+ return
+ await asyncio.sleep(0.5)
+ async with self.reauthenticate_lock:
+ await self._client.session_config.authenticate(
+ username=str(self.config.get_value(CONF_USERNAME)),
+ password=str(self.config.get_value(CONF_PASSWORD)),
+ )
+ self.reauthenticate_last = time.time()
+
def _get_all_known_item_ids(self) -> set[str]:
known_ids = set()
for lib in self.libraries.podcasts.values():