LibraryItemExpanded,
LibraryItemExpandedBook,
LibraryItemExpandedPodcast,
+ LibraryItemMinifiedPodcast,
)
from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
)
from music_assistant_models.errors import LoginFailed, MediaNotFoundError
from music_assistant_models.media_items import (
+ Audiobook,
AudioFormat,
BrowseFolder,
MediaItemType,
MediaItemTypeOrItemMapping,
+ PodcastEpisode,
)
from music_assistant_models.streamdetails import StreamDetails
AbsBrowseItemsPodcast,
AbsBrowsePaths,
)
-from .helpers import LibrariesHelper, LibraryHelper
+from .helpers import LibrariesHelper, LibraryHelper, ProgressGuard
if TYPE_CHECKING:
from aioaudiobookshelf.schema.events_socket import LibraryItemRemoved
from aioaudiobookshelf.schema.media_progress import MediaProgress
- from music_assistant_models.media_items import Audiobook, Podcast, PodcastEpisode
+ from aioaudiobookshelf.schema.user import User
+ from music_assistant_models.media_items import Podcast
from music_assistant_models.provider import ProviderManifest
from music_assistant.mass import MusicAssistant
on_items_updated=self._socket_abs_item_changed,
)
+ self._client_socket.set_user_callbacks(
+ on_user_item_progress_updated=self._socket_abs_user_item_progress_updated,
+ )
+
+ # progress guard
+ self.progress_guard = ProgressGuard()
+
+ # update playlog information if just started
+ user = await self._client.get_my_user()
+ await self._set_playlog_from_user(user)
+
async def unload(self, is_removed: bool = False) -> None:
"""
Handle unload/close of the provider.
await super().sync_library(media_type=media_type)
await self._cache_set_helper_libraries()
+ # update playlog
+ user = await self._client.get_my_user()
+ await self._set_playlog_from_user(user)
+
async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
"""Retrieve library/subscribed podcasts from the provider.
- Minified podcast information is enough, but we take the full information
- and rely on cache afterwards.
+ Minified podcast information is enough.
"""
for pod_lib_id in self.libraries.podcasts:
async for response in self._client.get_library_items(library_id=pod_lib_id):
podcast_ids = [x.id_ for x in response.results]
# store uuids
self.libraries.podcasts[pod_lib_id].item_ids.update(podcast_ids)
- podcasts_expanded = await self._client.get_library_item_batch_podcast(
- item_ids=podcast_ids
- )
- for podcast_expanded in podcasts_expanded:
+ for podcast_minified in response.results:
+ assert isinstance(podcast_minified, LibraryItemMinifiedPodcast)
mass_podcast = parse_podcast(
- abs_podcast=podcast_expanded,
+ abs_podcast=podcast_minified,
lookup_key=self.lookup_key,
domain=self.domain,
instance_id=self.instance_id,
return abs_podcast
async def get_podcast(self, prov_podcast_id: str) -> Podcast:
- """Get single podcast.
-
- Basis information,
- abs_podcast = await self._client.get_library_item_podcast(
- podcast_id=prov_podcast_id, expanded=False
- ),
- would be sufficient, but we rely on cache.
- """
+ """Get single podcast."""
abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
return parse_podcast(
abs_podcast=abs_podcast,
episode_cnt += 1
return episode_list
- async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ async def get_podcast_episode(
+ self, prov_episode_id: str, add_progress: bool = True
+ ) -> PodcastEpisode:
"""Get single podcast episode."""
prov_podcast_id, e_id = prov_episode_id.split(" ")
abs_podcast = await self._get_abs_expanded_podcast(prov_podcast_id=prov_podcast_id)
episode_cnt = 1
for abs_episode in abs_podcast.media.episodes:
if abs_episode.id_ == e_id:
- progress = await self._client.get_my_media_progress(
- item_id=prov_podcast_id, episode_id=abs_episode.id_
- )
+ progress = None
+ if add_progress:
+ progress = await self._client.get_my_media_progress(
+ item_id=prov_podcast_id, episode_id=abs_episode.id_
+ )
return parse_podcast_episode(
episode=abs_episode,
prov_podcast_id=prov_podcast_id,
"""
if media_type == MediaType.PODCAST_EPISODE:
abs_podcast_id, abs_episode_id = prov_item_id.split(" ")
- mass_podcast_episode = await self.get_podcast_episode(prov_item_id)
- duration = mass_podcast_episode.duration
+
+ # guard, see progress guard class docstrings for explanation
+ if not self.progress_guard.guard_ok_mass(
+ item_id=abs_podcast_id, episode_id=abs_episode_id
+ ):
+ return
+ self.progress_guard.add_progress(item_id=abs_podcast_id, episode_id=abs_episode_id)
+
+ if media_item is None or not isinstance(media_item, PodcastEpisode):
+ return
+
+ if position == 0 and not fully_played:
+ # marked unplayed
+ mp = await self._client.get_my_media_progress(
+ item_id=abs_podcast_id, episode_id=abs_episode_id
+ )
+ if mp is not None:
+ await self._client.remove_my_media_progress(media_progress_id=mp.id_)
+ self.logger.debug(f"Removed media progress of {media_type.value}.")
+ return
+
+ duration = media_item.duration
self.logger.debug(
- f"Updating media progress of {media_type.value}, title {mass_podcast_episode.name}."
+ f"Updating media progress of {media_type.value}, title {media_item.name}."
)
await self._client.update_my_media_progress(
item_id=abs_podcast_id,
progress_seconds=position,
is_finished=fully_played,
)
+
if media_type == MediaType.AUDIOBOOK:
- mass_audiobook = await self.get_audiobook(prov_item_id)
- duration = mass_audiobook.duration
- self.logger.debug(f"Updating {media_type.value} named {mass_audiobook.name} progress")
+ # guard, see progress guard class docstrings for explanation
+ if not self.progress_guard.guard_ok_mass(item_id=prov_item_id):
+ return
+ self.progress_guard.add_progress(item_id=prov_item_id)
+
+ if media_item is None or not isinstance(media_item, Audiobook):
+ return
+
+ if position == 0 and not fully_played:
+ # marked unplayed
+ mp = await self._client.get_my_media_progress(item_id=prov_item_id)
+ if mp is not None:
+ await self._client.remove_my_media_progress(media_progress_id=mp.id_)
+ self.logger.debug(f"Removed media progress of {media_type.value}.")
+ return
+
+ duration = media_item.duration
+ self.logger.debug(f"Updating {media_type.value} named {media_item.name} progress")
await self._client.update_my_media_progress(
item_id=prov_item_id,
duration_seconds=duration,
await self._cache_set_helper_libraries()
+ async def _socket_abs_user_item_progress_updated(
+ self, id_: str, progress: MediaProgress
+ ) -> None:
+ """To update continue listening.
+
+ ABS reports every 15s and immediately on play state change.
+ This callback is called per item if a progress is changed:
+ - a change in position
+ - the item is finished
+ But it is _not_called, if a progress is reset/ discarded.
+ """
+ # guard, see progress guard class docstrings for explanation
+ if not self.progress_guard.guard_ok_abs(abs_progress=progress):
+ return
+
+ known_ids = self._get_all_known_item_ids()
+ if progress.library_item_id not in known_ids:
+ return
+
+ self.logger.debug(f"Updated progress of item {progress.library_item_id} via socket.")
+
+ if progress.episode_id is None:
+ await self._update_playlog_book(progress)
+ return
+ await self._update_playlog_episode(progress)
+
+ def _get_all_known_item_ids(self) -> set[str]:
+ known_ids = set()
+ for lib in self.libraries.podcasts.values():
+ known_ids.update(lib.item_ids)
+ for lib in self.libraries.audiobooks.values():
+ known_ids.update(lib.item_ids)
+
+ return known_ids
+
+ async def _set_playlog_from_user(self, user: User) -> None:
+ """Update on user callback.
+
+ User holds also all media progresses specific to that user.
+
+ The function 'guard_ok_abs' uses the timestamp of the last update in abs, thus after an
+ initial progress update, an unchanged update will not trigger a (useless) playlog update.
+
+ We do not sync removed progresses for the sake of simplicity.
+ """
+ await self._set_playlog_from_user_sync(user.media_progress)
+
+ async def _set_playlog_from_user_sync(self, progresses: list[MediaProgress]) -> None:
+ # for debugging
+ __updated_items = 0
+
+ known_ids = self._get_all_known_item_ids()
+
+ for progress in progresses:
+ # Guard. Also makes sure, that we don't write to db again if no state change happened.
+ # This is achieved by adding a Helper Progress in the update playlog functions, which
+ # then has the most recent timestamp. If a subsequent progress sent by abs has an older
+ # timestamp, we do not update again.
+ if not self.progress_guard.guard_ok_abs(progress):
+ continue
+ if not progress.current_time >= 30:
+ # same as mass default, only > 30s
+ continue
+ if progress.library_item_id not in known_ids:
+ continue
+ __updated_items += 1
+ if progress.episode_id is None:
+ await self._update_playlog_book(progress)
+ else:
+ await self._update_playlog_episode(progress)
+ self.logger.debug(f"Updated {__updated_items} from full playlog.")
+
+ async def _update_playlog_book(self, progress: MediaProgress) -> None:
+ # helper progress also ensures no useless progress updates,
+ # see comment above
+ self.progress_guard.add_progress(progress.library_item_id)
+ mass_audiobook = await self.mass.music.get_library_item_by_prov_id(
+ media_type=MediaType.AUDIOBOOK,
+ item_id=progress.library_item_id,
+ provider_instance_id_or_domain=self.instance_id,
+ )
+ if mass_audiobook is None:
+ return
+ await self.mass.music.mark_item_played(
+ mass_audiobook,
+ fully_played=progress.is_finished,
+ seconds_played=int(progress.current_time),
+ )
+
+ async def _update_playlog_episode(self, progress: MediaProgress) -> None:
+ # helper progress also ensures no useless progress updates,
+ # see comment above
+ self.progress_guard.add_progress(progress.library_item_id, progress.episode_id)
+ _episode_id = f"{progress.library_item_id} {progress.episode_id}"
+ try:
+ # need to obtain full podcast, and then search for episode
+ mass_episode = await self.get_podcast_episode(_episode_id, add_progress=False)
+ except MediaNotFoundError:
+ return
+ await self.mass.music.mark_item_played(
+ mass_episode,
+ fully_played=progress.is_finished,
+ seconds_played=int(progress.current_time),
+ )
+
async def _cache_set_helper_libraries(self) -> None:
await self.mass.cache.set(
key=CACHE_KEY_LIBRARIES,
"""Helpers for Audiobookshelf provider."""
+import time
from dataclasses import dataclass, field
+from aioaudiobookshelf.schema.media_progress import MediaProgress
from mashumaro.mixins.dict import DataClassDictMixin
audiobooks: dict[str, LibraryHelper] = field(default_factory=dict)
podcasts: dict[str, LibraryHelper] = field(default_factory=dict)
+
+
+@dataclass(kw_only=True)
+class _ProgressHelper:
+ id_: str # audiobook or podcast id
+ episode_id: str | None = None
+ last_update_ms: int # last update in ms epoch (same as last_update in abs)
+
+
+class ProgressGuard:
+ """Class used to avoid ping pong between abs and mass.
+
+ We continuously update the progress from mass to abs with the provider's on_played function.
+ We also register callbacks for progress reports from abs to mass. This is not only triggered
+ on external updates, but also on our own update. To avoid messages going back and forth, this
+ class is used.
+ """
+
+ def __init__(self) -> None:
+ """Init."""
+ self._progresses: list[_ProgressHelper] = []
+ self._max_progresses = 100
+ # 12s have to have passed before we accept an external progress update
+ # abs updates every 15 s
+ self._min_time_between_updates_ms = 12000
+
+ def _get_progress(self, item_id: str, episode_id: str | None = None) -> _ProgressHelper | None:
+ """Get a helper progress."""
+ for x in self._progresses:
+ if x.id_ == item_id and x.episode_id == episode_id:
+ return x
+ return None
+
+ def _remove_oldest(self) -> None:
+ """Remove oldest helper progress."""
+ progresses = sorted(self._progresses, key=lambda x: x.last_update_ms)
+ if len(progresses) > 0:
+ self._progresses.remove(progresses[0])
+
+ def remove_progress(self, item_id: str, episode_id: str | None = None) -> None:
+ """Remove a helper progress."""
+ progress = self._get_progress(item_id=item_id, episode_id=episode_id)
+ if progress is not None:
+ self._progresses.remove(progress)
+
+ def add_progress(self, item_id: str, episode_id: str | None = None) -> None:
+ """Store a timestamp for the last update of an audiobook or podcast episode, mass ids."""
+ if len(self._progresses) > self._max_progresses:
+ self._remove_oldest()
+ self.remove_progress(item_id=item_id, episode_id=episode_id)
+ progress = _ProgressHelper(
+ id_=item_id, episode_id=episode_id, last_update_ms=int(time.time() * 1000)
+ )
+ self._progresses.append(progress)
+
+ def guard_ok_abs(self, abs_progress: MediaProgress) -> bool:
+ """Check, if we may update against an abs media progress.
+
+ The abs media progress has a property last_update_ms, which also reflects non
+ mass external updates. Here, we compare this property against a potential
+ stored one.
+ """
+ item_id = abs_progress.library_item_id
+ episode_id = abs_progress.episode_id
+ stored_progress = self._get_progress(item_id=item_id, episode_id=episode_id)
+ if stored_progress is None:
+ return True
+ return bool(
+ abs_progress.last_update - stored_progress.last_update_ms
+ >= self._min_time_between_updates_ms
+ )
+
+ def guard_ok_mass(self, item_id: str, episode_id: str | None = None) -> bool:
+ """Check, if we may update against a mass internal item.
+
+ Here, we use the current time and compare it against the stored time.
+ """
+ stored_progress = self._get_progress(item_id=item_id, episode_id=episode_id)
+ if stored_progress is None:
+ return True
+ return (
+ int(time.time() * 1000) - stored_progress.last_update_ms
+ >= self._min_time_between_updates_ms
+ )