from aioaudiobookshelf.client.items import (
LibraryItemExpandedPodcast as AbsLibraryItemExpandedPodcast,
)
+from aioaudiobookshelf.client.items import PlaybackSessionExpanded as AbsPlaybackSessionExpanded
+from aioaudiobookshelf.client.items import PlaybackSessionParameters as AbsPlaybackSessionParameters
+from aioaudiobookshelf.client.session import SyncOpenSessionParameters
from aioaudiobookshelf.exceptions import LoginError as AbsLoginError
from aioaudiobookshelf.exceptions import RefreshTokenExpiredError
+from aioaudiobookshelf.exceptions import SessionNotFoundError as AbsSessionNotFoundError
from aioaudiobookshelf.schema.author import AuthorExpanded
from aioaudiobookshelf.schema.calls_authors import (
AuthorWithItemsAndSeries as AbsAuthorWithItemsAndSeries,
LibraryItemMinifiedPodcast,
)
from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType
+from aioaudiobookshelf.schema.session import DeviceInfo as AbsDeviceInfo
from aioaudiobookshelf.schema.shelf import (
SeriesShelf,
ShelfAuthors,
from aioaudiobookshelf.schema.shelf import ShelfId as AbsShelfId
from aioaudiobookshelf.schema.shelf import ShelfType as AbsShelfType
from aiohttp import web
-from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
+from music_assistant_models.config_entries import (
+ ConfigEntry,
+ ConfigValueType,
+ ProviderConfig,
+)
from music_assistant_models.enums import (
ConfigEntryType,
ContentType,
CACHE_KEY_LIBRARIES,
CONF_API_TOKEN,
CONF_HIDE_EMPTY_PODCASTS,
+ CONF_HLS_FORMATS,
CONF_OLD_TOKEN,
CONF_PASSWORD,
CONF_URL,
+ CONF_USE_HLS,
CONF_USERNAME,
CONF_VERIFY_SSL,
+ HLS_ALL_FORMATS,
+ HLS_FORMATS_SPLIT,
AbsBrowseItemsBookTranslationKey,
AbsBrowseItemsPodcastTranslationKey,
AbsBrowsePaths,
)
-from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard
+from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard, SessionHelper
if TYPE_CHECKING:
from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved
from aioaudiobookshelf.schema.media_progress import MediaProgress
+ from aioaudiobookshelf.schema.streams import Stream as AbsStream
from aioaudiobookshelf.schema.user import User
from music_assistant_models.media_items import Podcast
from music_assistant_models.provider import ProviderManifest
required=False,
hidden=True,
),
+ ConfigEntry(
+ key=CONF_USE_HLS,
+ type=ConfigEntryType.BOOLEAN,
+ label="Stream via HLS from ABS.",
+ description="Use an HLS stream when streaming from audiobookshelf.",
+ required=False,
+ default_value=False,
+ advanced=True,
+ ),
+ ConfigEntry(
+ key=CONF_HLS_FORMATS,
+ type=ConfigEntryType.STRING,
+ label=f"Use HLS for these file extensions. Separate with ';'. Use {HLS_ALL_FORMATS} for"
+ " all formats.",
+ description="Use HLS only for these file extensions."
+ f" Separate with ;. E.g. m4b or m4b;aac or {HLS_ALL_FORMATS}",
+ required=False,
+ default_value="m4b",
+ advanced=True,
+ ),
ConfigEntry(
key=CONF_VERIFY_SSL,
type=ConfigEntryType.BOOLEAN,
async def handle_async_init(self) -> None:
"""Pass config values to client and initialize."""
self._on_unload_callbacks: list[Callable[[], None]] = []
+ self.sessions: dict[str, SessionHelper] = {} # key is the mass_item_id
+ self.create_session_lock = asyncio.Lock()
base_url = str(self.config.get_value(CONF_URL))
username = str(self.config.get_value(CONF_USERNAME))
password = str(self.config.get_value(CONF_PASSWORD))
on_refresh_token_expired=self._socket_abs_refresh_token_expired
)
+ self._client_socket.set_stream_callbacks(on_stream_open=self._socket_stream_open)
+
# progress guard
self.progress_guard = ProgressGuard()
# register dynamic stream route for audiobook parts
self._on_unload_callbacks.append(
self.mass.streams.register_dynamic_route(
- f"/{self.instance_id}_part_stream", self._handle_audiobook_part_request
- )
- )
- self._on_unload_callbacks.append(
- self.mass.streams.register_dynamic_route(
- f"/{self.instance_id}_episode_stream", self._handle_episode_request
+ f"/{self.instance_id}_part_stream", self._handle_session_part_request
)
)
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get stream of item."""
- if media_type == MediaType.PODCAST_EPISODE:
- return await self._get_stream_details_episode(item_id)
- if media_type == MediaType.AUDIOBOOK:
- abs_audiobook = await self._get_abs_expanded_audiobook(prov_audiobook_id=item_id)
- return await self._get_stream_details_audiobook(abs_audiobook)
+ # We always create a playback session. The default is direct playback.
+ # In that case, session.tracks holds the exact same as the audiobook/ podcast.track,
+ # so we only use the session to update our progress.
+ #
+ # In the case of hls the session has an hls stream as track.
+ if media_type in (MediaType.PODCAST_EPISODE, MediaType.AUDIOBOOK):
+ session = await self._get_playback_session(mass_item_id=item_id)
+ return await self._get_stream_details_session(
+ session, session_helper=self.sessions[item_id], media_type=media_type
+ )
raise MediaNotFoundError("Stream unknown")
- async def _get_stream_details_audiobook(
- self, abs_audiobook: AbsLibraryItemExpandedBook
+ async def _get_stream_details_session(
+ self,
+ abs_session: AbsPlaybackSessionExpanded,
+ session_helper: SessionHelper,
+ media_type: MediaType,
) -> StreamDetails:
"""Streamdetails audiobook.
We always use a custom stream type, also for single file, such
that we can handle an ffmpeg error and refresh our tokens.
"""
- tracks = abs_audiobook.media.tracks
+ abs_base_url = str(self.config.get_value(CONF_URL))
+ tracks = abs_session.audio_tracks
+
if len(tracks) == 0:
- raise MediaNotFoundError("Stream not found")
+ raise MediaNotFoundError("Session has no tracks.")
content_type = ContentType.UNKNOWN
- if abs_audiobook.media.tracks[0].metadata is not None:
- content_type = ContentType.try_parse(abs_audiobook.media.tracks[0].metadata.ext)
+ if abs_session.audio_tracks[0].metadata is not None:
+ content_type = ContentType.try_parse(abs_session.audio_tracks[0].metadata.ext)
file_parts: list[MultiPartPath] = []
- abs_base_url = str(self.config.get_value(CONF_URL))
if self.is_token_user:
self.logger.debug("Token User - Streams are direct.")
for idx, track in enumerate(tracks):
# to ensure token is always valid, we create a dynamic url
# this ensures that we always get a fresh token on each part
# without having to deal with a custom stream etc.
- # we also use this for the first part, otherwise we can't seek
+ # we also use this for a single track/ hls stream, otherwise we can't seek
stream_url = (
f"{self.mass.streams.base_url}/{self.instance_id}_part_stream?"
- f"audiobook_id={abs_audiobook.id_}&part_id={idx}"
+ f"session_id={abs_session.id_}&part_id={idx}"
)
file_parts.append(MultiPartPath(path=stream_url, duration=track.duration))
+ stream_type = StreamType.HLS if "hls" in file_parts[0].path else StreamType.HTTP
+ if stream_type == StreamType.HLS:
+ # wait for stream to be ready
+ try:
+ await asyncio.wait_for(session_helper.hls_stream_open.wait(), 10)
+ except TimeoutError:
+ self.logger.warning(
+ "Did not receive HLS stream open event after 10s, continuing anyways."
+ )
+
return StreamDetails(
provider=self.instance_id,
- item_id=abs_audiobook.id_,
+ item_id=abs_session.id_,
audio_format=AudioFormat(content_type=content_type),
- media_type=MediaType.AUDIOBOOK,
- stream_type=StreamType.HTTP,
- duration=int(abs_audiobook.media.duration),
- path=file_parts,
+ media_type=media_type,
+ stream_type=stream_type,
+ duration=int(abs_session.duration),
+ path=file_parts[0].path if len(file_parts) == 1 else file_parts,
can_seek=True,
allow_seek=True,
)
- async def _get_stream_details_episode(self, podcast_id: str) -> StreamDetails:
- """Streamdetails of a podcast episode.
-
- There are no multi-file podcasts in abs, but we use a custom
- stream to handle possible ffmpeg errors.
- """
- abs_podcast_id, abs_episode_id = podcast_id.split(" ")
- abs_episode = None
-
- abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_podcast_id)
- for abs_episode in abs_podcast.media.episodes:
- if abs_episode.id_ == abs_episode_id:
- break
- if abs_episode is None:
- raise MediaNotFoundError("Stream not found")
- content_type = ContentType.UNKNOWN
- if abs_episode.audio_track.metadata is not None:
- content_type = ContentType.try_parse(abs_episode.audio_track.metadata.ext)
+ async def _get_playback_session(self, mass_item_id: str) -> AbsPlaybackSessionExpanded:
+ """Either creates or returns an open abs session."""
+ async with self.create_session_lock:
+ # check for an available open session
+ if session_helper := self.sessions.get(mass_item_id):
+ with suppress(AbsSessionNotFoundError):
+ return await self._client.get_open_session(
+ session_id=session_helper.abs_session_id
+ )
- if self.is_token_user:
- self.logger.debug("Token User - Stream is direct.")
- # long lived API token, no need for detour
- abs_base_url = str(self.config.get_value(CONF_URL))
- stream_url = (
- f"{abs_base_url}{abs_episode.audio_track.content_url}?token={self._client.token}"
+ item_ids = mass_item_id.split(" ")
+ abs_item_id = item_ids[0]
+ episode_id = item_ids[1] if len(item_ids) == 2 else None
+
+ # Create a new session
+ ## Check HLS usage
+ use_hls = bool(self.config.get_value(CONF_USE_HLS))
+ hls_formats = str(self.config.get_value(CONF_HLS_FORMATS))
+ if use_hls and hls_formats != HLS_ALL_FORMATS:
+ use_hls = False # only for certain formats
+ extensions = [x.lstrip(".") for x in hls_formats.split(HLS_FORMATS_SPLIT)]
+ if episode_id is None:
+ if (
+ metadata := (await self._get_abs_expanded_audiobook(abs_item_id))
+ .media.tracks[0]
+ .metadata
+ ):
+ if metadata.ext.lstrip(".") in extensions:
+ use_hls = True
+ else:
+ podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_item_id)
+ episode = None
+ for episode in podcast.media.episodes:
+ if episode.id_ == episode_id:
+ break
+ if episode and (metadata := episode.audio_track.metadata):
+ if metadata.ext.lstrip(".") in extensions:
+ use_hls = True
+
+ client_name = f"Music Assistant {self.instance_id}"
+ device_info = AbsDeviceInfo(
+ device_id=self.instance_id,
+ client_name=client_name,
+ client_version=self.mass.version,
+ manufacturer="",
+ model=self.mass.server_id,
)
- else:
- stream_url = (
- f"{self.mass.streams.base_url}/{self.instance_id}_episode_stream?"
- f"podcast_id={abs_podcast.id_}&episode_id={abs_episode.id_}"
+
+ session = await self._client.get_playback_session(
+ # These parameters give an hls if we don't enforce direct play stream,
+ # which is only a concat of the individual file's at abs
+ session_parameters=AbsPlaybackSessionParameters(
+ device_info=device_info,
+ force_direct_play=not use_hls,
+ force_transcode=use_hls,
+ # mimetypes are only checked for abs' internal "should transcode
+ # see https://github.com/advplyr/audiobookshelf/blob/master/server/managers/PlaybackSessionManager.js
+ supported_mime_types=[],
+ media_player=client_name,
+ ),
+ item_id=abs_item_id,
+ episode_id=episode_id,
)
- return StreamDetails(
- provider=self.instance_id,
- item_id=podcast_id,
- audio_format=AudioFormat(
- content_type=content_type,
- ),
- media_type=MediaType.PODCAST_EPISODE,
- stream_type=StreamType.HTTP,
- can_seek=True,
- allow_seek=True,
- path=stream_url,
- )
+ if use_hls:
+ # Safety check.
+ track_url = session.audio_tracks[0].content_url
+ if track_url.split("/")[1] != "hls":
+ raise MediaNotFoundError("Did expect HLS stream for session playback")
+ self.logger.debug("Using an HLS stream for playback.")
+
+ self.sessions[mass_item_id] = SessionHelper(
+ abs_session_id=session.id_,
+ last_sync_time=time.time(),
+ hls_stream_open=asyncio.Event(),
+ )
+ return session
- async def _handle_audiobook_part_request(self, request: web.Request) -> web.Response:
+ @handle_refresh_token
+ async def _handle_session_part_request(self, request: web.Request) -> web.Response:
"""
Handle dynamic audiobook part stream request.
This is done because the token might expire, so we need to
generate a fresh url on each part.
"""
- if not (audiobook_id := request.query.get("audiobook_id")):
- return web.Response(status=400, text="Missing audiobook_id")
+ if not (session_id := request.query.get("session_id")):
+ return web.Response(status=400, text="Missing session_id")
if not (part_id := request.query.get("part_id")):
return web.Response(status=400, text="Missing part_id")
- abs_audiobook = await self._get_abs_expanded_audiobook(prov_audiobook_id=audiobook_id)
+ self.logger.debug(
+ "Handling session part request for session %s and part %s", session_id, part_id
+ )
+ try:
+ abs_session = await self._client.get_open_session(session_id=session_id)
+ except AbsSessionNotFoundError as err:
+ raise web.HTTPNotFound from err
part_id = int(part_id) # type: ignore[assignment]
try:
- part_track = abs_audiobook.media.tracks[part_id]
+ part_track = abs_session.audio_tracks[part_id]
except IndexError:
return web.Response(status=404, text="Part not found")
# redirect to the actual stream url
raise web.HTTPFound(location=stream_url)
- async def _handle_episode_request(self, request: web.Request) -> web.Response:
- """Podcast episode request.
-
- For a podcast episode, we only have a single file, but the token might be expired should
- user try to seek an episode.
- """
- if not (abs_podcast_id := request.query.get("podcast_id")):
- return web.Response(status=400, text="Missing podcast_id")
- if not (abs_episode_id := request.query.get("episode_id")):
- return web.Response(status=400, text="Missing episode_id")
- abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_podcast_id)
- abs_episode = None
- for abs_episode in abs_podcast.media.episodes:
- if abs_episode.id_ == abs_episode_id:
- break
- if abs_episode is None:
- return web.Response(status=400, text="Stream not found")
-
- base_url = str(self.config.get_value(CONF_URL))
- stream_url = f"{base_url}{abs_episode.audio_track.content_url}?token={self._client.token}"
-
- # redirect to the actual stream url
- raise web.HTTPFound(location=stream_url)
-
@handle_refresh_token
async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
"""Return finished:bool, position_ms: int."""
- progress: None | MediaProgress = None
- if media_type == MediaType.PODCAST_EPISODE:
- abs_podcast_id, abs_episode_id = item_id.split(" ")
- progress = await self._client.get_my_media_progress(
- item_id=abs_podcast_id, episode_id=abs_episode_id
- )
-
- if media_type == MediaType.AUDIOBOOK:
- progress = await self._client.get_my_media_progress(item_id=item_id)
-
- if progress is not None and progress.current_time is not None:
- self.logger.debug("Resume position: obtained.")
- return progress.is_finished, int(progress.current_time * 1000)
-
- return False, 0
+ # this method is called _before_ get_stream_details, so the playback session
+ # is created here.
+ session = await self._get_playback_session(mass_item_id=item_id)
+ finished = session.current_time > session.duration - 30
+ self.logger.debug("Resume position: obtained.")
+ return finished, int(session.current_time * 1000)
@handle_refresh_token
async def recommendations(self) -> list[RecommendationFolder]:
We ignore PODCAST (function is called on adding a podcast with position=None)
"""
+
+ async def _update_by_session(session_helper: SessionHelper, duration: int) -> bool:
+ now = time.time()
+ try:
+ await self._client.sync_open_session(
+ session_id=session_helper.abs_session_id,
+ parameters=SyncOpenSessionParameters(
+ current_time=position,
+ time_listened=now - session_helper.last_sync_time,
+ duration=duration,
+ ),
+ )
+ session_helper.last_sync_time = now
+ self.logger.debug("Synced playback session, position %s s.", position)
+ return True
+ except AbsSessionNotFoundError:
+ self.logger.error("Was unable to sync session.")
+ return False
+
if media_type == MediaType.PODCAST_EPISODE:
abs_podcast_id, abs_episode_id = prov_item_id.split(" ")
if media_item is None or not isinstance(media_item, PodcastEpisode):
return
+ if fully_played and position < media_item.duration - 30:
+ # faulty position update
+ # occurs sometimes, if a player disconnects unexpectedly, or reports
+ # a false position - seen this for MC players, but not for sendspin
+ return
+
if position == 0 and not fully_played:
# marked unplayed
mp = await self._client.get_my_media_progress(
return
duration = media_item.duration
- self.logger.debug(
- f"Updating media progress of {media_type.value}, title {media_item.name}."
- )
- await self._client.update_my_media_progress(
- item_id=abs_podcast_id,
- episode_id=abs_episode_id,
- duration_seconds=duration,
- progress_seconds=position,
- is_finished=fully_played,
- )
+ updated = False
+ if session_helper := self.sessions.get(prov_item_id):
+ updated = await _update_by_session(session_helper=session_helper, duration=duration)
+ if not updated:
+ self.logger.debug(
+ f"Updating media progress of {media_type.value}, title {media_item.name}."
+ )
+ await self._client.update_my_media_progress(
+ item_id=abs_podcast_id,
+ episode_id=abs_episode_id,
+ duration_seconds=duration,
+ progress_seconds=position,
+ is_finished=fully_played,
+ )
if media_type == MediaType.AUDIOBOOK:
# guard, see progress guard class docstrings for explanation
if media_item is None or not isinstance(media_item, Audiobook):
return
+ if fully_played and position < media_item.duration - 30:
+ # faulty position update, see above
+ return
+
if position == 0 and not fully_played:
# marked unplayed
mp = await self._client.get_my_media_progress(item_id=prov_item_id)
return
duration = media_item.duration
- self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress")
- await self._client.update_my_media_progress(
- item_id=prov_item_id,
- duration_seconds=duration,
- progress_seconds=position,
- is_finished=fully_played,
- )
+ updated = False
+ if session_helper := self.sessions.get(prov_item_id):
+ updated = await _update_by_session(session_helper=session_helper, duration=duration)
+ if not updated:
+ self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress")
+ await self._client.update_my_media_progress(
+ item_id=prov_item_id,
+ duration_seconds=duration,
+ progress_seconds=position,
+ is_finished=fully_played,
+ )
@handle_refresh_token
async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]:
async def _socket_abs_refresh_token_expired(self) -> None:
await self.reauthenticate()
+ async def _socket_stream_open(self, stream: AbsStream) -> None:
+ # stream's id is the same as the playback session id
+ for session_helper in self.sessions.values():
+ if session_helper.abs_session_id == stream.id_:
+ session_helper.hls_stream_open.set()
+ break
+
async def reauthenticate(self) -> None:
"""Reauthorize the abs session config if refresh token expired."""
# some safe guarding should that function be called simultaneously