From: Fabian Munkes <105975993+fmunkes@users.noreply.github.com> Date: Tue, 25 Feb 2025 22:31:40 +0000 (+0100) Subject: Enhancement: Implement resume state syncing for ABS provider (#1971) X-Git-Url: https://git.kitaultman.com/?a=commitdiff_plain;h=bd65a361d281eaf23b42144751ebb5b8823e0738;p=music-assistant-server.git Enhancement: Implement resume state syncing for ABS provider (#1971) --- diff --git a/music_assistant/providers/audiobookshelf/__init__.py b/music_assistant/providers/audiobookshelf/__init__.py index 6d6dcdf1..7058c00c 100644 --- a/music_assistant/providers/audiobookshelf/__init__.py +++ b/music_assistant/providers/audiobookshelf/__init__.py @@ -19,6 +19,7 @@ from aioaudiobookshelf.schema.library import ( LibraryItemExpanded, LibraryItemExpandedBook, LibraryItemExpandedPodcast, + LibraryItemMinifiedPodcast, ) from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig @@ -31,10 +32,12 @@ from music_assistant_models.enums import ( ) from music_assistant_models.errors import LoginFailed, MediaNotFoundError from music_assistant_models.media_items import ( + Audiobook, AudioFormat, BrowseFolder, MediaItemType, MediaItemTypeOrItemMapping, + PodcastEpisode, ) from music_assistant_models.streamdetails import StreamDetails @@ -59,12 +62,13 @@ from .constants import ( AbsBrowseItemsPodcast, AbsBrowsePaths, ) -from .helpers import LibrariesHelper, LibraryHelper +from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard if TYPE_CHECKING: from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved from aioaudiobookshelf.schema.media_progress import MediaProgress - from music_assistant_models.media_items import Audiobook, Podcast, PodcastEpisode + from aioaudiobookshelf.schema.user import User + from music_assistant_models.media_items import Podcast from music_assistant_models.provider import ProviderManifest from music_assistant.mass import MusicAssistant @@ -190,6 +194,17 @@ class Audiobookshelf(MusicProvider): on_items_updated=self._socket_abs_item_changed, ) + self._client_socket.set_user_callbacks( + on_user_item_progress_updated=self._socket_abs_user_item_progress_updated, + ) + + # progress guard + self.progress_guard = ProgressGuard() + + # update playlog information if just started + user = await self._client.get_my_user() + await self._set_playlog_from_user(user) + async def unload(self, is_removed: bool = False) -> None: """ Handle unload/close of the provider. @@ -220,11 +235,14 @@ class Audiobookshelf(MusicProvider): await super().sync_library(media_type=media_type) await self._cache_set_helper_libraries() + # update playlog + user = await self._client.get_my_user() + await self._set_playlog_from_user(user) + async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]: """Retrieve library/subscribed podcasts from the provider. - Minified podcast information is enough, but we take the full information - and rely on cache afterwards. + Minified podcast information is enough. """ for pod_lib_id in self.libraries.podcasts: async for response in self._client.get_library_items(library_id=pod_lib_id): @@ -233,12 +251,10 @@ class Audiobookshelf(MusicProvider): podcast_ids = [x.id_ for x in response.results] # store uuids self.libraries.podcasts[pod_lib_id].item_ids.update(podcast_ids) - podcasts_expanded = await self._client.get_library_item_batch_podcast( - item_ids=podcast_ids - ) - for podcast_expanded in podcasts_expanded: + for podcast_minified in response.results: + assert isinstance(podcast_minified, LibraryItemMinifiedPodcast) mass_podcast = parse_podcast( - abs_podcast=podcast_expanded, + abs_podcast=podcast_minified, lookup_key=self.lookup_key, domain=self.domain, instance_id=self.instance_id, @@ -263,14 +279,7 @@ class Audiobookshelf(MusicProvider): return abs_podcast async def get_podcast(self, prov_podcast_id: str) -> Podcast: - """Get single podcast. - - Basis information, - abs_podcast = await self._client.get_library_item_podcast( - podcast_id=prov_podcast_id, expanded=False - ), - would be sufficient, but we rely on cache. - """ + """Get single podcast.""" abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id) return parse_podcast( abs_podcast=abs_podcast, @@ -315,16 +324,20 @@ class Audiobookshelf(MusicProvider): episode_cnt += 1 return episode_list - async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode: + async def get_podcast_episode( + self, prov_episode_id: str, add_progress: bool = True + ) -> PodcastEpisode: """Get single podcast episode.""" prov_podcast_id, e_id = prov_episode_id.split(" ") abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id) episode_cnt = 1 for abs_episode in abs_podcast.media.episodes: if abs_episode.id_ == e_id: - progress = await self._client.get_my_media_progress( - item_id=prov_podcast_id, episode_id=abs_episode.id_ - ) + progress = None + if add_progress: + progress = await self._client.get_my_media_progress( + item_id=prov_podcast_id, episode_id=abs_episode.id_ + ) return parse_podcast_episode( episode=abs_episode, prov_podcast_id=prov_podcast_id, @@ -562,10 +575,30 @@ class Audiobookshelf(MusicProvider): """ if media_type == MediaType.PODCAST_EPISODE: abs_podcast_id, abs_episode_id = prov_item_id.split(" ") - mass_podcast_episode = await self.get_podcast_episode(prov_item_id) - duration = mass_podcast_episode.duration + + # guard, see progress guard class docstrings for explanation + if not self.progress_guard.guard_ok_mass( + item_id=abs_podcast_id, episode_id=abs_episode_id + ): + return + self.progress_guard.add_progress(item_id=abs_podcast_id, episode_id=abs_episode_id) + + if media_item is None or not isinstance(media_item, PodcastEpisode): + return + + if position == 0 and not fully_played: + # marked unplayed + mp = await self._client.get_my_media_progress( + item_id=abs_podcast_id, episode_id=abs_episode_id + ) + if mp is not None: + await self._client.remove_my_media_progress(media_progress_id=mp.id_) + self.logger.debug(f"Removed media progress of {media_type.value}.") + return + + duration = media_item.duration self.logger.debug( - f"Updating media progress of {media_type.value}, title {mass_podcast_episode.name}." + f"Updating media progress of {media_type.value}, title {media_item.name}." ) await self._client.update_my_media_progress( item_id=abs_podcast_id, @@ -574,10 +607,26 @@ class Audiobookshelf(MusicProvider): progress_seconds=position, is_finished=fully_played, ) + if media_type == MediaType.AUDIOBOOK: - mass_audiobook = await self.get_audiobook(prov_item_id) - duration = mass_audiobook.duration - self.logger.debug(f"Updating {media_type.value} named {mass_audiobook.name} progress") + # guard, see progress guard class docstrings for explanation + if not self.progress_guard.guard_ok_mass(item_id=prov_item_id): + return + self.progress_guard.add_progress(item_id=prov_item_id) + + if media_item is None or not isinstance(media_item, Audiobook): + return + + if position == 0 and not fully_played: + # marked unplayed + mp = await self._client.get_my_media_progress(item_id=prov_item_id) + if mp is not None: + await self._client.remove_my_media_progress(media_progress_id=mp.id_) + self.logger.debug(f"Removed media progress of {media_type.value}.") + return + + duration = media_item.duration + self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress") await self._client.update_my_media_progress( item_id=prov_item_id, duration_seconds=duration, @@ -963,6 +1012,111 @@ class Audiobookshelf(MusicProvider): await self._cache_set_helper_libraries() + async def _socket_abs_user_item_progress_updated( + self, id_: str, progress: MediaProgress + ) -> None: + """To update continue listening. + + ABS reports every 15s and immediately on play state change. + This callback is called per item if a progress is changed: + - a change in position + - the item is finished + But it is _not_called, if a progress is reset/ discarded. + """ + # guard, see progress guard class docstrings for explanation + if not self.progress_guard.guard_ok_abs(abs_progress=progress): + return + + known_ids = self._get_all_known_item_ids() + if progress.library_item_id not in known_ids: + return + + self.logger.debug(f"Updated progress of item {progress.library_item_id} via socket.") + + if progress.episode_id is None: + await self._update_playlog_book(progress) + return + await self._update_playlog_episode(progress) + + def _get_all_known_item_ids(self) -> set[str]: + known_ids = set() + for lib in self.libraries.podcasts.values(): + known_ids.update(lib.item_ids) + for lib in self.libraries.audiobooks.values(): + known_ids.update(lib.item_ids) + + return known_ids + + async def _set_playlog_from_user(self, user: User) -> None: + """Update on user callback. + + User holds also all media progresses specific to that user. + + The function 'guard_ok_abs' uses the timestamp of the last update in abs, thus after an + initial progress update, an unchanged update will not trigger a (useless) playlog update. + + We do not sync removed progresses for the sake of simplicity. + """ + await self._set_playlog_from_user_sync(user.media_progress) + + async def _set_playlog_from_user_sync(self, progresses: list[MediaProgress]) -> None: + # for debugging + __updated_items = 0 + + known_ids = self._get_all_known_item_ids() + + for progress in progresses: + # Guard. Also makes sure, that we don't write to db again if no state change happened. + # This is achieved by adding a Helper Progress in the update playlog functions, which + # then has the most recent timestamp. If a subsequent progress sent by abs has an older + # timestamp, we do not update again. + if not self.progress_guard.guard_ok_abs(progress): + continue + if not progress.current_time >= 30: + # same as mass default, only > 30s + continue + if progress.library_item_id not in known_ids: + continue + __updated_items += 1 + if progress.episode_id is None: + await self._update_playlog_book(progress) + else: + await self._update_playlog_episode(progress) + self.logger.debug(f"Updated {__updated_items} from full playlog.") + + async def _update_playlog_book(self, progress: MediaProgress) -> None: + # helper progress also ensures no useless progress updates, + # see comment above + self.progress_guard.add_progress(progress.library_item_id) + mass_audiobook = await self.mass.music.get_library_item_by_prov_id( + media_type=MediaType.AUDIOBOOK, + item_id=progress.library_item_id, + provider_instance_id_or_domain=self.instance_id, + ) + if mass_audiobook is None: + return + await self.mass.music.mark_item_played( + mass_audiobook, + fully_played=progress.is_finished, + seconds_played=int(progress.current_time), + ) + + async def _update_playlog_episode(self, progress: MediaProgress) -> None: + # helper progress also ensures no useless progress updates, + # see comment above + self.progress_guard.add_progress(progress.library_item_id, progress.episode_id) + _episode_id = f"{progress.library_item_id} {progress.episode_id}" + try: + # need to obtain full podcast, and then search for episode + mass_episode = await self.get_podcast_episode(_episode_id, add_progress=False) + except MediaNotFoundError: + return + await self.mass.music.mark_item_played( + mass_episode, + fully_played=progress.is_finished, + seconds_played=int(progress.current_time), + ) + async def _cache_set_helper_libraries(self) -> None: await self.mass.cache.set( key=CACHE_KEY_LIBRARIES, diff --git a/music_assistant/providers/audiobookshelf/helpers.py b/music_assistant/providers/audiobookshelf/helpers.py index 3390b570..530dfb31 100644 --- a/music_assistant/providers/audiobookshelf/helpers.py +++ b/music_assistant/providers/audiobookshelf/helpers.py @@ -1,7 +1,9 @@ """Helpers for Audiobookshelf provider.""" +import time from dataclasses import dataclass, field +from aioaudiobookshelf.schema.media_progress import MediaProgress from mashumaro.mixins.dict import DataClassDictMixin @@ -22,3 +24,87 @@ class LibrariesHelper(DataClassDictMixin): audiobooks: dict[str, LibraryHelper] = field(default_factory=dict) podcasts: dict[str, LibraryHelper] = field(default_factory=dict) + + +@dataclass(kw_only=True) +class _ProgressHelper: + id_: str # audiobook or podcast id + episode_id: str | None = None + last_update_ms: int # last update in ms epoch (same as last_update in abs) + + +class ProgressGuard: + """Class used to avoid ping pong between abs and mass. + + We continuously update the progress from mass to abs with the provider's on_played function. + We also register callbacks for progress reports from abs to mass. This is not only triggered + on external updates, but also on our own update. To avoid messages going back and forth, this + class is used. + """ + + def __init__(self) -> None: + """Init.""" + self._progresses: list[_ProgressHelper] = [] + self._max_progresses = 100 + # 12s have to have passed before we accept an external progress update + # abs updates every 15 s + self._min_time_between_updates_ms = 12000 + + def _get_progress(self, item_id: str, episode_id: str | None = None) -> _ProgressHelper | None: + """Get a helper progress.""" + for x in self._progresses: + if x.id_ == item_id and x.episode_id == episode_id: + return x + return None + + def _remove_oldest(self) -> None: + """Remove oldest helper progress.""" + progresses = sorted(self._progresses, key=lambda x: x.last_update_ms) + if len(progresses) > 0: + self._progresses.remove(progresses[0]) + + def remove_progress(self, item_id: str, episode_id: str | None = None) -> None: + """Remove a helper progress.""" + progress = self._get_progress(item_id=item_id, episode_id=episode_id) + if progress is not None: + self._progresses.remove(progress) + + def add_progress(self, item_id: str, episode_id: str | None = None) -> None: + """Store a timestamp for the last update of an audiobook or podcast episode, mass ids.""" + if len(self._progresses) > self._max_progresses: + self._remove_oldest() + self.remove_progress(item_id=item_id, episode_id=episode_id) + progress = _ProgressHelper( + id_=item_id, episode_id=episode_id, last_update_ms=int(time.time() * 1000) + ) + self._progresses.append(progress) + + def guard_ok_abs(self, abs_progress: MediaProgress) -> bool: + """Check, if we may update against an abs media progress. + + The abs media progress has a property last_update_ms, which also reflects non + mass external updates. Here, we compare this property against a potential + stored one. + """ + item_id = abs_progress.library_item_id + episode_id = abs_progress.episode_id + stored_progress = self._get_progress(item_id=item_id, episode_id=episode_id) + if stored_progress is None: + return True + return bool( + abs_progress.last_update - stored_progress.last_update_ms + >= self._min_time_between_updates_ms + ) + + def guard_ok_mass(self, item_id: str, episode_id: str | None = None) -> bool: + """Check, if we may update against a mass internal item. + + Here, we use the current time and compare it against the stored time. + """ + stored_progress = self._get_progress(item_id=item_id, episode_id=episode_id) + if stored_progress is None: + return True + return ( + int(time.time() * 1000) - stored_progress.last_update_ms + >= self._min_time_between_updates_ms + ) diff --git a/music_assistant/providers/audiobookshelf/manifest.json b/music_assistant/providers/audiobookshelf/manifest.json index c7baa3c2..444e5954 100644 --- a/music_assistant/providers/audiobookshelf/manifest.json +++ b/music_assistant/providers/audiobookshelf/manifest.json @@ -7,7 +7,7 @@ "@fmunkes" ], "requirements": [ - "aioaudiobookshelf==0.1.1" + "aioaudiobookshelf==0.1.2" ], "documentation": "https://music-assistant.io/music-providers/audiobookshelf", "multi_instance": true diff --git a/music_assistant/providers/audiobookshelf/parsers.py b/music_assistant/providers/audiobookshelf/parsers.py index c028d518..c017b391 100644 --- a/music_assistant/providers/audiobookshelf/parsers.py +++ b/music_assistant/providers/audiobookshelf/parsers.py @@ -73,7 +73,7 @@ def parse_podcast( mass_podcast.metadata.genres = set(abs_podcast.media.metadata.genres) mass_podcast.metadata.release_date = abs_podcast.media.metadata.release_date - if isinstance(abs_podcast, AbsLibraryItemExpandedPodcast): + if isinstance(abs_podcast, AbsLibraryItemExpandedPodcast | AbsLibraryItemPodcast): mass_podcast.total_episodes = len(abs_podcast.media.episodes) elif isinstance(abs_podcast, AbsLibraryItemMinifiedPodcast): mass_podcast.total_episodes = abs_podcast.media.num_episodes diff --git a/requirements_all.txt b/requirements_all.txt index f0c2f69d..e5d7eb18 100644 --- a/requirements_all.txt +++ b/requirements_all.txt @@ -1,7 +1,7 @@ # WARNING: this file is autogenerated! Brotli>=1.0.9 -aioaudiobookshelf==0.1.1 +aioaudiobookshelf==0.1.2 aiodns>=3.2.0 aiofiles==24.1.0 aiohttp==3.11.12