from __future__ import annotations
-import logging
from collections.abc import AsyncGenerator, Sequence
from typing import TYPE_CHECKING
from music_assistant.providers.audiobookshelf.abs_client import ABSClient
from music_assistant.providers.audiobookshelf.abs_schema import (
ABSAudioBook,
+ ABSDeviceInfo,
ABSLibrary,
+ ABSPlaybackSessionExpanded,
ABSPodcast,
ABSPodcastEpisodeExpanded,
)
"""Pass config values to client and initialize."""
self._client = ABSClient()
base_url = str(self.config.get_value(CONF_URL))
+ username = str(self.config.get_value(CONF_USERNAME))
try:
await self._client.init(
session=self.mass.http_session,
base_url=base_url,
- username=str(self.config.get_value(CONF_USERNAME)),
+ username=username,
password=str(self.config.get_value(CONF_PASSWORD)),
+ logger=self.logger,
check_ssl=bool(self.config.get_value(CONF_VERIFY_SSL)),
)
except RuntimeError:
raise LoginFailed(f"Login to abs instance at {base_url} failed.")
await self._client.sync()
+ # this will be provided when creating sessions or receive already opened sessions
+ self.device_info = ABSDeviceInfo(
+ device_id=self.instance_id,
+ client_name="Music Assistant",
+ client_version=self.mass.version,
+ manufacturer="",
+ model=self.mass.server_id,
+ )
+
+ self.logger.debug(f"Our playback session device_id is {self.instance_id}")
+
async def unload(self, is_removed: bool = False) -> None:
"""
Handle unload/close of the provider.
Called when provider is deregistered (e.g. MA exiting or config reloading).
is_removed will be set to True when the provider is removed from the configuration.
"""
+ await self._client.close_all_playback_sessions()
await self._client.logout()
@property
abs_audiobook = await self._client.get_audiobook(prov_audiobook_id)
return await self._parse_audiobook(abs_audiobook)
+ async def get_streamdetails_from_playback_session(
+ self, session: ABSPlaybackSessionExpanded
+ ) -> StreamDetails:
+ """Give Streamdetails from given session."""
+ tracks = session.audio_tracks
+ if len(tracks) == 0:
+ raise RuntimeError("Playback session has no tracks to play")
+ track = tracks[0]
+ track_url = track.content_url
+ if track_url.split("/")[1] != "hls":
+ raise RuntimeError("Did expect HLS stream for session playback")
+ item_id = ""
+ if session.media_type == "podcast":
+ media_type = MediaType.PODCAST_EPISODE
+ podcast_id = session.library_item_id
+ session_id = session.id_
+ episode_id = session.episode_id
+ item_id = f"{podcast_id} {episode_id} {session_id}"
+ else:
+ media_type = MediaType.AUDIOBOOK
+ audiobook_id = session.library_item_id
+ session_id = session.id_
+ item_id = f"{audiobook_id} {session_id}"
+ token = self._client.token
+ base_url = str(self.config.get_value(CONF_URL))
+ media_url = track.content_url
+ stream_url = f"{base_url}{media_url}?token={token}"
+ return StreamDetails(
+ provider=self.instance_id,
+ item_id=item_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.UNKNOWN,
+ ),
+ media_type=media_type,
+ stream_type=StreamType.HLS,
+ path=stream_url,
+ )
+
async def get_stream_details(
self, item_id: str, media_type: MediaType = MediaType.TRACK
) -> StreamDetails:
"""Get stream of item."""
+ # self.logger.debug(f"Streamdetails: {item_id}")
if media_type == MediaType.PODCAST_EPISODE:
return await self._get_stream_details_podcast_episode(item_id)
elif media_type == MediaType.AUDIOBOOK:
- return await self._get_stream_details_audiobook(item_id)
+ abs_audiobook = await self._client.get_audiobook(item_id)
+ tracks = abs_audiobook.media.tracks
+ if len(tracks) == 0:
+ raise MediaNotFoundError("Stream not found")
+ if len(tracks) > 1:
+ session = await self._client.get_playback_session_audiobook(
+ device_info=self.device_info, audiobook_id=item_id
+ )
+ return await self.get_streamdetails_from_playback_session(session)
+ return await self._get_stream_details_audiobook(abs_audiobook)
raise MediaNotFoundError("Stream unknown")
- async def _get_stream_details_audiobook(self, audiobook_id: str) -> StreamDetails:
+ async def _get_stream_details_audiobook(self, abs_audiobook: ABSAudioBook) -> StreamDetails:
"""Only single audio file in audiobook."""
- abs_audiobook = await self._client.get_audiobook(audiobook_id)
+ self.logger.debug(
+ f"Using direct playback for audiobook {abs_audiobook.media.metadata.title}"
+ )
tracks = abs_audiobook.media.tracks
- if len(tracks) == 0:
- raise MediaNotFoundError("Stream not found")
- if len(tracks) > 1:
- logging.warning("Music Assistant only supports single file base audiobooks")
token = self._client.token
base_url = str(self.config.get_value(CONF_URL))
media_url = tracks[0].content_url
# to lift unknown at some point.
return StreamDetails(
provider=self.lookup_key,
- item_id=audiobook_id,
+ item_id=abs_audiobook.id_,
audio_format=AudioFormat(
content_type=ContentType.UNKNOWN,
),
break
if abs_episode is None:
raise MediaNotFoundError("Stream not found")
+ self.logger.debug(f"Using direct playback for podcast episode {abs_episode.title}")
token = self._client.token
base_url = str(self.config.get_value(CONF_URL))
media_url = abs_episode.audio_track.content_url
async def on_played(
self, media_type: MediaType, item_id: str, fully_played: bool, position: int
) -> None:
- """Update progress in Audiobookshelf."""
+ """Update progress in Audiobookshelf.
+
+ In our case media_type may have 3 values:
+ - PODCAST
+ - PODCAST_EPISODE
+ - AUDIOBOOK
+ We ignore PODCAST (function is called on adding a podcast with position=None)
+
+ """
+ # self.logger.debug(f"on_played: {media_type=} {item_id=}, {fully_played=} {position=}")
if media_type == MediaType.PODCAST_EPISODE:
abs_podcast_id, abs_episode_id = item_id.split(" ")
mass_podcast_episode = await self.get_podcast_episode(item_id)
duration = mass_podcast_episode.duration
+ self.logger.debug(f"Updating of {media_type.value} named {mass_podcast_episode.name}")
await self._client.update_podcast_progress(
podcast_id=abs_podcast_id,
episode_id=abs_episode_id,
if media_type == MediaType.AUDIOBOOK:
mass_audiobook = await self.get_audiobook(item_id)
duration = mass_audiobook.duration
+ self.logger.debug(f"Updating {media_type.value} named {mass_audiobook.name} progress")
await self._client.update_audiobook_progress(
audiobook_id=item_id,
progress_s=position,
We only implement the functions necessary for mass.
"""
+import logging
from collections.abc import AsyncGenerator
from enum import Enum
from typing import Any
from aiohttp import ClientSession
+from music_assistant_models.media_items import UniqueList
from music_assistant.providers.audiobookshelf.abs_schema import (
ABSAudioBook,
+ ABSDeviceInfo,
ABSLibrariesItemsResponse,
ABSLibrariesResponse,
ABSLibrary,
ABSLibraryItem,
ABSLoginResponse,
ABSMediaProgress,
+ ABSPlaybackSession,
+ ABSPlaybackSessionExpanded,
+ ABSPlayRequest,
ABSPodcast,
+ ABSSessionsResponse,
+ ABSSessionUpdate,
ABSUser,
)
self.audiobook_libraries: list[ABSLibrary] = []
self.user: ABSUser
self.check_ssl: bool
+ # I would like to receive opened sessions via the API, however, it appears
+ # that this only possible for closed sessions. That's probably because
+ # abs expects only a single session per device
+ self.open_playback_session_ids: UniqueList[str] = UniqueList([])
async def init(
self,
base_url: str,
username: str,
password: str,
+ logger: logging.Logger | None = None,
check_ssl: bool = True,
) -> None:
"""Initialize."""
self.session = session
self.base_url = base_url
self.check_ssl = check_ssl
+
+ if logger is None:
+ self.logger = logging.getLogger(name="ABSClient")
+ self.logger.setLevel(logging.DEBUG)
+ else:
+ self.logger = logger
+
self.session_headers = {}
self.user = await self.login(username=username, password=password)
self.token: str = self.user.token
)
status = response.status
if status != ABSStatus.STATUS_OK.value:
- raise RuntimeError(f"API post call to {endpoint=} failed.")
+ raise RuntimeError(f"API post call to {endpoint=} failed with {status=}.")
return await response.read()
async def _get(self, endpoint: str, params: dict[str, str | int] | None = None) -> bytes:
data={"isFinished": is_finished},
)
if is_finished:
+ self.logger.debug(f"Marked played {endpoint}")
return
+ percentage = progress_seconds / duration_seconds
await self._patch(
endpoint,
- data={"progress": progress_seconds / duration_seconds},
+ data={"progress": percentage},
)
await self._patch(
endpoint,
data={"duration": duration_seconds, "currentTime": progress_seconds},
)
+ self.logger.debug(f"Updated to {percentage * 100:.0f}%")
async def update_podcast_progress(
self,
# this endpoint gives more audiobook extra data
audiobook = await self._get(f"items/{id_}?expanded=1")
return ABSAudioBook.from_json(audiobook)
+
+ async def get_playback_session_podcast(
+ self, device_info: ABSDeviceInfo, podcast_id: str, episode_id: str
+ ) -> ABSPlaybackSessionExpanded:
+ """Get Podcast playback session.
+
+ Returns an open session if it is already available.
+ """
+ endpoint = f"items/{podcast_id}/play/{episode_id}"
+ # by adding in the media item id, we can have several
+ # open sessions (i.e. we are able to stream more than a single
+ # audiobook/ podcast from abs at the same time)
+ # also fixes preload in playlist
+ device_info.device_id += f"/{podcast_id}/{episode_id}"
+ return await self._get_playback_session(endpoint, device_info=device_info)
+
+ async def get_playback_session_audiobook(
+ self, device_info: ABSDeviceInfo, audiobook_id: str
+ ) -> ABSPlaybackSessionExpanded:
+ """Get Audiobook playback session.
+
+ Returns an open session if it is already available.
+ """
+ endpoint = f"items/{audiobook_id}/play"
+ # see podcast comment above
+ device_info.device_id += f"/{audiobook_id}"
+ return await self._get_playback_session(endpoint, device_info=device_info)
+
+ async def get_open_playback_session(self, session_id: str) -> ABSPlaybackSessionExpanded | None:
+ """Return open playback session."""
+ data = await self._get(f"session/{session_id}")
+ if data:
+ return ABSPlaybackSessionExpanded.from_json(data)
+ else:
+ return None
+
+ async def _get_playback_session(
+ self, endpoint: str, device_info: ABSDeviceInfo
+ ) -> ABSPlaybackSessionExpanded:
+ """Get an ABS Playback Session.
+
+ You can only have a single session per device.
+ """
+ play_request = ABSPlayRequest(
+ device_info=device_info,
+ force_direct_play=False,
+ force_transcode=False,
+ # specifying no supported mime types makes abs send the file
+ # via hls but without transcoding to another format
+ supported_mime_types=[],
+ )
+ data = await self._post(endpoint, data=play_request.to_dict())
+ session = ABSPlaybackSessionExpanded.from_json(data)
+ self.logger.debug(
+ f"Got playback session {session.id_} "
+ f"for {session.media_type} named {session.display_title}"
+ )
+ self.open_playback_session_ids.append(session.id_)
+ return session
+
+ async def close_playback_session(self, playback_session_id: str) -> None:
+ """Close an open playback session."""
+ # optional data would be ABSSessionUpdate
+ self.logger.debug(f"Closing playback session {playback_session_id=}")
+ await self._post(f"session/{playback_session_id}/close")
+
+ async def sync_playback_session(
+ self, playback_session_id: str, update: ABSSessionUpdate
+ ) -> None:
+ """Sync an open playback session."""
+ await self._post(f"session/{playback_session_id}/sync", data=update.to_dict())
+
+ async def get_all_closed_playback_sessions(self) -> AsyncGenerator[ABSPlaybackSession]:
+ """Get library items with pagination.
+
+ This returns only sessions, which are already closed.
+ """
+ page_cnt = 0
+ while True:
+ data = await self._get(
+ "me/listening-sessions",
+ params={"itemsPerPage": LIMIT_ITEMS_PER_PAGE, "page": page_cnt},
+ )
+ page_cnt += 1
+
+ sessions = ABSSessionsResponse.from_json(data).sessions
+ self.logger.debug([session.device_info for session in sessions])
+ if sessions:
+ for session in sessions:
+ yield session
+ else:
+ return
+
+ async def close_all_playback_sessions(self) -> None:
+ """Cleanup all playback sessions opened by us."""
+ if self.open_playback_session_ids:
+ self.logger.debug("Closing our playback sessions.")
+ for session_id in self.open_playback_session_ids:
+ try:
+ await self.close_playback_session(session_id)
+ except RuntimeError:
+ self.logger.debug(f"Was unable to close session {session_id}")
https://api.audiobookshelf.org/
"""
-from dataclasses import dataclass
+from dataclasses import dataclass, field
+from enum import Enum
from typing import Annotated
from mashumaro.config import BaseConfig
class BaseModel(DataClassJSONMixin):
- """BaseModel for Schema part where we don't need all keys."""
+ """BaseModel for Schema.
+
+ forbid_extra_keys: response of API may have more keys than used by us
+ serialize_by_alias: when using to_json(), we get the Alias keys
+ """
class Config(BaseConfig):
- """Not all keys required."""
+ """Config."""
forbid_extra_keys = False
+ serialize_by_alias = True
@dataclass
"""
results: list[ABSLibraryItem]
+
+
+# Schema to enable sessions:
+@dataclass
+class ABSDeviceInfo(BaseModel):
+ """ABSDeviceInfo.
+
+ https://api.audiobookshelf.org/#device-info-parameters
+ https://api.audiobookshelf.org/#device-info
+ https://github.com/advplyr/audiobookshelf/blob/master/server/objects/DeviceInfo.js#L3
+ """
+
+ device_id: Annotated[str, Alias("deviceId")] = ""
+ client_name: Annotated[str, Alias("clientName")] = ""
+ client_version: Annotated[str, Alias("clientVersion")] = ""
+ manufacturer: str = ""
+ model: str = ""
+ # sdkVersion # meant for an Android client
+
+
+@dataclass
+class ABSPlayRequest(BaseModel):
+ """ABSPlayRequest.
+
+ https://api.audiobookshelf.org/#play-a-library-item-or-podcast-episode
+ """
+
+ device_info: Annotated[ABSDeviceInfo, Alias("deviceInfo")]
+ force_direct_play: Annotated[bool, Alias("forceDirectPlay")] = False
+ force_transcode: Annotated[bool, Alias("forceTranscode")] = False
+ supported_mime_types: Annotated[list[str], Alias("supportedMimeTypes")] = field(
+ default_factory=list
+ )
+ media_player: Annotated[str, Alias("mediaPlayer")] = "unknown"
+
+
+class ABSPlayMethod(Enum):
+ """Playback method in playback session."""
+
+ DIRECT_PLAY = 0
+ DIRECT_STREAM = 1
+ TRANSCODE = 2
+ LOCAL = 3
+
+
+@dataclass
+class ABSPlaybackSession(BaseModel):
+ """ABSPlaybackSessionExpanded.
+
+ https://api.audiobookshelf.org/#play-method
+ """
+
+ id_: Annotated[str, Alias("id")]
+ user_id: Annotated[str, Alias("userId")]
+ library_id: Annotated[str, Alias("libraryId")]
+ library_item_id: Annotated[str, Alias("libraryItemId")]
+ episode_id: Annotated[str | None, Alias("episodeId")]
+ media_type: Annotated[str, Alias("mediaType")]
+ # media_metadata: Annotated[ABSPodcastMetaData | ABSAudioBookMetaData, Alias("mediaMetadata")]
+ # chapters: list[ABSAudioBookChapter]
+ display_title: Annotated[str, Alias("displayTitle")]
+ display_author: Annotated[str, Alias("displayAuthor")]
+ cover_path: Annotated[str, Alias("coverPath")]
+ duration: float
+ # 0: direct play, 1: direct stream, 2: transcode, 3: local
+ play_method: Annotated[ABSPlayMethod, Alias("playMethod")]
+ media_player: Annotated[str, Alias("mediaPlayer")]
+ device_info: Annotated[ABSDeviceInfo, Alias("deviceInfo")]
+ server_version: Annotated[str, Alias("serverVersion")]
+ # YYYY-MM-DD
+ date: str
+ day_of_week: Annotated[str, Alias("dayOfWeek")]
+ time_listening: Annotated[float, Alias("timeListening")] # s
+ start_time: Annotated[float, Alias("startTime")] # s
+ current_time: Annotated[float, Alias("currentTime")] # s
+ started_at: Annotated[int, Alias("startedAt")] # ms since Unix Epoch
+ updated_at: Annotated[int, Alias("updatedAt")] # ms since Unix Epoch
+
+
+@dataclass
+class ABSPlaybackSessionExpanded(ABSPlaybackSession):
+ """ABSPlaybackSessionExpanded.
+
+ https://api.audiobookshelf.org/#play-method
+ """
+
+ audio_tracks: Annotated[list[ABSAudioTrack], Alias("audioTracks")]
+
+ # videoTrack:
+ # libraryItem:
+
+
+@dataclass
+class ABSSessionUpdate(BaseModel):
+ """
+ ABSSessionUpdate.
+
+ Can be used as optional data to sync or closing request.
+ unit is seconds
+ """
+
+ current_time: Annotated[float, Alias("currentTime")]
+ time_listened: Annotated[float, Alias("timeListened")]
+ duration: float
+
+
+@dataclass
+class ABSSessionsResponse(BaseModel):
+ """Response to GET http://abs.example.com/api/me/listening-sessions."""
+
+ total: int
+ num_pages: Annotated[int, Alias("numPages")]
+ items_per_page: Annotated[int, Alias("itemsPerPage")]
+ sessions: list[ABSPlaybackSession]