from __future__ import annotations
+import itertools
from collections.abc import AsyncGenerator, Sequence
from typing import TYPE_CHECKING
LibraryItemExpandedPodcast as AbsLibraryItemExpandedPodcast,
)
from aioaudiobookshelf.exceptions import LoginError as AbsLoginError
+from aioaudiobookshelf.schema.author import AuthorExpanded
from aioaudiobookshelf.schema.calls_authors import (
AuthorWithItemsAndSeries as AbsAuthorWithItemsAndSeries,
)
LibraryItemMinifiedPodcast,
)
from aioaudiobookshelf.schema.library import LibraryMediaType as AbsLibraryMediaType
+from aioaudiobookshelf.schema.shelf import (
+ SeriesShelf,
+ ShelfAuthors,
+ ShelfBook,
+ ShelfEpisode,
+ ShelfLibraryItemMinified,
+ ShelfPodcast,
+ ShelfSeries,
+)
+from aioaudiobookshelf.schema.shelf import (
+ ShelfId as AbsShelfId,
+)
+from aioaudiobookshelf.schema.shelf import ShelfType as AbsShelfType
from music_assistant_models.config_entries import ConfigEntry, ConfigValueType, ProviderConfig
from music_assistant_models.enums import (
ConfigEntryType,
MediaItemType,
MediaItemTypeOrItemMapping,
PodcastEpisode,
+ UniqueList,
)
+from music_assistant_models.media_items.media_item import RecommendationFolder
from music_assistant_models.streamdetails import StreamDetails
from music_assistant.models.music_provider import MusicProvider
)
from .constants import (
- ABSBROWSEITEMSTOPATH,
+ ABS_BROWSE_ITEMS_TO_PATH,
+ ABS_SHELF_ID_ICONS,
CACHE_CATEGORY_LIBRARIES,
CACHE_KEY_LIBRARIES,
CONF_HIDE_EMPTY_PODCASTS,
ProviderFeature.LIBRARY_PODCASTS,
ProviderFeature.LIBRARY_AUDIOBOOKS,
ProviderFeature.BROWSE,
+ ProviderFeature.RECOMMENDATIONS,
}
async def handle_async_init(self) -> None:
return False, 0
+ async def recommendations(self) -> list[RecommendationFolder]:
+ """Get recommendations."""
+ # We have to avoid "flooding" the home page, which becomes especially troublesome if users
+ # have multiple libraries. Instead we collect per ShelfId, and make sure, that we always get
+ # roughly the same amount of items per row, no matter the amount of libraries
+ # List of list (one list per lib) here, such that we can pick the items per lib later.
+ items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType]]] = {}
+
+ all_libraries = {**self.libraries.audiobooks, **self.libraries.podcasts}
+ max_items_per_row = 20
+ limit_items_per_lib = max_items_per_row // len(all_libraries)
+ limit_items_per_lib = 1 if limit_items_per_lib == 0 else limit_items_per_lib
+
+ for library_id in all_libraries:
+ shelves = await self._client.get_library_personalized_view(
+ library_id=library_id, limit=limit_items_per_lib
+ )
+ await self._recommendations_iter_shelves(shelves, library_id, items_by_shelf_id)
+
+ folders: list[RecommendationFolder] = []
+ for shelf_id, item_lists in items_by_shelf_id.items():
+ # we have something like [[A, B], [C, D, E], [F]]
+ # and want [A, C, F, B, D, E]
+ recommendation_items = [
+ x
+ for x in itertools.chain.from_iterable(itertools.zip_longest(*item_lists))
+ if x is not None
+ ][:max_items_per_row]
+
+ # shelf ids follow pattern:
+ # recently-added
+ # newest-episodes
+ # etc
+ name = f"{shelf_id.capitalize().replace('-', ' ')}"
+ folders.append(
+ RecommendationFolder(
+ item_id=f"{shelf_id}",
+ name=name,
+ icon=ABS_SHELF_ID_ICONS.get(shelf_id),
+ # translation_key=shelf.id_,
+ items=UniqueList(recommendation_items),
+ provider=self.lookup_key,
+ )
+ )
+
+ return folders
+
+ async def _recommendations_iter_shelves(
+ self,
+ shelves: list[ShelfBook | ShelfPodcast | ShelfAuthors | ShelfEpisode | ShelfSeries],
+ library_id: str,
+ items_by_shelf_id: dict[AbsShelfId, list[list[MediaItemType]]],
+ ) -> None:
+ for shelf in shelves:
+ media_type: MediaType
+ match shelf.type_:
+ case AbsShelfType.PODCAST:
+ media_type = MediaType.PODCAST
+ case AbsShelfType.EPISODE:
+ media_type = MediaType.PODCAST_EPISODE
+ case AbsShelfType.BOOK:
+ media_type = MediaType.AUDIOBOOK
+ case AbsShelfType.SERIES | AbsShelfType.AUTHORS:
+ media_type = MediaType.FOLDER
+ case _:
+ # this would be authors, currently
+ continue
+
+ items: list[MediaItemType] = []
+ # Recently added is the _only_ case, where we get a full podcast
+ # We have a podcast object with only the episodes matching the
+ # shelf.id_ otherwise.
+ match shelf.id_:
+ case (
+ AbsShelfId.RECENTLY_ADDED
+ | AbsShelfId.LISTEN_AGAIN
+ | AbsShelfId.DISCOVER
+ | AbsShelfId.NEWEST_EPISODES
+ | AbsShelfId.CONTINUE_LISTENING
+ ):
+ for entity in shelf.entities:
+ assert isinstance(entity, ShelfLibraryItemMinified)
+ item: MediaItemType | None = None
+ if media_type in [MediaType.PODCAST, MediaType.AUDIOBOOK]:
+ item = await self.mass.music.get_library_item_by_prov_id(
+ media_type=media_type,
+ provider_instance_id_or_domain=self.instance_id,
+ item_id=entity.id_,
+ )
+ elif media_type == MediaType.PODCAST_EPISODE:
+ podcast_id = entity.id_
+ if entity.recent_episode is None:
+ continue
+ # we only have a PodcastEpisode here, with limited information
+ item = parse_podcast_episode(
+ episode=entity.recent_episode,
+ prov_podcast_id=podcast_id,
+ lookup_key=self.lookup_key,
+ domain=self.domain,
+ instance_id=self.instance_id,
+ token=self._client.token,
+ base_url=str(self.config.get_value(CONF_URL)).rstrip("/"),
+ )
+ if item is not None:
+ items.append(item)
+ case AbsShelfId.RECENT_SERIES | AbsShelfId.CONTINUE_SERIES:
+ # we jump into a browse folder here, set path up as if browse function
+ # used.
+ for entity in shelf.entities:
+ assert isinstance(entity, SeriesShelf)
+ if len(entity.books) == 0:
+ continue
+ path = (
+ f"{self.instance_id}://"
+ f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
+ f"{AbsBrowsePaths.SERIES}/{entity.id_}"
+ )
+ items.append(
+ BrowseFolder(
+ item_id=entity.id_,
+ name=entity.name,
+ provider=self.lookup_key,
+ path=path,
+ )
+ )
+ case AbsShelfId.NEWEST_AUTHORS:
+ # same as for series, use a folder
+ for entity in shelf.entities:
+ assert isinstance(entity, AuthorExpanded)
+ if entity.num_books == 0:
+ continue
+ path = (
+ f"{self.instance_id}://"
+ f"{AbsBrowsePaths.LIBRARIES_BOOK} {library_id}/"
+ f"{AbsBrowsePaths.AUTHORS}/{entity.id_}"
+ )
+ items.append(
+ BrowseFolder(
+ item_id=entity.id_,
+ name=entity.name,
+ provider=self.lookup_key,
+ path=path,
+ )
+ )
+ if not items:
+ continue
+
+ # add collected items
+ assert isinstance(shelf.id_, AbsShelfId)
+ items_collected = items_by_shelf_id.get(shelf.id_, [])
+ items_collected.append(items)
+ items_by_shelf_id[shelf.id_] = items_collected
+
async def on_played(
self,
media_type: MediaType,
def _browse_lib_audiobooks(self, current_path: str) -> Sequence[MediaItemTypeOrItemMapping]:
items = []
for item_name in AbsBrowseItemsBook:
- path = current_path + "/" + ABSBROWSEITEMSTOPATH[item_name]
+ path = current_path + "/" + ABS_BROWSE_ITEMS_TO_PATH[item_name]
items.append(
BrowseFolder(
item_id=item_name.lower(),
LibraryItemPodcast as AbsLibraryItemPodcast,
)
from aioaudiobookshelf.schema.media_progress import MediaProgress as AbsMediaProgress
-from aioaudiobookshelf.schema.podcast import PodcastEpisodeExpanded as AbsPodcastEpisodeExpanded
+from aioaudiobookshelf.schema.podcast import PodcastEpisode as AbsPodcastEpisode
+from aioaudiobookshelf.schema.podcast import (
+ PodcastEpisodeExpanded as AbsPodcastEpisodeExpanded,
+)
from music_assistant_models.enums import ContentType, ImageType, MediaType
from music_assistant_models.media_items import Audiobook as MassAudiobook
from music_assistant_models.media_items import (
def parse_podcast_episode(
*,
- episode: AbsPodcastEpisodeExpanded,
+ episode: AbsPodcastEpisode | AbsPodcastEpisodeExpanded,
prov_podcast_id: str,
fallback_episode_cnt: int | None = None,
lookup_key: str,
For an episode the id is set to f"{podcast_id} {episode_id}".
ABS ids have no spaces, so we can split at a space to retrieve both
in other functions.
+
+ NOTE: We should always use a PodcastEpisodeExpanded when possible.
+ A PodcastEpisode has only limited information, and is currently only used
+ within the recommendations.
"""
- url = f"{base_url}{episode.audio_track.content_url}"
episode_id = f"{prov_podcast_id} {episode.id_}"
+ if isinstance(episode, AbsPodcastEpisodeExpanded):
+ url = f"{base_url}{episode.audio_track.content_url}"
+ duration = int(episode.duration)
+ provider_mappings = {
+ ProviderMapping(
+ item_id=episode_id,
+ provider_domain=domain,
+ provider_instance=instance_id,
+ audio_format=AudioFormat(
+ content_type=ContentType.UNKNOWN,
+ ),
+ url=url,
+ )
+ }
+ else:
+ # PodcastEpisode
+ duration = 0 # mass default
+ provider_mappings = {
+ ProviderMapping(
+ item_id=episode_id,
+ provider_domain=domain,
+ provider_instance=instance_id,
+ )
+ }
+
release_date: datetime | None = None
if episode.published_at is not None:
position = -episode.published_at
item_id=episode_id,
provider=lookup_key,
name=episode.title,
- duration=int(episode.duration),
+ duration=duration,
position=position,
podcast=ItemMapping(
item_id=prov_podcast_id,
name=episode.title,
media_type=MediaType.PODCAST,
),
- provider_mappings={
- ProviderMapping(
- item_id=episode_id,
- provider_domain=domain,
- provider_instance=instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.UNKNOWN,
- ),
- url=url,
- )
- },
+ provider_mappings=provider_mappings,
)
mass_episode.metadata.release_date = release_date