From 5307b3abbcfa62b397edfca0a864260cde3ca3da Mon Sep 17 00:00:00 2001 From: Eric Munson Date: Wed, 2 Jul 2025 11:48:24 -0400 Subject: [PATCH] Parallelize Recommendation creation in Subsonic provider (#2254) * Add Open Subsonic Cache category We will be using this initially for podcast channels as they are slow to look up for larger podcasts. This category, and caching in general should be used across the provider (at a later date). Signed-off-by: Eric B Munson * Feat: Subsonic: Parallelize Recommendation creation When all the recommendations are enabled the main page is rather slow to load. Split each kind into a helper function and invoke each helper that is enabled in parallel. The newest podcast episode recommendation was the worst offender due to the number of API calls for getting podcast channels. This was broken into an initial pass over episodes to collect a set of channel ids, then a parallel retrieval of the unique channel ids. Signed-off-by: Eric B Munson --- music_assistant/constants.py | 1 + .../providers/opensubsonic/sonic_provider.py | 166 +++++++++++------- 2 files changed, 104 insertions(+), 63 deletions(-) diff --git a/music_assistant/constants.py b/music_assistant/constants.py index d08dcc7f..f1fdf1e5 100644 --- a/music_assistant/constants.py +++ b/music_assistant/constants.py @@ -710,6 +710,7 @@ CACHE_CATEGORY_MEDIA_INFO: Final[int] = 8 CACHE_CATEGORY_LIBRARY_ITEMS: Final[int] = 9 CACHE_CATEGORY_PLAYERS: Final[int] = 10 CACHE_CATEGORY_RECOMMENDATIONS: Final[int] = 11 +CACHE_CATEGORY_OPEN_SUBSONIC: Final[int] = 12 # CACHE base keys CACHE_KEY_PLAYER_POWER: Final[str] = "player_power" diff --git a/music_assistant/providers/opensubsonic/sonic_provider.py b/music_assistant/providers/opensubsonic/sonic_provider.py index 5a75e532..95d309db 100644 --- a/music_assistant/providers/opensubsonic/sonic_provider.py +++ b/music_assistant/providers/opensubsonic/sonic_provider.py @@ -3,6 +3,7 @@ from __future__ import annotations import asyncio +from asyncio import TaskGroup from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar from libopensonic.connection import Connection as SonicConnection @@ -37,6 +38,7 @@ from music_assistant_models.media_items import ( from music_assistant_models.streamdetails import StreamDetails from music_assistant.constants import ( + CACHE_CATEGORY_OPEN_SUBSONIC, CONF_PASSWORD, CONF_PATH, CONF_PORT, @@ -66,7 +68,6 @@ if TYPE_CHECKING: from libopensonic.media import Child as SonicSong from libopensonic.media import OpenSubsonicExtension from libopensonic.media import Playlist as SonicPlaylist - from libopensonic.media import PodcastChannel as SonicChannel from libopensonic.media import PodcastEpisode as SonicEpisode @@ -95,6 +96,7 @@ class OpenSonicProvider(MusicProvider): _show_new: bool = True _show_played: bool = True _reco_limit: int = 10 + _cache_base_key: str = "" async def handle_async_init(self) -> None: """Set up the music provider and test the connection.""" @@ -137,6 +139,7 @@ class OpenSonicProvider(MusicProvider): self._show_new = bool(self.config.get_value(CONF_NEW_ALBUMS)) self._show_played = bool(self.config.get_value(CONF_PLAYED_ALBUMS)) self._reco_limit = int(str(self.config.get_value(CONF_RECO_SIZE))) + self._cache_base_key = f"{self.instance_id}/" @property def supported_features(self) -> set[ProviderFeature]: @@ -776,6 +779,83 @@ class OpenSonicProvider(MusicProvider): self.logger.debug("Done streaming %s", streamdetails.item_id) + async def _get_podcast_channel_async(self, chan_id: str, base_key: str) -> None: + chan = await self._run_async(self.conn.get_podcasts, inc_episodes=True, pid=chan_id) + if not chan: + return + await self.mass.cache.set( + key=chan_id, + data=chan[0], + base_key=base_key, + expiration=600, + category=CACHE_CATEGORY_OPEN_SUBSONIC, + ) + + async def _podcast_recommendations(self) -> RecommendationFolder: + podcasts: RecommendationFolder = RecommendationFolder( + item_id="subsonic_newest_podcasts", + provider=self.domain, + name="Newest Podcast Episodes", + ) + sonic_episodes = await self._run_async( + self.conn.get_newest_podcasts, count=self._reco_limit + ) + chan_ids = set() + chan_base_key = f"{self._cache_base_key}/podcast_channels/" + async with TaskGroup() as tg: + for ep in sonic_episodes: + if ep.channel_id in chan_ids: + continue + tg.create_task(self._get_podcast_channel_async(ep.channel_id, chan_base_key)) + chan_ids.add(ep.channel_id) + + for ep in sonic_episodes: + chan = await self.mass.cache.get( + key=ep.channel_id, base_key=chan_base_key, category=CACHE_CATEGORY_OPEN_SUBSONIC + ) + if not chan: + continue + podcasts.items.append(parse_epsiode(self.instance_id, ep, chan)) + return podcasts + + async def _favorites_recommendation(self) -> RecommendationFolder: + faves: RecommendationFolder = RecommendationFolder( + item_id="subsonic_starred_albums", provider=self.domain, name="Starred Items" + ) + starred = await self._run_async(self.conn.get_starred2) + if starred.album: + for sonic_album in starred.album[: self._reco_limit]: + faves.items.append(parse_album(self.logger, self.instance_id, sonic_album)) + if starred.artist: + for sonic_artist in starred.artist[: self._reco_limit]: + faves.items.append(parse_artist(self.instance_id, sonic_artist)) + if starred.song: + for sonic_song in starred.song[: self._reco_limit]: + faves.items.append(parse_track(self.logger, self.instance_id, sonic_song)) + return faves + + async def _new_recommendations(self) -> RecommendationFolder: + new_stuff: RecommendationFolder = RecommendationFolder( + item_id="subsonic_new_albums", provider=self.domain, name="New Albums" + ) + new_albums = await self._run_async( + self.conn.get_album_list2, ltype="newest", size=self._reco_limit + ) + for sonic_album in new_albums: + new_stuff.items.append(parse_album(self.logger, self.instance_id, sonic_album)) + return new_stuff + + async def _played_recommendations(self) -> RecommendationFolder: + recent: RecommendationFolder = RecommendationFolder( + item_id="subsonic_most_played", provider=self.domain, name="Most Played Albums" + ) + albums = await self._run_async( + self.conn.get_album_list2, ltype="frequent", size=self._reco_limit + ) + for sonic_album in albums: + recent.items.append(parse_album(self.logger, self.instance_id, sonic_album)) + return recent + async def recommendations(self) -> list[RecommendationFolder]: """Provide recommendations. @@ -784,67 +864,27 @@ class OpenSonicProvider(MusicProvider): """ recos: list[RecommendationFolder] = [] - if self._enable_podcasts: - podcasts: RecommendationFolder = RecommendationFolder( - item_id="subsonic_newest_podcasts", - provider=self.domain, - name="Newest Podcast Episodes", - ) - sonic_episodes = await self._run_async( - self.conn.get_newest_podcasts, count=self._reco_limit - ) - sonic_channel: SonicChannel | None = None - for ep in sonic_episodes: - if sonic_channel is None or sonic_channel.id != ep.channel_id: - channels = await self._run_async( - self.conn.get_podcasts, inc_episodes=True, pid=ep.channel_id - ) - if not channels: - self.logger.warning("Can't find podcast channel for id %s", ep.channel_id) - continue - sonic_channel = channels[0] - podcasts.items.append(parse_epsiode(self.instance_id, ep, sonic_channel)) - recos.append(podcasts) - - if self._show_faves: - faves: RecommendationFolder = RecommendationFolder( - item_id="subsonic_starred_albums", provider=self.domain, name="Starred Items" - ) - starred = await self._run_async(self.conn.get_starred2) - if starred.album: - for sonic_album in starred.album[: self._reco_limit]: - faves.items.append(parse_album(self.logger, self.instance_id, sonic_album)) - if starred.artist: - for sonic_artist in starred.artist[: self._reco_limit]: - faves.items.append(parse_artist(self.instance_id, sonic_artist)) - if starred.song: - for sonic_song in starred.song[: self._reco_limit]: - faves.items.append(parse_track(self.logger, self.instance_id, sonic_song)) - - recos.append(faves) - - if self._show_new: - new_stuff: RecommendationFolder = RecommendationFolder( - item_id="subsonic_new_albums", provider=self.domain, name="New Albums" - ) - new_albums = await self._run_async( - self.conn.get_album_list2, ltype="newest", size=self._reco_limit - ) - for sonic_album in new_albums: - new_stuff.items.append(parse_album(self.logger, self.instance_id, sonic_album)) - - recos.append(new_stuff) - - if self._show_played: - recent: RecommendationFolder = RecommendationFolder( - item_id="subsonic_most_played", provider=self.domain, name="Most Played Albums" - ) - albums = await self._run_async( - self.conn.get_album_list2, ltype="frequent", size=self._reco_limit - ) - for sonic_album in albums: - recent.items.append(parse_album(self.logger, self.instance_id, sonic_album)) - - recos.append(recent) + podcasts = None + faves = None + new_stuff = None + played = None + async with TaskGroup() as grp: + if self._enable_podcasts: + podcasts = grp.create_task(self._podcast_recommendations()) + if self._show_faves: + faves = grp.create_task(self._favorites_recommendation()) + if self._show_new: + new_stuff = grp.create_task(self._new_recommendations()) + if self._show_played: + played = grp.create_task(self._played_recommendations()) + + if podcasts: + recos.append(podcasts.result()) + if faves: + recos.append(faves.result()) + if new_stuff: + recos.append(new_stuff.result()) + if played: + recos.append(played.result()) return recos -- 2.34.1