From: Fabian Munkes <105975993+fmunkes@users.noreply.github.com> Date: Tue, 10 Feb 2026 17:30:04 +0000 (+0100) Subject: ABS: Use playback sessions, and optionally allow HLS stream (#3079) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=348b622e05256bb3e311d2dd8ac7bd7a127c3a44;p=music-assistant-server.git ABS: Use playback sessions, and optionally allow HLS stream (#3079) * option to use abs sessions * bump lib * more useful exception * use session progress if it is available * add session helper * playback hls * bump * remove redundant methods * add media type * more hls * cleanup & session creation lock * typo * stream socket event * bump lib * edge case --- diff --git a/music_assistant/providers/audiobookshelf/__init__.py b/music_assistant/providers/audiobookshelf/__init__.py index 776ce84e..4b6415a1 100644 --- a/music_assistant/providers/audiobookshelf/__init__.py +++ b/music_assistant/providers/audiobookshelf/__init__.py @@ -15,8 +15,12 @@ from aioaudiobookshelf.client.items import LibraryItemExpandedBook as AbsLibrary from aioaudiobookshelf.client.items import ( LibraryItemExpandedPodcast as AbsLibraryItemExpandedPodcast, ) +from aioaudiobookshelf.client.items import PlaybackSessionExpanded as AbsPlaybackSessionExpanded +from aioaudiobookshelf.client.items import PlaybackSessionParameters as AbsPlaybackSessionParameters +from aioaudiobookshelf.client.session import SyncOpenSessionParameters from aioaudiobookshelf.exceptions import LoginError as AbsLoginError from aioaudiobookshelf.exceptions import RefreshTokenExpiredError +from aioaudiobookshelf.exceptions import SessionNotFoundError as AbsSessionNotFoundError from aioaudiobookshelf.schema.author import AuthorExpanded from aioaudiobookshelf.schema.calls_authors import ( AuthorWithItemsAndSeries as AbsAuthorWithItemsAndSeries, @@ -29,6 +33,7 @@ from aioaudiobookshelf.schema.library import ( LibraryItemMinifiedPodcast, ) from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType +from aioaudiobookshelf.schema.session import DeviceInfo as AbsDeviceInfo from aioaudiobookshelf.schema.shelf import ( SeriesShelf, ShelfAuthors, @@ -41,7 +46,11 @@ from aioaudiobookshelf.schema.shelf import ( from aioaudiobookshelf.schema.shelf import ShelfId as AbsShelfId from aioaudiobookshelf.schema.shelf import ShelfType as AbsShelfType from aiohttp import web -from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig +from music_assistant_models.config_entries import ( + ConfigEntry, + ConfigValueType, + ProviderConfig, +) from music_assistant_models.enums import ( ConfigEntryType, ContentType, @@ -78,20 +87,25 @@ from .constants import ( CACHE_KEY_LIBRARIES, CONF_API_TOKEN, CONF_HIDE_EMPTY_PODCASTS, + CONF_HLS_FORMATS, CONF_OLD_TOKEN, CONF_PASSWORD, CONF_URL, + CONF_USE_HLS, CONF_USERNAME, CONF_VERIFY_SSL, + HLS_ALL_FORMATS, + HLS_FORMATS_SPLIT, AbsBrowseItemsBookTranslationKey, AbsBrowseItemsPodcastTranslationKey, AbsBrowsePaths, ) -from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard +from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard, SessionHelper if TYPE_CHECKING: from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved from aioaudiobookshelf.schema.media_progress import MediaProgress + from aioaudiobookshelf.schema.streams import Stream as AbsStream from aioaudiobookshelf.schema.user import User from music_assistant_models.media_items import Podcast from music_assistant_models.provider import ProviderManifest @@ -175,6 +189,26 @@ async def get_config_entries( required=False, hidden=True, ), + ConfigEntry( + key=CONF_USE_HLS, + type=ConfigEntryType.BOOLEAN, + label="Stream via HLS from ABS.", + description="Use an HLS stream when streaming from audiobookshelf.", + required=False, + default_value=False, + advanced=True, + ), + ConfigEntry( + key=CONF_HLS_FORMATS, + type=ConfigEntryType.STRING, + label=f"Use HLS for these file extensions. Separate with ';'. Use {HLS_ALL_FORMATS} for" + " all formats.", + description="Use HLS only for these file extensions." + f" Separate with ;. E.g. m4b or m4b;aac or {HLS_ALL_FORMATS}", + required=False, + default_value="m4b", + advanced=True, + ), ConfigEntry( key=CONF_VERIFY_SSL, type=ConfigEntryType.BOOLEAN, @@ -226,6 +260,8 @@ class Audiobookshelf(MusicProvider): async def handle_async_init(self) -> None: """Pass config values to client and initialize.""" self._on_unload_callbacks: list[Callable[[], None]] = [] + self.sessions: dict[str, SessionHelper] = {} # key is the mass_item_id + self.create_session_lock = asyncio.Lock() base_url = str(self.config.get_value(CONF_URL)) username = str(self.config.get_value(CONF_USERNAME)) password = str(self.config.get_value(CONF_PASSWORD)) @@ -324,6 +360,8 @@ for more details. on_refresh_token_expired=self._socket_abs_refresh_token_expired ) + self._client_socket.set_stream_callbacks(on_stream_open=self._socket_stream_open) + # progress guard self.progress_guard = ProgressGuard() @@ -334,12 +372,7 @@ for more details. # register dynamic stream route for audiobook parts self._on_unload_callbacks.append( self.mass.streams.register_dynamic_route( - f"/{self.instance_id}_part_stream", self._handle_audiobook_part_request - ) - ) - self._on_unload_callbacks.append( - self.mass.streams.register_dynamic_route( - f"/{self.instance_id}_episode_stream", self._handle_episode_request + f"/{self.instance_id}_part_stream", self._handle_session_part_request ) ) @@ -553,31 +586,40 @@ for more details. async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails: """Get stream of item.""" - if media_type == MediaType.PODCAST_EPISODE: - return await self._get_stream_details_episode(item_id) - if media_type == MediaType.AUDIOBOOK: - abs_audiobook = await self._get_abs_expanded_audiobook(prov_audiobook_id=item_id) - return await self._get_stream_details_audiobook(abs_audiobook) + # We always create a playback session. The default is direct playback. + # In that case, session.tracks holds the exact same as the audiobook/ podcast.track, + # so we only use the session to update our progress. + # + # In the case of hls the session has an hls stream as track. + if media_type in (MediaType.PODCAST_EPISODE, MediaType.AUDIOBOOK): + session = await self._get_playback_session(mass_item_id=item_id) + return await self._get_stream_details_session( + session, session_helper=self.sessions[item_id], media_type=media_type + ) raise MediaNotFoundError("Stream unknown") - async def _get_stream_details_audiobook( - self, abs_audiobook: AbsLibraryItemExpandedBook + async def _get_stream_details_session( + self, + abs_session: AbsPlaybackSessionExpanded, + session_helper: SessionHelper, + media_type: MediaType, ) -> StreamDetails: """Streamdetails audiobook. We always use a custom stream type, also for single file, such that we can handle an ffmpeg error and refresh our tokens. """ - tracks = abs_audiobook.media.tracks + abs_base_url = str(self.config.get_value(CONF_URL)) + tracks = abs_session.audio_tracks + if len(tracks) == 0: - raise MediaNotFoundError("Stream not found") + raise MediaNotFoundError("Session has no tracks.") content_type = ContentType.UNKNOWN - if abs_audiobook.media.tracks[0].metadata is not None: - content_type = ContentType.try_parse(abs_audiobook.media.tracks[0].metadata.ext) + if abs_session.audio_tracks[0].metadata is not None: + content_type = ContentType.try_parse(abs_session.audio_tracks[0].metadata.ext) file_parts: list[MultiPartPath] = [] - abs_base_url = str(self.config.get_value(CONF_URL)) if self.is_token_user: self.logger.debug("Token User - Streams are direct.") for idx, track in enumerate(tracks): @@ -588,71 +630,115 @@ for more details. # to ensure token is always valid, we create a dynamic url # this ensures that we always get a fresh token on each part # without having to deal with a custom stream etc. - # we also use this for the first part, otherwise we can't seek + # we also use this for a single track/ hls stream, otherwise we can't seek stream_url = ( f"{self.mass.streams.base_url}/{self.instance_id}_part_stream?" - f"audiobook_id={abs_audiobook.id_}&part_id={idx}" + f"session_id={abs_session.id_}&part_id={idx}" ) file_parts.append(MultiPartPath(path=stream_url, duration=track.duration)) + stream_type = StreamType.HLS if "hls" in file_parts[0].path else StreamType.HTTP + if stream_type == StreamType.HLS: + # wait for stream to be ready + try: + await asyncio.wait_for(session_helper.hls_stream_open.wait(), 10) + except TimeoutError: + self.logger.warning( + "Did not receive HLS stream open event after 10s, continuing anyways." + ) + return StreamDetails( provider=self.instance_id, - item_id=abs_audiobook.id_, + item_id=abs_session.id_, audio_format=AudioFormat(content_type=content_type), - media_type=MediaType.AUDIOBOOK, - stream_type=StreamType.HTTP, - duration=int(abs_audiobook.media.duration), - path=file_parts, + media_type=media_type, + stream_type=stream_type, + duration=int(abs_session.duration), + path=file_parts[0].path if len(file_parts) == 1 else file_parts, can_seek=True, allow_seek=True, ) - async def _get_stream_details_episode(self, podcast_id: str) -> StreamDetails: - """Streamdetails of a podcast episode. - - There are no multi-file podcasts in abs, but we use a custom - stream to handle possible ffmpeg errors. - """ - abs_podcast_id, abs_episode_id = podcast_id.split(" ") - abs_episode = None - - abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_podcast_id) - for abs_episode in abs_podcast.media.episodes: - if abs_episode.id_ == abs_episode_id: - break - if abs_episode is None: - raise MediaNotFoundError("Stream not found") - content_type = ContentType.UNKNOWN - if abs_episode.audio_track.metadata is not None: - content_type = ContentType.try_parse(abs_episode.audio_track.metadata.ext) + async def _get_playback_session(self, mass_item_id: str) -> AbsPlaybackSessionExpanded: + """Either creates or returns an open abs session.""" + async with self.create_session_lock: + # check for an available open session + if session_helper := self.sessions.get(mass_item_id): + with suppress(AbsSessionNotFoundError): + return await self._client.get_open_session( + session_id=session_helper.abs_session_id + ) - if self.is_token_user: - self.logger.debug("Token User - Stream is direct.") - # long lived API token, no need for detour - abs_base_url = str(self.config.get_value(CONF_URL)) - stream_url = ( - f"{abs_base_url}{abs_episode.audio_track.content_url}?token={self._client.token}" + item_ids = mass_item_id.split(" ") + abs_item_id = item_ids[0] + episode_id = item_ids[1] if len(item_ids) == 2 else None + + # Create a new session + ## Check HLS usage + use_hls = bool(self.config.get_value(CONF_USE_HLS)) + hls_formats = str(self.config.get_value(CONF_HLS_FORMATS)) + if use_hls and hls_formats != HLS_ALL_FORMATS: + use_hls = False # only for certain formats + extensions = [x.lstrip(".") for x in hls_formats.split(HLS_FORMATS_SPLIT)] + if episode_id is None: + if ( + metadata := (await self._get_abs_expanded_audiobook(abs_item_id)) + .media.tracks[0] + .metadata + ): + if metadata.ext.lstrip(".") in extensions: + use_hls = True + else: + podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_item_id) + episode = None + for episode in podcast.media.episodes: + if episode.id_ == episode_id: + break + if episode and (metadata := episode.audio_track.metadata): + if metadata.ext.lstrip(".") in extensions: + use_hls = True + + client_name = f"Music Assistant {self.instance_id}" + device_info = AbsDeviceInfo( + device_id=self.instance_id, + client_name=client_name, + client_version=self.mass.version, + manufacturer="", + model=self.mass.server_id, ) - else: - stream_url = ( - f"{self.mass.streams.base_url}/{self.instance_id}_episode_stream?" - f"podcast_id={abs_podcast.id_}&episode_id={abs_episode.id_}" + + session = await self._client.get_playback_session( + # These parameters give an hls if we don't enforce direct play stream, + # which is only a concat of the individual file's at abs + session_parameters=AbsPlaybackSessionParameters( + device_info=device_info, + force_direct_play=not use_hls, + force_transcode=use_hls, + # mimetypes are only checked for abs' internal "should transcode + # see https://github.com/advplyr/audiobookshelf/blob/master/server/managers/PlaybackSessionManager.js + supported_mime_types=[], + media_player=client_name, + ), + item_id=abs_item_id, + episode_id=episode_id, ) - return StreamDetails( - provider=self.instance_id, - item_id=podcast_id, - audio_format=AudioFormat( - content_type=content_type, - ), - media_type=MediaType.PODCAST_EPISODE, - stream_type=StreamType.HTTP, - can_seek=True, - allow_seek=True, - path=stream_url, - ) + if use_hls: + # Safety check. + track_url = session.audio_tracks[0].content_url + if track_url.split("/")[1] != "hls": + raise MediaNotFoundError("Did expect HLS stream for session playback") + self.logger.debug("Using an HLS stream for playback.") + + self.sessions[mass_item_id] = SessionHelper( + abs_session_id=session.id_, + last_sync_time=time.time(), + hls_stream_open=asyncio.Event(), + ) + return session - async def _handle_audiobook_part_request(self, request: web.Request) -> web.Response: + @handle_refresh_token + async def _handle_session_part_request(self, request: web.Request) -> web.Response: """ Handle dynamic audiobook part stream request. @@ -660,14 +746,20 @@ for more details. This is done because the token might expire, so we need to generate a fresh url on each part. """ - if not (audiobook_id := request.query.get("audiobook_id")): - return web.Response(status=400, text="Missing audiobook_id") + if not (session_id := request.query.get("session_id")): + return web.Response(status=400, text="Missing session_id") if not (part_id := request.query.get("part_id")): return web.Response(status=400, text="Missing part_id") - abs_audiobook = await self._get_abs_expanded_audiobook(prov_audiobook_id=audiobook_id) + self.logger.debug( + "Handling session part request for session %s and part %s", session_id, part_id + ) + try: + abs_session = await self._client.get_open_session(session_id=session_id) + except AbsSessionNotFoundError as err: + raise web.HTTPNotFound from err part_id = int(part_id) # type: ignore[assignment] try: - part_track = abs_audiobook.media.tracks[part_id] + part_track = abs_session.audio_tracks[part_id] except IndexError: return web.Response(status=404, text="Part not found") @@ -676,48 +768,15 @@ for more details. # redirect to the actual stream url raise web.HTTPFound(location=stream_url) - async def _handle_episode_request(self, request: web.Request) -> web.Response: - """Podcast episode request. - - For a podcast episode, we only have a single file, but the token might be expired should - user try to seek an episode. - """ - if not (abs_podcast_id := request.query.get("podcast_id")): - return web.Response(status=400, text="Missing podcast_id") - if not (abs_episode_id := request.query.get("episode_id")): - return web.Response(status=400, text="Missing episode_id") - abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=abs_podcast_id) - abs_episode = None - for abs_episode in abs_podcast.media.episodes: - if abs_episode.id_ == abs_episode_id: - break - if abs_episode is None: - return web.Response(status=400, text="Stream not found") - - base_url = str(self.config.get_value(CONF_URL)) - stream_url = f"{base_url}{abs_episode.audio_track.content_url}?token={self._client.token}" - - # redirect to the actual stream url - raise web.HTTPFound(location=stream_url) - @handle_refresh_token async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]: """Return finished:bool, position_ms: int.""" - progress: None | MediaProgress = None - if media_type == MediaType.PODCAST_EPISODE: - abs_podcast_id, abs_episode_id = item_id.split(" ") - progress = await self._client.get_my_media_progress( - item_id=abs_podcast_id, episode_id=abs_episode_id - ) - - if media_type == MediaType.AUDIOBOOK: - progress = await self._client.get_my_media_progress(item_id=item_id) - - if progress is not None and progress.current_time is not None: - self.logger.debug("Resume position: obtained.") - return progress.is_finished, int(progress.current_time * 1000) - - return False, 0 + # this method is called _before_ get_stream_details, so the playback session + # is created here. + session = await self._get_playback_session(mass_item_id=item_id) + finished = session.current_time > session.duration - 30 + self.logger.debug("Resume position: obtained.") + return finished, int(session.current_time * 1000) @handle_refresh_token async def recommendations(self) -> list[RecommendationFolder]: @@ -945,6 +1004,25 @@ for more details. We ignore PODCAST (function is called on adding a podcast with position=None) """ + + async def _update_by_session(session_helper: SessionHelper, duration: int) -> bool: + now = time.time() + try: + await self._client.sync_open_session( + session_id=session_helper.abs_session_id, + parameters=SyncOpenSessionParameters( + current_time=position, + time_listened=now - session_helper.last_sync_time, + duration=duration, + ), + ) + session_helper.last_sync_time = now + self.logger.debug("Synced playback session, position %s s.", position) + return True + except AbsSessionNotFoundError: + self.logger.error("Was unable to sync session.") + return False + if media_type == MediaType.PODCAST_EPISODE: abs_podcast_id, abs_episode_id = prov_item_id.split(" ") @@ -958,6 +1036,12 @@ for more details. if media_item is None or not isinstance(media_item, PodcastEpisode): return + if fully_played and position < media_item.duration - 30: + # faulty position update + # occurs sometimes, if a player disconnects unexpectedly, or reports + # a false position - seen this for MC players, but not for sendspin + return + if position == 0 and not fully_played: # marked unplayed mp = await self._client.get_my_media_progress( @@ -969,16 +1053,20 @@ for more details. return duration = media_item.duration - self.logger.debug( - f"Updating media progress of {media_type.value}, title {media_item.name}." - ) - await self._client.update_my_media_progress( - item_id=abs_podcast_id, - episode_id=abs_episode_id, - duration_seconds=duration, - progress_seconds=position, - is_finished=fully_played, - ) + updated = False + if session_helper := self.sessions.get(prov_item_id): + updated = await _update_by_session(session_helper=session_helper, duration=duration) + if not updated: + self.logger.debug( + f"Updating media progress of {media_type.value}, title {media_item.name}." + ) + await self._client.update_my_media_progress( + item_id=abs_podcast_id, + episode_id=abs_episode_id, + duration_seconds=duration, + progress_seconds=position, + is_finished=fully_played, + ) if media_type == MediaType.AUDIOBOOK: # guard, see progress guard class docstrings for explanation @@ -989,6 +1077,10 @@ for more details. if media_item is None or not isinstance(media_item, Audiobook): return + if fully_played and position < media_item.duration - 30: + # faulty position update, see above + return + if position == 0 and not fully_played: # marked unplayed mp = await self._client.get_my_media_progress(item_id=prov_item_id) @@ -998,13 +1090,17 @@ for more details. return duration = media_item.duration - self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress") - await self._client.update_my_media_progress( - item_id=prov_item_id, - duration_seconds=duration, - progress_seconds=position, - is_finished=fully_played, - ) + updated = False + if session_helper := self.sessions.get(prov_item_id): + updated = await _update_by_session(session_helper=session_helper, duration=duration) + if not updated: + self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress") + await self._client.update_my_media_progress( + item_id=prov_item_id, + duration_seconds=duration, + progress_seconds=position, + is_finished=fully_played, + ) @handle_refresh_token async def browse(self, path: str) -> Sequence[MediaItemType | ItemMapping | BrowseFolder]: @@ -1429,6 +1525,13 @@ for more details. async def _socket_abs_refresh_token_expired(self) -> None: await self.reauthenticate() + async def _socket_stream_open(self, stream: AbsStream) -> None: + # stream's id is the same as the playback session id + for session_helper in self.sessions.values(): + if session_helper.abs_session_id == stream.id_: + session_helper.hls_stream_open.set() + break + async def reauthenticate(self) -> None: """Reauthorize the abs session config if refresh token expired.""" # some safe guarding should that function be called simultaneously diff --git a/music_assistant/providers/audiobookshelf/constants.py b/music_assistant/providers/audiobookshelf/constants.py index 4442cdf0..18d2bc45 100644 --- a/music_assistant/providers/audiobookshelf/constants.py +++ b/music_assistant/providers/audiobookshelf/constants.py @@ -18,6 +18,11 @@ CONF_API_TOKEN = "api_token" # with jwt api token (>= v2.26) CONF_VERIFY_SSL = "verify_ssl" # optionally hide podcasts with no episodes CONF_HIDE_EMPTY_PODCASTS = "hide_empty_podcasts" +# hls options +CONF_USE_HLS = "use_session_hls" +CONF_HLS_FORMATS = "hls_formats" +HLS_FORMATS_SPLIT = ";" +HLS_ALL_FORMATS = "all" # CACHE CACHE_CATEGORY_LIBRARIES = 0 diff --git a/music_assistant/providers/audiobookshelf/helpers.py b/music_assistant/providers/audiobookshelf/helpers.py index 530dfb31..47793a72 100644 --- a/music_assistant/providers/audiobookshelf/helpers.py +++ b/music_assistant/providers/audiobookshelf/helpers.py @@ -1,5 +1,6 @@ """Helpers for Audiobookshelf provider.""" +import asyncio import time from dataclasses import dataclass, field @@ -26,6 +27,15 @@ class LibrariesHelper(DataClassDictMixin): podcasts: dict[str, LibraryHelper] = field(default_factory=dict) +@dataclass(kw_only=True) +class SessionHelper: + """Helper class to store some session information.""" + + abs_session_id: str + last_sync_time: float + hls_stream_open: asyncio.Event # only used for hls_streams, otherwise ignored + + @dataclass(kw_only=True) class _ProgressHelper: id_: str # audiobook or podcast id diff --git a/music_assistant/providers/audiobookshelf/manifest.json b/music_assistant/providers/audiobookshelf/manifest.json index 3a4d7786..196d89a9 100644 --- a/music_assistant/providers/audiobookshelf/manifest.json +++ b/music_assistant/providers/audiobookshelf/manifest.json @@ -6,7 +6,7 @@ "description": "Stream audiobooks and podcasts from your personal Audiobookshelf server.", "codeowners": ["@fmunkes"], "credits": ["[aioaudiobookshelf](https://github.com/music-assistant/aioaudiobookshelf)"], - "requirements": ["aioaudiobookshelf==0.1.10"], + "requirements": ["aioaudiobookshelf==0.1.13"], "documentation": "https://music-assistant.io/music-providers/audiobookshelf", "multi_instance": true } diff --git a/requirements_all.txt b/requirements_all.txt index 61e1083f..c898c2d8 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1,7 +1,7 @@ # WARNING: this file is autogenerated! Brotli>=1.0.9 -aioaudiobookshelf==0.1.10 +aioaudiobookshelf==0.1.13 aiodns>=3.2.0 aiofiles==24.1.0 aiohttp==3.13.3