MediaItemType,
Playlist,
ProviderMapping,
+ RecommendationFolder,
SearchResults,
Track,
UniqueList,
CONF_PLEX_LIKE_RATING = "plex_like_rating"
CONF_PLEX_FAVORITE_THRESHOLD = "plex_favorite_threshold"
CONF_PLEX_UNLIKE_RATING = "plex_unlike_rating"
+CONF_HUB_ITEMS_LIMIT = "hub_items_limit"
FAKE_ARTIST_PREFIX = "_fake://"
ProviderFeature.ARTIST_ALBUMS,
ProviderFeature.ARTIST_TOPTRACKS,
ProviderFeature.SIMILAR_TRACKS,
+ ProviderFeature.RECOMMENDATIONS,
}
)
)
+ # Recommendation settings (advanced)
+ entries.append(
+ ConfigEntry(
+ key=CONF_HUB_ITEMS_LIMIT,
+ type=ConfigEntryType.INTEGER,
+ label="Items per hub",
+ description="Maximum number of items to load from each hub (default: 10)",
+ default_value=10,
+ category="advanced",
+ range=(1, 100),
+ )
+ )
+
# return all config entries
return tuple(entries)
self.logger.warning("Error getting similar tracks for %s: %s", prov_track_id, err)
return []
+ @use_cache(3600 * 3, cache_checksum="v2") # Cache for 3 hours
+ async def recommendations(self) -> list[RecommendationFolder]:
+ """Get recommendations from Plex hubs."""
+ try:
+ # Get the configured limit for items per hub
+ limit_value = self.config.get_value(CONF_HUB_ITEMS_LIMIT)
+ limit = int(limit_value) if isinstance(limit_value, (int, float, str)) else 10
+
+ # Fetch hubs from the music library section with count parameter
+ # The section's hubs() method uses /hubs/sections/{key}?includeStations=1
+ # We need to add the count parameter manually to limit items per hub
+ key = f"/hubs/sections/{self._plex_library.key}?includeStations=1&count={limit}"
+ hubs = await self._run_async(self._plex_library.fetchItems, key)
+
+ if not hubs:
+ self.logger.debug("No hubs available from Plex")
+ return []
+
+ self.logger.debug(
+ "Fetching %d hubs (limit: %d items per hub)",
+ len(hubs),
+ limit,
+ )
+
+ folders = []
+ for hub in hubs:
+ # Create a recommendation folder for each hub
+ folder = RecommendationFolder(
+ name=hub.title,
+ item_id=f"{self.instance_id}_{hub.hubIdentifier}",
+ provider=self.lookup_key,
+ icon="mdi-music",
+ )
+
+ # Parse each item based on its type (limit to configured max)
+ # Use _partialItems to respect the count limit from the hubs() call
+ # rather than hub.items() which fetches ALL items if more is True
+ # _partialItems is a cached property that's already loaded, so no need for async
+ hub_items = hub._partialItems
+ self.logger.debug(
+ "Processing hub '%s' (%s) with %d partial items",
+ hub.title,
+ hub.hubIdentifier,
+ len(hub_items),
+ )
+ for item in hub_items:
+ try:
+ # Skip items without type attribute
+ if not hasattr(item, "type"):
+ self.logger.debug(
+ "Skipping item in hub '%s': no type attribute",
+ hub.title,
+ )
+ continue
+
+ # Parse item based on its type
+ if item.type == "track":
+ folder.items.append(await self._parse_track(item))
+ elif item.type == "album":
+ folder.items.append(await self._parse_album(item))
+ elif item.type == "artist":
+ folder.items.append(await self._parse_artist(item))
+ elif item.type == "playlist":
+ folder.items.append(await self._parse_playlist(item))
+ # Try to parse other types generically
+ elif parsed_item := await self._parse(item):
+ folder.items.append(parsed_item) # type: ignore[arg-type]
+ else:
+ self.logger.debug(
+ "Skipping unsupported item type '%s' in hub '%s'",
+ item.type,
+ hub.title,
+ )
+ except Exception as err:
+ self.logger.debug(
+ "Failed to parse item (type: %s) in hub '%s': %s",
+ getattr(item, "type", "unknown"),
+ hub.title,
+ str(err),
+ )
+ continue
+
+ # Only add folder if it has items
+ if folder.items:
+ folders.append(folder)
+ self.logger.debug(
+ "Added hub '%s' (%s) with %d items",
+ hub.title,
+ hub.hubIdentifier,
+ len(folder.items),
+ )
+ else:
+ self.logger.debug(
+ "Skipping hub '%s' (%s): no items after parsing",
+ hub.title,
+ hub.hubIdentifier,
+ )
+
+ self.logger.debug("Retrieved %d recommendation folders from Plex", len(folders))
+ return folders
+
+ except Exception as err:
+ self.logger.warning("Error getting recommendations from Plex: %s", err)
+ return []
+
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
"""Get streamdetails for a track."""
plex_track = await self._get_data(item_id, PlexTrack)