Parallelize Recommendation creation in Subsonic provider (#2254)
authorEric Munson <eric@munsonfam.org>
Wed, 2 Jul 2025 15:48:24 +0000 (11:48 -0400)
committerGitHub <noreply@github.com>
Wed, 2 Jul 2025 15:48:24 +0000 (11:48 -0400)
* Add Open Subsonic Cache category

We will be using this initially for podcast channels as they are slow to
look up for larger podcasts. This category, and caching in general
should be used across the provider (at a later date).

Signed-off-by: Eric B Munson <eric@munsonfam.org>
* Feat: Subsonic: Parallelize Recommendation creation

When all the recommendations are enabled the main page is rather slow to
load. Split each kind into a helper function and invoke each helper that
is enabled in parallel.

The newest podcast episode recommendation was the worst offender due to
the number of API calls for getting podcast channels. This was broken
into an initial pass over episodes to collect a set of channel ids, then
a parallel retrieval of the unique channel ids.

Signed-off-by: Eric B Munson <eric@munsonfam.org>
music_assistant/constants.py
music_assistant/providers/opensubsonic/sonic_provider.py

index d08dcc7fc0e0c89c5e48526f9bcfe9180c1f3f48..f1fdf1e5ede747085b7a9e384bded3c2ffe9eccc 100644 (file)
@@ -710,6 +710,7 @@ CACHE_CATEGORY_MEDIA_INFO: Final[int] = 8
 CACHE_CATEGORY_LIBRARY_ITEMS: Final[int] = 9
 CACHE_CATEGORY_PLAYERS: Final[int] = 10
 CACHE_CATEGORY_RECOMMENDATIONS: Final[int] = 11
+CACHE_CATEGORY_OPEN_SUBSONIC: Final[int] = 12
 
 # CACHE base keys
 CACHE_KEY_PLAYER_POWER: Final[str] = "player_power"
index 5a75e532f445f16a9bf3251b5db183c3ea369daf..95d309db8fdc5150965ccdb9df0d1bc450277545 100644 (file)
@@ -3,6 +3,7 @@
 from __future__ import annotations
 
 import asyncio
+from asyncio import TaskGroup
 from typing import TYPE_CHECKING, Any, ParamSpec, TypeVar
 
 from libopensonic.connection import Connection as SonicConnection
@@ -37,6 +38,7 @@ from music_assistant_models.media_items import (
 from music_assistant_models.streamdetails import StreamDetails
 
 from music_assistant.constants import (
+    CACHE_CATEGORY_OPEN_SUBSONIC,
     CONF_PASSWORD,
     CONF_PATH,
     CONF_PORT,
@@ -66,7 +68,6 @@ if TYPE_CHECKING:
     from libopensonic.media import Child as SonicSong
     from libopensonic.media import OpenSubsonicExtension
     from libopensonic.media import Playlist as SonicPlaylist
-    from libopensonic.media import PodcastChannel as SonicChannel
     from libopensonic.media import PodcastEpisode as SonicEpisode
 
 
@@ -95,6 +96,7 @@ class OpenSonicProvider(MusicProvider):
     _show_new: bool = True
     _show_played: bool = True
     _reco_limit: int = 10
+    _cache_base_key: str = ""
 
     async def handle_async_init(self) -> None:
         """Set up the music provider and test the connection."""
@@ -137,6 +139,7 @@ class OpenSonicProvider(MusicProvider):
         self._show_new = bool(self.config.get_value(CONF_NEW_ALBUMS))
         self._show_played = bool(self.config.get_value(CONF_PLAYED_ALBUMS))
         self._reco_limit = int(str(self.config.get_value(CONF_RECO_SIZE)))
+        self._cache_base_key = f"{self.instance_id}/"
 
     @property
     def supported_features(self) -> set[ProviderFeature]:
@@ -776,6 +779,83 @@ class OpenSonicProvider(MusicProvider):
 
         self.logger.debug("Done streaming %s", streamdetails.item_id)
 
+    async def _get_podcast_channel_async(self, chan_id: str, base_key: str) -> None:
+        chan = await self._run_async(self.conn.get_podcasts, inc_episodes=True, pid=chan_id)
+        if not chan:
+            return
+        await self.mass.cache.set(
+            key=chan_id,
+            data=chan[0],
+            base_key=base_key,
+            expiration=600,
+            category=CACHE_CATEGORY_OPEN_SUBSONIC,
+        )
+
+    async def _podcast_recommendations(self) -> RecommendationFolder:
+        podcasts: RecommendationFolder = RecommendationFolder(
+            item_id="subsonic_newest_podcasts",
+            provider=self.domain,
+            name="Newest Podcast Episodes",
+        )
+        sonic_episodes = await self._run_async(
+            self.conn.get_newest_podcasts, count=self._reco_limit
+        )
+        chan_ids = set()
+        chan_base_key = f"{self._cache_base_key}/podcast_channels/"
+        async with TaskGroup() as tg:
+            for ep in sonic_episodes:
+                if ep.channel_id in chan_ids:
+                    continue
+                tg.create_task(self._get_podcast_channel_async(ep.channel_id, chan_base_key))
+                chan_ids.add(ep.channel_id)
+
+        for ep in sonic_episodes:
+            chan = await self.mass.cache.get(
+                key=ep.channel_id, base_key=chan_base_key, category=CACHE_CATEGORY_OPEN_SUBSONIC
+            )
+            if not chan:
+                continue
+            podcasts.items.append(parse_epsiode(self.instance_id, ep, chan))
+        return podcasts
+
+    async def _favorites_recommendation(self) -> RecommendationFolder:
+        faves: RecommendationFolder = RecommendationFolder(
+            item_id="subsonic_starred_albums", provider=self.domain, name="Starred Items"
+        )
+        starred = await self._run_async(self.conn.get_starred2)
+        if starred.album:
+            for sonic_album in starred.album[: self._reco_limit]:
+                faves.items.append(parse_album(self.logger, self.instance_id, sonic_album))
+        if starred.artist:
+            for sonic_artist in starred.artist[: self._reco_limit]:
+                faves.items.append(parse_artist(self.instance_id, sonic_artist))
+        if starred.song:
+            for sonic_song in starred.song[: self._reco_limit]:
+                faves.items.append(parse_track(self.logger, self.instance_id, sonic_song))
+        return faves
+
+    async def _new_recommendations(self) -> RecommendationFolder:
+        new_stuff: RecommendationFolder = RecommendationFolder(
+            item_id="subsonic_new_albums", provider=self.domain, name="New Albums"
+        )
+        new_albums = await self._run_async(
+            self.conn.get_album_list2, ltype="newest", size=self._reco_limit
+        )
+        for sonic_album in new_albums:
+            new_stuff.items.append(parse_album(self.logger, self.instance_id, sonic_album))
+        return new_stuff
+
+    async def _played_recommendations(self) -> RecommendationFolder:
+        recent: RecommendationFolder = RecommendationFolder(
+            item_id="subsonic_most_played", provider=self.domain, name="Most Played Albums"
+        )
+        albums = await self._run_async(
+            self.conn.get_album_list2, ltype="frequent", size=self._reco_limit
+        )
+        for sonic_album in albums:
+            recent.items.append(parse_album(self.logger, self.instance_id, sonic_album))
+        return recent
+
     async def recommendations(self) -> list[RecommendationFolder]:
         """Provide recommendations.
 
@@ -784,67 +864,27 @@ class OpenSonicProvider(MusicProvider):
         """
         recos: list[RecommendationFolder] = []
 
-        if self._enable_podcasts:
-            podcasts: RecommendationFolder = RecommendationFolder(
-                item_id="subsonic_newest_podcasts",
-                provider=self.domain,
-                name="Newest Podcast Episodes",
-            )
-            sonic_episodes = await self._run_async(
-                self.conn.get_newest_podcasts, count=self._reco_limit
-            )
-            sonic_channel: SonicChannel | None = None
-            for ep in sonic_episodes:
-                if sonic_channel is None or sonic_channel.id != ep.channel_id:
-                    channels = await self._run_async(
-                        self.conn.get_podcasts, inc_episodes=True, pid=ep.channel_id
-                    )
-                    if not channels:
-                        self.logger.warning("Can't find podcast channel for id %s", ep.channel_id)
-                        continue
-                    sonic_channel = channels[0]
-                podcasts.items.append(parse_epsiode(self.instance_id, ep, sonic_channel))
-            recos.append(podcasts)
-
-        if self._show_faves:
-            faves: RecommendationFolder = RecommendationFolder(
-                item_id="subsonic_starred_albums", provider=self.domain, name="Starred Items"
-            )
-            starred = await self._run_async(self.conn.get_starred2)
-            if starred.album:
-                for sonic_album in starred.album[: self._reco_limit]:
-                    faves.items.append(parse_album(self.logger, self.instance_id, sonic_album))
-            if starred.artist:
-                for sonic_artist in starred.artist[: self._reco_limit]:
-                    faves.items.append(parse_artist(self.instance_id, sonic_artist))
-            if starred.song:
-                for sonic_song in starred.song[: self._reco_limit]:
-                    faves.items.append(parse_track(self.logger, self.instance_id, sonic_song))
-
-            recos.append(faves)
-
-        if self._show_new:
-            new_stuff: RecommendationFolder = RecommendationFolder(
-                item_id="subsonic_new_albums", provider=self.domain, name="New Albums"
-            )
-            new_albums = await self._run_async(
-                self.conn.get_album_list2, ltype="newest", size=self._reco_limit
-            )
-            for sonic_album in new_albums:
-                new_stuff.items.append(parse_album(self.logger, self.instance_id, sonic_album))
-
-            recos.append(new_stuff)
-
-        if self._show_played:
-            recent: RecommendationFolder = RecommendationFolder(
-                item_id="subsonic_most_played", provider=self.domain, name="Most Played Albums"
-            )
-            albums = await self._run_async(
-                self.conn.get_album_list2, ltype="frequent", size=self._reco_limit
-            )
-            for sonic_album in albums:
-                recent.items.append(parse_album(self.logger, self.instance_id, sonic_album))
-
-            recos.append(recent)
+        podcasts = None
+        faves = None
+        new_stuff = None
+        played = None
+        async with TaskGroup() as grp:
+            if self._enable_podcasts:
+                podcasts = grp.create_task(self._podcast_recommendations())
+            if self._show_faves:
+                faves = grp.create_task(self._favorites_recommendation())
+            if self._show_new:
+                new_stuff = grp.create_task(self._new_recommendations())
+            if self._show_played:
+                played = grp.create_task(self._played_recommendations())
+
+        if podcasts:
+            recos.append(podcasts.result())
+        if faves:
+            recos.append(faves.result())
+        if new_stuff:
+            recos.append(new_stuff.result())
+        if played:
+            recos.append(played.result())
 
         return recos