--- /dev/null
+"""Audiobookshelf provider for Music Assistant.
+
+Audiobookshelf is abbreviated ABS here.
+"""
+
+from __future__ import annotations
+
+import logging
+from collections.abc import AsyncGenerator, Sequence
+from typing import TYPE_CHECKING
+
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueType,
+ ProviderConfig,
+)
+from music_assistant_models.enums import (
+ ConfigEntryType,
+ ContentType,
+ ImageType,
+ MediaType,
+ ProviderFeature,
+ StreamType,
+)
+from music_assistant_models.errors import MediaNotFoundError
+from music_assistant_models.media_items import (
+ Audiobook,
+ AudioFormat,
+ BrowseFolder,
+ ItemMapping,
+ MediaItemChapter,
+ MediaItemImage,
+ MediaItemType,
+ Podcast,
+ PodcastEpisode,
+ ProviderMapping,
+ UniqueList,
+)
+from music_assistant_models.streamdetails import StreamDetails
+
+from music_assistant.models.music_provider import MusicProvider
+from music_assistant.providers.audiobookshelf.abs_client import (
+ ABSClient,
+)
+from music_assistant.providers.audiobookshelf.abs_schema import (
+ ABSAudioBook,
+ ABSLibrary,
+ ABSPodcast,
+ ABSPodcastEpisodeExpanded,
+)
+
+if TYPE_CHECKING:
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant.mass import MusicAssistant
+ from music_assistant.models import ProviderInstanceType
+
+CONF_URL = "url"
+CONF_USERNAME = "username"
+CONF_PASSWORD = "password"
+CONF_VERIFY_SSL = "verify_ssl"
+
+
+async def setup(
+ mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+) -> ProviderInstanceType:
+ """Initialize provider(instance) with given configuration."""
+ return Audiobookshelf(mass, manifest, config)
+
+
+async def get_config_entries(
+ mass: MusicAssistant,
+ instance_id: str | None = None,
+ action: str | None = None,
+ values: dict[str, ConfigValueType] | None = None,
+) -> tuple[ConfigEntry, ...]:
+ """
+ Return Config entries to setup this provider.
+
+ instance_id: id of an existing provider instance (None if new instance setup).
+ action: [optional] action key called from config entries UI.
+ values: the (intermediate) raw values for config entries sent with the action.
+ """
+ # ruff: noqa: ARG001
+ return (
+ ConfigEntry(
+ key=CONF_URL,
+ type=ConfigEntryType.STRING,
+ label="Server",
+ required=True,
+ description="The url of the Audiobookshelf server to connect to.",
+ ),
+ ConfigEntry(
+ key=CONF_USERNAME,
+ type=ConfigEntryType.STRING,
+ label="Username",
+ required=True,
+ description="The username to authenticate to the remote server.",
+ ),
+ ConfigEntry(
+ key=CONF_PASSWORD,
+ type=ConfigEntryType.SECURE_STRING,
+ label="Password",
+ required=False,
+ description="The password to authenticate to the remote server.",
+ ),
+ ConfigEntry(
+ key=CONF_VERIFY_SSL,
+ type=ConfigEntryType.BOOLEAN,
+ label="Verify SSL",
+ required=False,
+ description="Whether or not to verify the certificate of SSL/TLS connections.",
+ category="advanced",
+ default_value=True,
+ ),
+ )
+
+
+class Audiobookshelf(MusicProvider):
+ """Audiobookshelf MusicProvider."""
+
+ @property
+ def supported_features(self) -> set[ProviderFeature]:
+ """Features supported by this Provider."""
+ return {
+ ProviderFeature.LIBRARY_PODCASTS,
+ ProviderFeature.LIBRARY_AUDIOBOOKS,
+ ProviderFeature.BROWSE,
+ }
+
+ async def handle_async_init(self) -> None:
+ """Pass config values to client and initialize."""
+ self._client = ABSClient()
+ await self._client.init(
+ session=self.mass.http_session,
+ base_url=str(self.config.get_value(CONF_URL)),
+ username=str(self.config.get_value(CONF_USERNAME)),
+ password=str(self.config.get_value(CONF_PASSWORD)),
+ check_ssl=bool(self.config.get_value(CONF_VERIFY_SSL)),
+ )
+ await self._client.sync()
+
+ async def unload(self, is_removed: bool = False) -> None:
+ """
+ Handle unload/close of the provider.
+
+ Called when provider is deregistered (e.g. MA exiting or config reloading).
+ is_removed will be set to True when the provider is removed from the configuration.
+ """
+ await self._client.logout()
+
+ @property
+ def is_streaming_provider(self) -> bool:
+ """Return True if the provider is a streaming provider."""
+ # For streaming providers return True here but for local file based providers return False.
+ return False
+
+ async def sync_library(self, media_types: tuple[MediaType, ...]) -> None:
+ """Run library sync for this provider."""
+ await self._client.sync()
+ await super().sync_library(media_types=media_types)
+
+ def _parse_podcast(self, abs_podcast: ABSPodcast) -> Podcast:
+ """Translate ABSPodcast to MassPodcast."""
+ title = abs_podcast.media.metadata.title
+ # Per API doc title may be None.
+ if title is None:
+ title = "UNKNOWN"
+ mass_podcast = Podcast(
+ item_id=abs_podcast.id_,
+ name=title,
+ publisher=abs_podcast.media.metadata.author,
+ provider=self.domain,
+ total_episodes=abs_podcast.media.num_episodes,
+ provider_mappings={
+ ProviderMapping(
+ item_id=abs_podcast.id_,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ )
+ mass_podcast.metadata.description = abs_podcast.media.metadata.description
+ token = self._client.token
+ image_url = f"{self.config.get_value(CONF_URL)}/api/items/{abs_podcast.id_}/cover?token={token}"
+ mass_podcast.metadata.images = UniqueList(
+ [MediaItemImage(type=ImageType.THUMB, path=image_url, provider=self.lookup_key)]
+ )
+ mass_podcast.metadata.explicit = abs_podcast.media.metadata.explicit
+ if abs_podcast.media.metadata.language is not None:
+ mass_podcast.metadata.languages = UniqueList([abs_podcast.media.metadata.language])
+ if abs_podcast.media.metadata.genres is not None:
+ mass_podcast.metadata.genres = set(abs_podcast.media.metadata.genres)
+ mass_podcast.metadata.release_date = abs_podcast.media.metadata.release_date
+
+ return mass_podcast
+
+ async def _parse_podcast_episode(
+ self,
+ episode: ABSPodcastEpisodeExpanded,
+ prov_podcast_id: str,
+ fallback_episode_cnt: int | None = None,
+ ) -> PodcastEpisode:
+ """Translate ABSPodcastEpisode to MassPodcastEpisode.
+
+ For an episode the id is set to f"{podcast_id} {episode_id}".
+ ABS ids have no spaces, so we can split at a space to retrieve both
+ in other functions.
+ """
+ url = f"{self.config.get_value(CONF_URL)}{episode.audio_track.content_url}"
+ episode_id = f"{prov_podcast_id} {episode.id_}"
+
+ if episode.published_at is not None:
+ position = -episode.published_at
+ else:
+ position = 0
+ if fallback_episode_cnt is not None:
+ position = fallback_episode_cnt
+ mass_episode = PodcastEpisode(
+ item_id=episode_id,
+ provider=self.domain,
+ name=episode.title,
+ duration=int(episode.duration),
+ position=position,
+ podcast=ItemMapping(
+ item_id=prov_podcast_id,
+ provider=self.instance_id,
+ name=episode.title,
+ media_type=MediaType.PODCAST,
+ ),
+ provider_mappings={
+ ProviderMapping(
+ item_id=episode_id,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.UNKNOWN,
+ ),
+ url=url,
+ )
+ },
+ )
+ progress, finished = await self._client.get_podcast_progress_ms(
+ prov_podcast_id, episode.id_
+ )
+ if progress is not None:
+ mass_episode.resume_position_ms = progress
+ mass_episode.fully_played = finished
+
+ # cover image
+ url_base = f"{self.config.get_value(CONF_URL)}"
+ url_api = f"/api/items/{prov_podcast_id}/cover?token={self._client.token}"
+ url_cover = f"{url_base}{url_api}"
+ mass_episode.metadata.images = UniqueList(
+ [MediaItemImage(type=ImageType.THUMB, path=url_cover, provider=self.lookup_key)]
+ )
+
+ return mass_episode
+
+ async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
+ """Retrieve library/subscribed podcasts from the provider."""
+ async for abs_podcast in self._client.get_all_podcasts():
+ mass_podcast = self._parse_podcast(abs_podcast)
+ yield mass_podcast
+
+ async def get_podcast(self, prov_podcast_id: str) -> Podcast:
+ """Get single podcast."""
+ abs_podcast = await self._client.get_podcast(prov_podcast_id)
+ return self._parse_podcast(abs_podcast)
+
+ async def get_podcast_episodes(self, prov_podcast_id: str) -> list[PodcastEpisode]:
+ """Get all podcast episodes of podcast."""
+ abs_podcast = await self._client.get_podcast(prov_podcast_id)
+ episode_list = []
+ episode_cnt = 1
+ for abs_episode in abs_podcast.media.episodes:
+ mass_episode = await self._parse_podcast_episode(
+ abs_episode, prov_podcast_id, episode_cnt
+ )
+ episode_list.append(mass_episode)
+ episode_cnt += 1
+ return episode_list
+
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ """Get single podcast episode."""
+ prov_podcast_id, e_id = prov_episode_id.split(" ")
+ abs_podcast = await self._client.get_podcast(prov_podcast_id)
+ episode_cnt = 1
+ for abs_episode in abs_podcast.media.episodes:
+ if abs_episode.id_ == e_id:
+ return await self._parse_podcast_episode(abs_episode, prov_podcast_id, episode_cnt)
+
+ episode_cnt += 1
+ raise MediaNotFoundError("Episode not found")
+
+ async def _parse_audiobook(self, abs_audiobook: ABSAudioBook) -> Audiobook:
+ mass_audiobook = Audiobook(
+ item_id=abs_audiobook.id_,
+ provider=self.domain,
+ name=abs_audiobook.media.metadata.title,
+ duration=int(abs_audiobook.media.duration),
+ provider_mappings={
+ ProviderMapping(
+ item_id=abs_audiobook.id_,
+ provider_domain=self.domain,
+ provider_instance=self.instance_id,
+ )
+ },
+ publisher=abs_audiobook.media.metadata.publisher,
+ authors=UniqueList([x.name for x in abs_audiobook.media.metadata.authors]),
+ narrators=UniqueList(abs_audiobook.media.metadata.narrators),
+ )
+ mass_audiobook.metadata.description = abs_audiobook.media.metadata.description
+ if abs_audiobook.media.metadata.language is not None:
+ mass_audiobook.metadata.languages = UniqueList([abs_audiobook.media.metadata.language])
+ mass_audiobook.metadata.release_date = abs_audiobook.media.metadata.published_date
+ if abs_audiobook.media.metadata.genres is not None:
+ mass_audiobook.metadata.genres = set(abs_audiobook.media.metadata.genres)
+
+ # chapters
+ chapters = []
+ for idx, chapter in enumerate(abs_audiobook.media.chapters):
+ chapters.append(
+ MediaItemChapter(
+ position=idx + 1, # chapter starting at 1
+ name=chapter.title,
+ start=chapter.start,
+ end=chapter.end,
+ )
+ )
+ mass_audiobook.metadata.chapters = chapters
+
+ mass_audiobook.metadata.explicit = abs_audiobook.media.metadata.explicit
+ progress, finished = await self._client.get_audiobook_progress_ms(abs_audiobook.id_)
+ if progress is not None:
+ mass_audiobook.resume_position_ms = progress
+ mass_audiobook.fully_played = finished
+
+ # cover
+ base_url = f"{self.config.get_value(CONF_URL)}"
+ api_url = f"/api/items/{abs_audiobook.id_}/cover?token={self._client.token}"
+ cover_url = f"{base_url}{api_url}"
+ mass_audiobook.metadata.images = UniqueList(
+ [MediaItemImage(type=ImageType.THUMB, path=cover_url, provider=self.lookup_key)]
+ )
+
+ return mass_audiobook
+
+ async def get_library_audiobooks(self) -> AsyncGenerator[Audiobook, None]:
+ """Get Audiobook libraries."""
+ async for abs_audiobook in self._client.get_all_audiobooks():
+ mass_audiobook = await self._parse_audiobook(abs_audiobook)
+ yield mass_audiobook
+
+ async def get_audiobook(self, prov_audiobook_id: str) -> Audiobook:
+ """Get a single audiobook."""
+ abs_audiobook = await self._client.get_audiobook(prov_audiobook_id)
+ return await self._parse_audiobook(abs_audiobook)
+
+ async def get_stream_details(
+ self, item_id: str, media_type: MediaType = MediaType.TRACK
+ ) -> StreamDetails:
+ """Get stream of item."""
+ if media_type == MediaType.PODCAST_EPISODE:
+ return await self._get_stream_details_podcast_episode(item_id)
+ elif media_type == MediaType.AUDIOBOOK:
+ return await self._get_stream_details_audiobook(item_id)
+ raise MediaNotFoundError("Stream unknown")
+
+ async def _get_stream_details_audiobook(self, audiobook_id: str) -> StreamDetails:
+ """Only single audio file in audiobook."""
+ abs_audiobook = await self._client.get_audiobook(audiobook_id)
+ tracks = abs_audiobook.media.tracks
+ if len(tracks) == 0:
+ raise MediaNotFoundError("Stream not found")
+ if len(tracks) > 1:
+ logging.warning("Music Assistant only supports single file base audiobooks")
+ token = self._client.token
+ base_url = str(self.config.get_value(CONF_URL))
+ media_url = tracks[0].content_url
+ stream_url = f"{base_url}{media_url}?token={token}"
+ # audiobookshelf returns information of stream, so we should be able
+ # to lift unknown at some point.
+ return StreamDetails(
+ provider=self.instance_id,
+ item_id=audiobook_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.UNKNOWN,
+ ),
+ media_type=MediaType.AUDIOBOOK,
+ stream_type=StreamType.HTTP,
+ path=stream_url,
+ )
+
+ async def _get_stream_details_podcast_episode(self, podcast_id: str) -> StreamDetails:
+ """Stream of a Podcast."""
+ abs_podcast_id, abs_episode_id = podcast_id.split(" ")
+ abs_episode = None
+
+ abs_podcast = await self._client.get_podcast(abs_podcast_id)
+ for abs_episode in abs_podcast.media.episodes:
+ if abs_episode.id_ == abs_episode_id:
+ break
+ if abs_episode is None:
+ raise MediaNotFoundError("Stream not found")
+ token = self._client.token
+ base_url = str(self.config.get_value(CONF_URL))
+ media_url = abs_episode.audio_track.content_url
+ full_url = f"{base_url}{media_url}?token={token}"
+ return StreamDetails(
+ provider=self.instance_id,
+ item_id=podcast_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.UNKNOWN,
+ ),
+ media_type=MediaType.PODCAST_EPISODE,
+ stream_type=StreamType.HTTP,
+ path=full_url,
+ )
+
+ async def on_played(
+ self, media_type: MediaType, item_id: str, fully_played: bool, position: int
+ ) -> None:
+ """Update progress in Audiobookshelf."""
+ if media_type == MediaType.PODCAST_EPISODE:
+ abs_podcast_id, abs_episode_id = item_id.split(" ")
+ mass_podcast_episode = await self.get_podcast_episode(item_id)
+ duration = mass_podcast_episode.duration
+ await self._client.update_podcast_progress(
+ podcast_id=abs_podcast_id,
+ episode_id=abs_episode_id,
+ progress_s=position,
+ duration_s=duration,
+ is_finished=fully_played,
+ )
+ if media_type == MediaType.AUDIOBOOK:
+ mass_audiobook = await self.get_audiobook(item_id)
+ duration = mass_audiobook.duration
+ await self._client.update_audiobook_progress(
+ audiobook_id=item_id,
+ progress_s=position,
+ duration_s=duration,
+ is_finished=fully_played,
+ )
+
+ async def _browse_root(
+ self, library_list: list[ABSLibrary], item_path: str
+ ) -> Sequence[MediaItemType | ItemMapping]:
+ """Browse root folder in browse view.
+
+ Helper functions. Shows the library name, ABS supports multiple libraries
+ of both podcasts and audiobooks.
+ """
+ items: list[MediaItemType | ItemMapping] = []
+ for library in library_list:
+ items.append(
+ BrowseFolder(
+ item_id=library.id_,
+ name=library.name,
+ provider=self.instance_id,
+ path=f"{self.instance_id}://{item_path}/{library.id_}",
+ )
+ )
+ return items
+
+ async def _browse_lib(
+ self,
+ library_id: str,
+ library_list: list[ABSLibrary],
+ media_type: MediaType,
+ ) -> Sequence[MediaItemType | ItemMapping]:
+ """Browse lib folder in browse view.
+
+ Helper functions. Shows the items which are part of an ABS library.
+ """
+ library = None
+ for library in library_list:
+ if library_id == library.id_:
+ break
+ if library is None:
+ raise MediaNotFoundError("Lib missing.")
+
+ def get_item_mapping(item: ABSAudioBook | ABSPodcast) -> ItemMapping:
+ title = item.media.metadata.title
+ if title is None:
+ title = "UNKNOWN"
+ token = self._client.token
+ url = f"{self.config.get_value(CONF_URL)}/api/items/{item.id_}/cover?token={token}"
+ image = MediaItemImage(type=ImageType.THUMB, path=url, provider=self.lookup_key)
+ return ItemMapping(
+ media_type=media_type,
+ item_id=item.id_,
+ provider=self.instance_id,
+ name=title,
+ image=image,
+ )
+
+ items: list[MediaItemType | ItemMapping] = []
+ if media_type == MediaType.PODCAST:
+ async for podcast in self._client.get_all_podcasts_by_library(library):
+ items.append(get_item_mapping(podcast))
+ elif media_type == MediaType.AUDIOBOOK:
+ async for audiobook in self._client.get_all_audiobooks_by_library(library):
+ items.append(get_item_mapping(audiobook))
+ else:
+ raise RuntimeError(f"Media type must not be {media_type}")
+ return items
+
+ async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping]:
+ """Browse features shows libraries names."""
+ item_path = path.split("://", 1)[1]
+ if not item_path: # root
+ return await super().browse(path)
+
+ # HANDLE ROOT PATH
+ if item_path == "audiobooks":
+ library_list = self._client.audiobook_libraries
+ return await self._browse_root(library_list, item_path)
+ elif item_path == "podcasts":
+ library_list = self._client.podcast_libraries
+ return await self._browse_root(library_list, item_path)
+
+ # HANDLE WITHIN LIBRARY
+ library_type, library_id = item_path.split("/")
+ if library_type == "audiobooks":
+ library_list = self._client.audiobook_libraries
+ media_type = MediaType.AUDIOBOOK
+ elif library_type == "podcasts":
+ library_list = self._client.podcast_libraries
+ media_type = MediaType.PODCAST
+ else:
+ raise MediaNotFoundError("Specified Lib Type unknown")
+
+ return await self._browse_lib(library_id, library_list, media_type)
--- /dev/null
+"""Simple Client for Audiobookshelf.
+
+We only implement the functions necessary for mass.
+"""
+
+from collections.abc import AsyncGenerator
+from enum import Enum
+from typing import Any
+
+from aiohttp import ClientSession
+
+from music_assistant.providers.audiobookshelf.abs_schema import (
+ ABSAudioBook,
+ ABSLibrariesItemsResponse,
+ ABSLibrariesResponse,
+ ABSLibrary,
+ ABSLibraryItem,
+ ABSLoginResponse,
+ ABSMediaProgress,
+ ABSPodcast,
+ ABSUser,
+)
+
+# use page calls in case of large libraries
+LIMIT_ITEMS_PER_PAGE = 10
+
+
+class ABSStatus(Enum):
+ """ABS Status Enum."""
+
+ STATUS_OK = 200
+ STATUS_NOT_FOUND = 404
+
+
+class ABSClient:
+ """Simple Audiobookshelf client.
+
+ Only implements methods needed for Music Assistant.
+ """
+
+ def __init__(self) -> None:
+ """Client authorization."""
+ self.podcast_libraries: list[ABSLibrary] = []
+ self.audiobook_libraries: list[ABSLibrary] = []
+ self.user: ABSUser
+ self.check_ssl: bool
+
+ async def init(
+ self,
+ session: ClientSession,
+ base_url: str,
+ username: str,
+ password: str,
+ check_ssl: bool = True,
+ ) -> None:
+ """Initialize."""
+ self.session = session
+ self.base_url = base_url
+ self.check_ssl = check_ssl
+ self.session_headers = {}
+ self.user = await self.login(username=username, password=password)
+ self.token: str = self.user.token
+ self.session_headers = {"Authorization": f"Bearer {self.token}"}
+
+ async def _post(
+ self,
+ endpoint: str,
+ data: dict[str, Any] | None = None,
+ add_api_endpoint: bool = True,
+ ) -> bytes:
+ """POST request to abs api.
+
+ login and logout endpoint do not have "api" in url
+ """
+ _endpoint = (
+ f"{self.base_url}/api/{endpoint}" if add_api_endpoint else f"{self.base_url}/{endpoint}"
+ )
+ response = await self.session.post(
+ _endpoint, json=data, ssl=self.check_ssl, headers=self.session_headers
+ )
+ status = response.status
+ if status != ABSStatus.STATUS_OK.value:
+ raise RuntimeError(f"API post call to {endpoint=} failed.")
+ return await response.read()
+
+ async def _get(self, endpoint: str, params: dict[str, str | int] | None = None) -> bytes:
+ """GET request to abs api."""
+ _endpoint = f"{self.base_url}/api/{endpoint}"
+ response = await self.session.get(
+ _endpoint, params=params, ssl=self.check_ssl, headers=self.session_headers
+ )
+ status = response.status
+ if status not in [ABSStatus.STATUS_OK.value, ABSStatus.STATUS_NOT_FOUND.value]:
+ raise RuntimeError(f"API get call to {endpoint=} failed.")
+ if response.content_type == "application/json":
+ return await response.read()
+ elif status == ABSStatus.STATUS_NOT_FOUND.value:
+ return b""
+ else:
+ raise RuntimeError("Response must be json.")
+
+ async def _patch(self, endpoint: str, data: dict[str, Any] | None = None) -> None:
+ """PATCH request to abs api."""
+ _endpoint = f"{self.base_url}/api/{endpoint}"
+ response = await self.session.patch(
+ _endpoint, json=data, ssl=self.check_ssl, headers=self.session_headers
+ )
+ status = response.status
+ if status != ABSStatus.STATUS_OK.value:
+ raise RuntimeError(f"API patch call to {endpoint=} failed.")
+
+ async def login(self, username: str, password: str) -> ABSUser:
+ """Obtain user holding token from ABS with username/ password authentication."""
+ data = await self._post(
+ "login",
+ add_api_endpoint=False,
+ data={"username": username, "password": password},
+ )
+
+ return ABSLoginResponse.from_json(data).user
+
+ async def logout(self) -> None:
+ """Logout from ABS."""
+ await self._post("logout", add_api_endpoint=False)
+
+ async def get_user(self, id_: str) -> ABSUser:
+ """Get an ABS user."""
+ data = await self._get(f"users/{id_}")
+ return ABSUser.from_json(data)
+
+ async def sync(self) -> None:
+ """Update available book and podcast libraries."""
+ data = await self._get("libraries")
+ libraries = ABSLibrariesResponse.from_json(data)
+ ids = [x.id_ for x in self.audiobook_libraries]
+ ids.extend([x.id_ for x in self.podcast_libraries])
+ for library in libraries.libraries:
+ media_type = library.media_type
+ if library.id_ not in ids:
+ if media_type == "book":
+ self.audiobook_libraries.append(library)
+ elif media_type == "podcast":
+ self.podcast_libraries.append(library)
+ self.user = await self.get_user(self.user.id_)
+
+ async def get_all_podcasts(self) -> AsyncGenerator[ABSPodcast]:
+ """Get all available podcasts."""
+ for library in self.podcast_libraries:
+ async for podcast in self.get_all_podcasts_by_library(library):
+ yield podcast
+
+ async def _get_lib_items(self, lib: ABSLibrary) -> AsyncGenerator[bytes]:
+ """Get library items with pagination."""
+ page_cnt = 0
+ while True:
+ data = await self._get(
+ f"/libraries/{lib.id_}/items",
+ params={"limit": LIMIT_ITEMS_PER_PAGE, "page": page_cnt},
+ )
+ page_cnt += 1
+ yield data
+
+ async def get_all_podcasts_by_library(self, lib: ABSLibrary) -> AsyncGenerator[ABSPodcast]:
+ """Get all podcasts in a library."""
+ async for podcast_data in self._get_lib_items(lib):
+ podcast_list = ABSLibrariesItemsResponse.from_json(podcast_data).results
+ if not podcast_list: # [] if page exceeds
+ return
+
+ async def _get_id(plist: list[ABSLibraryItem] = podcast_list) -> AsyncGenerator[str]:
+ for entry in plist:
+ yield entry.id_
+
+ async for id_ in _get_id():
+ podcast = await self.get_podcast(id_)
+ yield podcast
+
+ async def get_podcast(self, id_: str) -> ABSPodcast:
+ """Get a single Podcast by ID."""
+ # this endpoint gives more podcast extra data
+ data = await self._get(f"items/{id_}?expanded=1")
+ return ABSPodcast.from_json(data)
+
+ async def _get_progress_ms(
+ self,
+ endpoint: str,
+ ) -> tuple[int | None, bool]:
+ data = await self._get(endpoint=endpoint)
+ if not data:
+ # entry doesn't exist, so it wasn't played yet
+ return 0, False
+ abs_media_progress = ABSMediaProgress.from_json(data)
+
+ return (
+ int(abs_media_progress.current_time * 1000),
+ abs_media_progress.is_finished,
+ )
+
+ async def get_podcast_progress_ms(
+ self, podcast_id: str, episode_id: str
+ ) -> tuple[int | None, bool]:
+ """Get podcast progress."""
+ endpoint = f"me/progress/{podcast_id}/{episode_id}"
+ return await self._get_progress_ms(endpoint)
+
+ async def get_audiobook_progress_ms(self, audiobook_id: str) -> tuple[int | None, bool]:
+ """Get audiobook progress."""
+ endpoint = f"me/progress/{audiobook_id}"
+ return await self._get_progress_ms(endpoint)
+
+ async def _update_progress(
+ self,
+ endpoint: str,
+ progress_seconds: int,
+ duration_seconds: int,
+ is_finished: bool,
+ ) -> None:
+ """Update progress of media item.
+
+ 0 <= progress_percent <= 1
+
+ Notes:
+ - progress in abs is percentage
+ - multiple parameters in one call don't work in all combinations
+ - currentTime is current position in s
+ - currentTime works only if duration is sent as well, but then don't
+ send progress at the same time.
+ """
+ await self._patch(
+ endpoint,
+ data={"isFinished": is_finished},
+ )
+ if is_finished:
+ return
+ await self._patch(
+ endpoint,
+ data={"progress": progress_seconds / duration_seconds},
+ )
+ await self._patch(
+ endpoint,
+ data={"duration": duration_seconds, "currentTime": progress_seconds},
+ )
+
+ async def update_podcast_progress(
+ self,
+ podcast_id: str,
+ episode_id: str,
+ progress_s: int,
+ duration_s: int,
+ is_finished: bool = False,
+ ) -> None:
+ """Update podcast episode progress."""
+ endpoint = f"me/progress/{podcast_id}/{episode_id}"
+
+ await self._update_progress(endpoint, progress_s, duration_s, is_finished)
+
+ async def update_audiobook_progress(
+ self,
+ audiobook_id: str,
+ progress_s: int,
+ duration_s: int,
+ is_finished: bool = False,
+ ) -> None:
+ """Update audiobook progress."""
+ endpoint = f"me/progress/{audiobook_id}"
+ await self._update_progress(endpoint, progress_s, duration_s, is_finished)
+
+ async def get_all_audiobooks(self) -> AsyncGenerator[ABSAudioBook]:
+ """Get all audiobooks."""
+ for library in self.audiobook_libraries:
+ async for book in self.get_all_audiobooks_by_library(library):
+ yield book
+
+ async def get_all_audiobooks_by_library(self, lib: ABSLibrary) -> AsyncGenerator[ABSAudioBook]:
+ """Get all Audiobooks in a library."""
+ async for audiobook_data in self._get_lib_items(lib):
+ audiobook_list = ABSLibrariesItemsResponse.from_json(audiobook_data).results
+ if not audiobook_list: # [] if page exceeds
+ return
+
+ async def _get_id(alist: list[ABSLibraryItem] = audiobook_list) -> AsyncGenerator[str]:
+ for entry in alist:
+ yield entry.id_
+
+ async for id_ in _get_id():
+ audiobook = await self.get_audiobook(id_)
+ yield audiobook
+
+ async def get_audiobook(self, id_: str) -> ABSAudioBook:
+ """Get a single Audiobook by ID."""
+ # this endpoint gives more audiobook extra data
+ audiobook = await self._get(f"items/{id_}?expanded=1")
+ return ABSAudioBook.from_json(audiobook)
--- /dev/null
+"""Schema definition of Audiobookshelf.
+
+https://api.audiobookshelf.org/
+"""
+
+from dataclasses import dataclass
+from typing import Annotated
+
+from mashumaro.config import BaseConfig
+from mashumaro.mixins.json import DataClassJSONMixin
+from mashumaro.types import Alias
+
+
+class BaseModel(DataClassJSONMixin):
+ """BaseModel for Schema part where we don't need all keys."""
+
+ class Config(BaseConfig):
+ """Not all keys required."""
+
+ forbid_extra_keys = False
+
+
+@dataclass
+class ABSAudioTrack(BaseModel):
+ """ABS audioTrack.
+
+ https://api.audiobookshelf.org/#audio-track
+ """
+
+ index: int
+ start_offset: Annotated[float, Alias("startOffset")] = 0.0
+ duration: float = 0.0
+ title: str = ""
+ content_url: Annotated[str, Alias("contentUrl")] = ""
+ mime_type: str = ""
+ # metadata: # not needed for mass application
+
+
+@dataclass
+class ABSPodcastEpisodeExpanded(BaseModel):
+ """ABSPodcastEpisode.
+
+ https://api.audiobookshelf.org/#podcast-episode
+ """
+
+ library_item_id: Annotated[str, Alias("libraryItemId")]
+ id_: Annotated[str, Alias("id")]
+ index: int | None
+ # audio_file: # not needed for mass application
+ published_at: Annotated[int | None, Alias("publishedAt")] # ms posix epoch
+ added_at: Annotated[int | None, Alias("addedAt")] # ms posix epoch
+ updated_at: Annotated[int | None, Alias("updatedAt")] # ms posix epoch
+ audio_track: Annotated[ABSAudioTrack, Alias("audioTrack")]
+ size: int # in bytes
+ season: str = ""
+ episode: str = ""
+ episode_type: Annotated[str, Alias("episodeType")] = ""
+ title: str = ""
+ subtitle: str = ""
+ description: str = ""
+ enclosure: str = ""
+ pub_date: Annotated[str, Alias("pubDate")] = ""
+ guid: str = ""
+ # chapters
+ duration: float = 0.0
+
+
+@dataclass
+class ABSPodcastMetaData(BaseModel):
+ """PodcastMetaData https://api.audiobookshelf.org/?shell#podcasts."""
+
+ title: str | None
+ author: str | None
+ description: str | None
+ release_date: Annotated[str | None, Alias("releaseDate")]
+ genres: list[str] | None
+ feed_url: Annotated[str | None, Alias("feedUrl")]
+ image_url: Annotated[str | None, Alias("imageUrl")]
+ itunes_page_url: Annotated[str | None, Alias("itunesPageUrl")]
+ itunes_id: Annotated[int | None, Alias("itunesId")]
+ itunes_artist_id: Annotated[int | None, Alias("itunesArtistId")]
+ explicit: bool
+ language: str | None
+ type_: Annotated[str | None, Alias("type")]
+
+
+@dataclass
+class ABSPodcastMedia(BaseModel):
+ """ABSPodcastMedia."""
+
+ metadata: ABSPodcastMetaData
+ cover_path: Annotated[str, Alias("coverPath")]
+ episodes: list[ABSPodcastEpisodeExpanded]
+ num_episodes: Annotated[int, Alias("numEpisodes")] = 0
+
+
+@dataclass
+class ABSPodcast(BaseModel):
+ """ABSPodcast.
+
+ Depending on endpoint we get different results. This class does not
+ fully reflect https://api.audiobookshelf.org/#podcast.
+ """
+
+ id_: Annotated[str, Alias("id")]
+ media: ABSPodcastMedia
+
+
+@dataclass
+class ABSAuthorMinified(BaseModel):
+ """ABSAuthor.
+
+ https://api.audiobookshelf.org/#author
+ """
+
+ id_: Annotated[str, Alias("id")]
+ name: str
+
+
+@dataclass
+class ABSSeriesSequence(BaseModel):
+ """Series Sequence.
+
+ https://api.audiobookshelf.org/#series
+ """
+
+ id_: Annotated[str, Alias("id")]
+ name: str
+ sequence: str | None
+
+
+@dataclass
+class ABSAudioBookMetaData(BaseModel):
+ """ABSAudioBookMetaData.
+
+ https://api.audiobookshelf.org/#book-metadata
+ """
+
+ title: str
+ subtitle: str
+ authors: list[ABSAuthorMinified]
+ narrators: list[str]
+ series: list[ABSSeriesSequence]
+ genres: list[str] | None
+ published_year: Annotated[str | None, Alias("publishedYear")]
+ published_date: Annotated[str | None, Alias("publishedDate")]
+ publisher: str | None
+ description: str | None
+ isbn: str | None
+ asin: str | None
+ language: str | None
+ explicit: bool
+
+
+@dataclass
+class ABSAudioBookChapter(BaseModel):
+ """
+ ABSAudioBookChapter.
+
+ https://api.audiobookshelf.org/#book-chapter
+ """
+
+ id_: Annotated[int, Alias("id")]
+ start: float
+ end: float
+ title: str
+
+
+@dataclass
+class ABSAudioBookMedia(BaseModel):
+ """ABSAudioBookMedia.
+
+ Helper class due to API endpoint used.
+ """
+
+ metadata: ABSAudioBookMetaData
+ cover_path: Annotated[str, Alias("coverPath")]
+ chapters: list[ABSAudioBookChapter]
+ duration: float
+ tracks: list[ABSAudioTrack]
+
+
+@dataclass
+class ABSAudioBook(BaseModel):
+ """ABSAudioBook.
+
+ Depending on endpoint we get different results. This class does not
+ full reflect https://api.audiobookshelf.org/#book.
+ """
+
+ id_: Annotated[str, Alias("id")]
+ media: ABSAudioBookMedia
+
+
+@dataclass
+class ABSMediaProgress(BaseModel):
+ """ABSMediaProgress.
+
+ https://api.audiobookshelf.org/#media-progress
+ """
+
+ id_: Annotated[str, Alias("id")]
+ library_item_id: Annotated[str, Alias("libraryItemId")]
+ episode_id: Annotated[str, Alias("episodeId")]
+ duration: float # seconds
+ progress: float # percent 0->1
+ current_time: Annotated[float, Alias("currentTime")] # seconds
+ is_finished: Annotated[bool, Alias("isFinished")]
+ hide_from_continue_listening: Annotated[bool, Alias("hideFromContinueListening")]
+ last_update: Annotated[int, Alias("lastUpdate")] # ms epoch
+ started_at: Annotated[int, Alias("startedAt")] # ms epoch
+ finished_at: Annotated[int | None, Alias("finishedAt")] # ms epoch
+
+
+@dataclass
+class ABSAudioBookmark(BaseModel):
+ """ABSAudioBookmark."""
+
+ library_item_id: Annotated[str, Alias("libraryItemId")]
+ title: str
+ time: float # seconds
+ created_at: Annotated[int, Alias("createdAt")] # unix epoch ms
+
+
+@dataclass
+class ABSUserPermissions(BaseModel):
+ """ABSUserPermissions."""
+
+ download: bool
+ update: bool
+ delete: bool
+ upload: bool
+ access_all_libraries: Annotated[bool, Alias("accessAllLibraries")]
+ access_all_tags: Annotated[bool, Alias("accessAllTags")]
+ access_explicit_content: Annotated[bool, Alias("accessExplicitContent")]
+
+
+@dataclass
+class ABSUser(BaseModel):
+ """ABSUser.
+
+ only attributes we need for mass
+ https://api.audiobookshelf.org/#user
+ """
+
+ id_: Annotated[str, Alias("id")]
+ username: str
+ type_: Annotated[str, Alias("type")]
+ token: str
+ media_progress: Annotated[list[ABSMediaProgress], Alias("mediaProgress")]
+ series_hide_from_continue_listening: Annotated[
+ list[str], Alias("seriesHideFromContinueListening")
+ ]
+ bookmarks: list[ABSAudioBookmark]
+ is_active: Annotated[bool, Alias("isActive")]
+ is_locked: Annotated[bool, Alias("isLocked")]
+ last_seen: Annotated[int | None, Alias("lastSeen")]
+ created_at: Annotated[int, Alias("createdAt")]
+ permissions: ABSUserPermissions
+ libraries_accessible: Annotated[list[str], Alias("librariesAccessible")]
+
+ # this seems to be missing
+ # item_tags_accessible: Annotated[list[str], Alias("itemTagsAccessible")]
+
+
+@dataclass
+class ABSLoginResponse(BaseModel):
+ """ABSLoginResponse."""
+
+ user: ABSUser
+
+ # this seems to be missing
+ # user_default_library_id: Annotated[str, Alias("defaultLibraryId")]
+
+
+@dataclass
+class ABSLibrary(BaseModel):
+ """ABSLibrary.
+
+ Only attributes we need
+ """
+
+ id_: Annotated[str, Alias("id")]
+ name: str
+ # folders
+ # displayOrder: Integer
+ # icon: String
+ media_type: Annotated[str, Alias("mediaType")]
+ provider: str
+ # settings
+ created_at: Annotated[int, Alias("createdAt")]
+ last_update: Annotated[int, Alias("lastUpdate")]
+
+
+@dataclass
+class ABSLibrariesResponse(BaseModel):
+ """ABSLibrariesResponse."""
+
+ libraries: list[ABSLibrary]
+
+
+@dataclass
+class ABSLibraryItem(BaseModel):
+ """ABSLibraryItem."""
+
+ id_: Annotated[str, Alias("id")]
+
+
+@dataclass
+class ABSLibrariesItemsResponse(BaseModel):
+ """ABSLibrariesItemsResponse.
+
+ https://api.audiobookshelf.org/#get-a-library-39-s-items
+ """
+
+ results: list[ABSLibraryItem]