Enhancement: Implement resume state syncing for ABS provider (#1971)
authorFabian Munkes <105975993+fmunkes@users.noreply.github.com>
Tue, 25 Feb 2025 22:31:40 +0000 (23:31 +0100)
committerGitHub <noreply@github.com>
Tue, 25 Feb 2025 22:31:40 +0000 (23:31 +0100)
music_assistant/providers/audiobookshelf/__init__.py
music_assistant/providers/audiobookshelf/helpers.py
music_assistant/providers/audiobookshelf/manifest.json
music_assistant/providers/audiobookshelf/parsers.py
requirements_all.txt

index 6d6dcdf1e42780104b2687dc1877cd0ed9ed70de..7058c00c38509cecde9b140753ed3db3182dfd26 100644 (file)
@@ -19,6 +19,7 @@ from aioaudiobookshelf.schema.library import (
     LibraryItemExpanded,
     LibraryItemExpandedBook,
     LibraryItemExpandedPodcast,
+    LibraryItemMinifiedPodcast,
 )
 from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType
 from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
@@ -31,10 +32,12 @@ from music_assistant_models.enums import (
 )
 from music_assistant_models.errors import LoginFailed, MediaNotFoundError
 from music_assistant_models.media_items import (
+    Audiobook,
     AudioFormat,
     BrowseFolder,
     MediaItemType,
     MediaItemTypeOrItemMapping,
+    PodcastEpisode,
 )
 from music_assistant_models.streamdetails import StreamDetails
 
@@ -59,12 +62,13 @@ from .constants import (
     AbsBrowseItemsPodcast,
     AbsBrowsePaths,
 )
-from .helpers import LibrariesHelper, LibraryHelper
+from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard
 
 if TYPE_CHECKING:
     from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved
     from aioaudiobookshelf.schema.media_progress import MediaProgress
-    from music_assistant_models.media_items import Audiobook, Podcast, PodcastEpisode
+    from aioaudiobookshelf.schema.user import User
+    from music_assistant_models.media_items import Podcast
     from music_assistant_models.provider import ProviderManifest
 
     from music_assistant.mass import MusicAssistant
@@ -190,6 +194,17 @@ class Audiobookshelf(MusicProvider):
             on_items_updated=self._socket_abs_item_changed,
         )
 
+        self._client_socket.set_user_callbacks(
+            on_user_item_progress_updated=self._socket_abs_user_item_progress_updated,
+        )
+
+        # progress guard
+        self.progress_guard = ProgressGuard()
+
+        # update playlog information if just started
+        user = await self._client.get_my_user()
+        await self._set_playlog_from_user(user)
+
     async def unload(self, is_removed: bool = False) -> None:
         """
         Handle unload/close of the provider.
@@ -220,11 +235,14 @@ class Audiobookshelf(MusicProvider):
         await super().sync_library(media_type=media_type)
         await self._cache_set_helper_libraries()
 
+        # update playlog
+        user = await self._client.get_my_user()
+        await self._set_playlog_from_user(user)
+
     async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
         """Retrieve library/subscribed podcasts from the provider.
 
-        Minified podcast information is enough, but we take the full information
-        and rely on cache afterwards.
+        Minified podcast information is enough.
         """
         for pod_lib_id in self.libraries.podcasts:
             async for response in self._client.get_library_items(library_id=pod_lib_id):
@@ -233,12 +251,10 @@ class Audiobookshelf(MusicProvider):
                 podcast_ids = [x.id_ for x in response.results]
                 # store uuids
                 self.libraries.podcasts[pod_lib_id].item_ids.update(podcast_ids)
-                podcasts_expanded = await self._client.get_library_item_batch_podcast(
-                    item_ids=podcast_ids
-                )
-                for podcast_expanded in podcasts_expanded:
+                for podcast_minified in response.results:
+                    assert isinstance(podcast_minified, LibraryItemMinifiedPodcast)
                     mass_podcast = parse_podcast(
-                        abs_podcast=podcast_expanded,
+                        abs_podcast=podcast_minified,
                         lookup_key=self.lookup_key,
                         domain=self.domain,
                         instance_id=self.instance_id,
@@ -263,14 +279,7 @@ class Audiobookshelf(MusicProvider):
         return abs_podcast
 
     async def get_podcast(self, prov_podcast_id: str) -> Podcast:
-        """Get single podcast.
-
-        Basis information,
-        abs_podcast = await self._client.get_library_item_podcast(
-            podcast_id=prov_podcast_id, expanded=False
-        ),
-        would be sufficient, but we rely on cache.
-        """
+        """Get single podcast."""
         abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
         return parse_podcast(
             abs_podcast=abs_podcast,
@@ -315,16 +324,20 @@ class Audiobookshelf(MusicProvider):
             episode_cnt += 1
         return episode_list
 
-    async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+    async def get_podcast_episode(
+        self, prov_episode_id: str, add_progress: bool = True
+    ) -> PodcastEpisode:
         """Get single podcast episode."""
         prov_podcast_id, e_id = prov_episode_id.split(" ")
         abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
         episode_cnt = 1
         for abs_episode in abs_podcast.media.episodes:
             if abs_episode.id_ == e_id:
-                progress = await self._client.get_my_media_progress(
-                    item_id=prov_podcast_id, episode_id=abs_episode.id_
-                )
+                progress = None
+                if add_progress:
+                    progress = await self._client.get_my_media_progress(
+                        item_id=prov_podcast_id, episode_id=abs_episode.id_
+                    )
                 return parse_podcast_episode(
                     episode=abs_episode,
                     prov_podcast_id=prov_podcast_id,
@@ -562,10 +575,30 @@ class Audiobookshelf(MusicProvider):
         """
         if media_type == MediaType.PODCAST_EPISODE:
             abs_podcast_id, abs_episode_id = prov_item_id.split(" ")
-            mass_podcast_episode = await self.get_podcast_episode(prov_item_id)
-            duration = mass_podcast_episode.duration
+
+            # guard, see progress guard class docstrings for explanation
+            if not self.progress_guard.guard_ok_mass(
+                item_id=abs_podcast_id, episode_id=abs_episode_id
+            ):
+                return
+            self.progress_guard.add_progress(item_id=abs_podcast_id, episode_id=abs_episode_id)
+
+            if media_item is None or not isinstance(media_item, PodcastEpisode):
+                return
+
+            if position == 0 and not fully_played:
+                # marked unplayed
+                mp = await self._client.get_my_media_progress(
+                    item_id=abs_podcast_id, episode_id=abs_episode_id
+                )
+                if mp is not None:
+                    await self._client.remove_my_media_progress(media_progress_id=mp.id_)
+                    self.logger.debug(f"Removed media progress of {media_type.value}.")
+                    return
+
+            duration = media_item.duration
             self.logger.debug(
-                f"Updating media progress of {media_type.value}, title {mass_podcast_episode.name}."
+                f"Updating media progress of {media_type.value}, title {media_item.name}."
             )
             await self._client.update_my_media_progress(
                 item_id=abs_podcast_id,
@@ -574,10 +607,26 @@ class Audiobookshelf(MusicProvider):
                 progress_seconds=position,
                 is_finished=fully_played,
             )
+
         if media_type == MediaType.AUDIOBOOK:
-            mass_audiobook = await self.get_audiobook(prov_item_id)
-            duration = mass_audiobook.duration
-            self.logger.debug(f"Updating {media_type.value} named {mass_audiobook.name} progress")
+            # guard, see progress guard class docstrings for explanation
+            if not self.progress_guard.guard_ok_mass(item_id=prov_item_id):
+                return
+            self.progress_guard.add_progress(item_id=prov_item_id)
+
+            if media_item is None or not isinstance(media_item, Audiobook):
+                return
+
+            if position == 0 and not fully_played:
+                # marked unplayed
+                mp = await self._client.get_my_media_progress(item_id=prov_item_id)
+                if mp is not None:
+                    await self._client.remove_my_media_progress(media_progress_id=mp.id_)
+                    self.logger.debug(f"Removed media progress of {media_type.value}.")
+                return
+
+            duration = media_item.duration
+            self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress")
             await self._client.update_my_media_progress(
                 item_id=prov_item_id,
                 duration_seconds=duration,
@@ -963,6 +1012,111 @@ class Audiobookshelf(MusicProvider):
 
         await self._cache_set_helper_libraries()
 
+    async def _socket_abs_user_item_progress_updated(
+        self, id_: str, progress: MediaProgress
+    ) -> None:
+        """To update continue listening.
+
+        ABS reports every 15s and immediately on play state change.
+        This callback is called per item if a progress is changed:
+            - a change in position
+            - the item is finished
+        But it is _not_called, if a progress is reset/ discarded.
+        """
+        # guard, see progress guard class docstrings for explanation
+        if not self.progress_guard.guard_ok_abs(abs_progress=progress):
+            return
+
+        known_ids = self._get_all_known_item_ids()
+        if progress.library_item_id not in known_ids:
+            return
+
+        self.logger.debug(f"Updated progress of item {progress.library_item_id} via socket.")
+
+        if progress.episode_id is None:
+            await self._update_playlog_book(progress)
+            return
+        await self._update_playlog_episode(progress)
+
+    def _get_all_known_item_ids(self) -> set[str]:
+        known_ids = set()
+        for lib in self.libraries.podcasts.values():
+            known_ids.update(lib.item_ids)
+        for lib in self.libraries.audiobooks.values():
+            known_ids.update(lib.item_ids)
+
+        return known_ids
+
+    async def _set_playlog_from_user(self, user: User) -> None:
+        """Update on user callback.
+
+        User holds also all media progresses specific to that user.
+
+        The function 'guard_ok_abs' uses the timestamp of the last update in abs, thus after an
+        initial progress update, an unchanged update will not trigger a (useless) playlog update.
+
+        We do not sync removed progresses for the sake of simplicity.
+        """
+        await self._set_playlog_from_user_sync(user.media_progress)
+
+    async def _set_playlog_from_user_sync(self, progresses: list[MediaProgress]) -> None:
+        # for debugging
+        __updated_items = 0
+
+        known_ids = self._get_all_known_item_ids()
+
+        for progress in progresses:
+            # Guard. Also makes sure, that we don't write to db again if no state change happened.
+            # This is achieved by adding a Helper Progress in the update playlog functions, which
+            # then has the most recent timestamp. If a subsequent progress sent by abs has an older
+            # timestamp, we do not update again.
+            if not self.progress_guard.guard_ok_abs(progress):
+                continue
+            if not progress.current_time >= 30:
+                # same as mass default, only > 30s
+                continue
+            if progress.library_item_id not in known_ids:
+                continue
+            __updated_items += 1
+            if progress.episode_id is None:
+                await self._update_playlog_book(progress)
+            else:
+                await self._update_playlog_episode(progress)
+        self.logger.debug(f"Updated {__updated_items} from full playlog.")
+
+    async def _update_playlog_book(self, progress: MediaProgress) -> None:
+        # helper progress also ensures no useless progress updates,
+        # see comment above
+        self.progress_guard.add_progress(progress.library_item_id)
+        mass_audiobook = await self.mass.music.get_library_item_by_prov_id(
+            media_type=MediaType.AUDIOBOOK,
+            item_id=progress.library_item_id,
+            provider_instance_id_or_domain=self.instance_id,
+        )
+        if mass_audiobook is None:
+            return
+        await self.mass.music.mark_item_played(
+            mass_audiobook,
+            fully_played=progress.is_finished,
+            seconds_played=int(progress.current_time),
+        )
+
+    async def _update_playlog_episode(self, progress: MediaProgress) -> None:
+        # helper progress also ensures no useless progress updates,
+        # see comment above
+        self.progress_guard.add_progress(progress.library_item_id, progress.episode_id)
+        _episode_id = f"{progress.library_item_id} {progress.episode_id}"
+        try:
+            # need to obtain full podcast, and then search for episode
+            mass_episode = await self.get_podcast_episode(_episode_id, add_progress=False)
+        except MediaNotFoundError:
+            return
+        await self.mass.music.mark_item_played(
+            mass_episode,
+            fully_played=progress.is_finished,
+            seconds_played=int(progress.current_time),
+        )
+
     async def _cache_set_helper_libraries(self) -> None:
         await self.mass.cache.set(
             key=CACHE_KEY_LIBRARIES,
index 3390b5704a5a7a73c27313c3e08869584195cb61..530dfb316d832e4d14fe2909df2a5ae97646c53d 100644 (file)
@@ -1,7 +1,9 @@
 """Helpers for Audiobookshelf provider."""
 
+import time
 from dataclasses import dataclass, field
 
+from aioaudiobookshelf.schema.media_progress import MediaProgress
 from mashumaro.mixins.dict import DataClassDictMixin
 
 
@@ -22,3 +24,87 @@ class LibrariesHelper(DataClassDictMixin):
 
     audiobooks: dict[str, LibraryHelper] = field(default_factory=dict)
     podcasts: dict[str, LibraryHelper] = field(default_factory=dict)
+
+
+@dataclass(kw_only=True)
+class _ProgressHelper:
+    id_: str  # audiobook or podcast id
+    episode_id: str | None = None
+    last_update_ms: int  # last update in ms epoch (same as last_update in abs)
+
+
+class ProgressGuard:
+    """Class used to avoid ping pong between abs and mass.
+
+    We continuously update the progress from mass to abs with the provider's on_played function.
+    We also register callbacks for progress reports from abs to mass. This is not only triggered
+    on external updates, but also on our own update. To avoid messages going back and forth, this
+    class is used.
+    """
+
+    def __init__(self) -> None:
+        """Init."""
+        self._progresses: list[_ProgressHelper] = []
+        self._max_progresses = 100
+        # 12s have to have passed before we accept an external progress update
+        # abs updates every 15 s
+        self._min_time_between_updates_ms = 12000
+
+    def _get_progress(self, item_id: str, episode_id: str | None = None) -> _ProgressHelper | None:
+        """Get a helper progress."""
+        for x in self._progresses:
+            if x.id_ == item_id and x.episode_id == episode_id:
+                return x
+        return None
+
+    def _remove_oldest(self) -> None:
+        """Remove oldest helper progress."""
+        progresses = sorted(self._progresses, key=lambda x: x.last_update_ms)
+        if len(progresses) > 0:
+            self._progresses.remove(progresses[0])
+
+    def remove_progress(self, item_id: str, episode_id: str | None = None) -> None:
+        """Remove a helper progress."""
+        progress = self._get_progress(item_id=item_id, episode_id=episode_id)
+        if progress is not None:
+            self._progresses.remove(progress)
+
+    def add_progress(self, item_id: str, episode_id: str | None = None) -> None:
+        """Store a timestamp for the last update of an audiobook or podcast episode, mass ids."""
+        if len(self._progresses) > self._max_progresses:
+            self._remove_oldest()
+        self.remove_progress(item_id=item_id, episode_id=episode_id)
+        progress = _ProgressHelper(
+            id_=item_id, episode_id=episode_id, last_update_ms=int(time.time() * 1000)
+        )
+        self._progresses.append(progress)
+
+    def guard_ok_abs(self, abs_progress: MediaProgress) -> bool:
+        """Check, if we may update against an abs media progress.
+
+        The abs media progress has a property last_update_ms, which also reflects non
+        mass external updates. Here, we compare this property against a potential
+        stored one.
+        """
+        item_id = abs_progress.library_item_id
+        episode_id = abs_progress.episode_id
+        stored_progress = self._get_progress(item_id=item_id, episode_id=episode_id)
+        if stored_progress is None:
+            return True
+        return bool(
+            abs_progress.last_update - stored_progress.last_update_ms
+            >= self._min_time_between_updates_ms
+        )
+
+    def guard_ok_mass(self, item_id: str, episode_id: str | None = None) -> bool:
+        """Check, if we may update against a mass internal item.
+
+        Here, we use the current time and compare it against the stored time.
+        """
+        stored_progress = self._get_progress(item_id=item_id, episode_id=episode_id)
+        if stored_progress is None:
+            return True
+        return (
+            int(time.time() * 1000) - stored_progress.last_update_ms
+            >= self._min_time_between_updates_ms
+        )
index c7baa3c2f57caf97dbeb46ae84ca453f2388bbb3..444e59546d7ac90a0788eb4b66b67fca1029366d 100644 (file)
@@ -7,7 +7,7 @@
     "@fmunkes"
   ],
   "requirements": [
-    "aioaudiobookshelf==0.1.1"
+    "aioaudiobookshelf==0.1.2"
   ],
   "documentation": "https://music-assistant.io/music-providers/audiobookshelf",
   "multi_instance": true
index c028d518b193ae4569e72009005cd04b81f7318a..c017b3914f0748f054ceb93bdff506ca0accd620 100644 (file)
@@ -73,7 +73,7 @@ def parse_podcast(
         mass_podcast.metadata.genres = set(abs_podcast.media.metadata.genres)
     mass_podcast.metadata.release_date = abs_podcast.media.metadata.release_date
 
-    if isinstance(abs_podcast, AbsLibraryItemExpandedPodcast):
+    if isinstance(abs_podcast, AbsLibraryItemExpandedPodcast | AbsLibraryItemPodcast):
         mass_podcast.total_episodes = len(abs_podcast.media.episodes)
     elif isinstance(abs_podcast, AbsLibraryItemMinifiedPodcast):
         mass_podcast.total_episodes = abs_podcast.media.num_episodes
index f0c2f69d53ec6d29febde680023a064727c47c60..e5d7eb180685a81424da616d09e0ccf14fb151d6 100644 (file)
@@ -1,7 +1,7 @@
 # WARNING: this file is autogenerated!
 
 Brotli>=1.0.9
-aioaudiobookshelf==0.1.1
+aioaudiobookshelf==0.1.2
 aiodns>=3.2.0
 aiofiles==24.1.0
 aiohttp==3.11.12