Spotify: Refactor search method to reduce complexity (#2601)
authorOzGav <gavnosp@hotmail.com>
Wed, 5 Nov 2025 15:26:20 +0000 (01:26 +1000)
committerGitHub <noreply@github.com>
Wed, 5 Nov 2025 15:26:20 +0000 (16:26 +0100)
Refactor search method to reduce complexity by extracting helper methods

music_assistant/providers/spotify/provider.py

index 48a7f1d372c0b701c173e9759f6621084befb9c4..75883b24b6d947c91943d556d9abdb205ce1b877 100644 (file)
@@ -202,7 +202,6 @@ class SpotifyProvider(MusicProvider):
             if item and item["id"]:
                 yield parse_playlist(item, self)
 
-    # ruff: noqa: PLR0915
     @use_cache()
     async def search(
         self, search_query: str, media_types: list[MediaType] | None = None, limit: int = 5
@@ -214,9 +213,32 @@ class SpotifyProvider(MusicProvider):
         :param limit: Number of items to return in the search (per type).
         """
         searchresult = SearchResults()
-        searchtypes = []
         if media_types is None:
             return searchresult
+
+        searchtype = self._build_search_types(media_types)
+        if not searchtype:
+            return searchresult
+
+        search_query = search_query.replace("'", "")
+        offset = 0
+        page_limit = min(limit, 50)
+
+        while True:
+            api_result = await self._get_data(
+                "search", q=search_query, type=searchtype, limit=page_limit, offset=offset
+            )
+            items_received = self._process_search_results(api_result, searchresult)
+
+            offset += page_limit
+            if offset >= limit or items_received < page_limit:
+                break
+
+        return searchresult
+
+    def _build_search_types(self, media_types: list[MediaType]) -> str:
+        """Build comma-separated search types string from media types."""
+        searchtypes = []
         if MediaType.ARTIST in media_types:
             searchtypes.append("artist")
         if MediaType.ALBUM in media_types:
@@ -229,75 +251,76 @@ class SpotifyProvider(MusicProvider):
             searchtypes.append("show")
         if MediaType.AUDIOBOOK in media_types and self.audiobooks_supported:
             searchtypes.append("audiobook")
-        if not searchtypes:
-            return searchresult
-        searchtype = ",".join(searchtypes)
-        search_query = search_query.replace("'", "")
-        offset = 0
-        page_limit = min(limit, 50)
-        while True:
-            items_received = 0
-            api_result = await self._get_data(
-                "search", q=search_query, type=searchtype, limit=page_limit, offset=offset
-            )
-            if "artists" in api_result:
-                artists = [
-                    parse_artist(item, self)
-                    for item in api_result["artists"]["items"]
-                    if (item and item["id"] and item["name"])
-                ]
-                searchresult.artists = [*searchresult.artists, *artists]
-                items_received += len(api_result["artists"]["items"])
-            if "albums" in api_result:
-                albums = [
-                    parse_album(item, self)
-                    for item in api_result["albums"]["items"]
-                    if (item and item["id"])
-                ]
-                searchresult.albums = [*searchresult.albums, *albums]
-                items_received += len(api_result["albums"]["items"])
-            if "tracks" in api_result:
-                tracks = [
-                    parse_track(item, self)
-                    for item in api_result["tracks"]["items"]
-                    if (item and item["id"])
-                ]
-                searchresult.tracks = [*searchresult.tracks, *tracks]
-                items_received += len(api_result["tracks"]["items"])
-            if "playlists" in api_result:
-                playlists = [
-                    parse_playlist(item, self)
-                    for item in api_result["playlists"]["items"]
-                    if (item and item["id"])
-                ]
-                searchresult.playlists = [*searchresult.playlists, *playlists]
-                items_received += len(api_result["playlists"]["items"])
-            if "shows" in api_result:
-                podcasts = []
-                for item in api_result["shows"]["items"]:
-                    if not (item and item["id"]):
-                        continue
-                    # Filter out audiobooks - they have a distinctive description format
-                    description = item.get("description", "")
-                    if description.startswith("Author(s):") and "Narrator(s):" in description:
-                        continue
-                    podcasts.append(parse_podcast(item, self))
-                searchresult.podcasts = [*searchresult.podcasts, *podcasts]
-                items_received += len(api_result["shows"]["items"])
-            if "audiobooks" in api_result and self.audiobooks_supported:
-                audiobooks = [
-                    parse_audiobook(item, self)
-                    for item in api_result["audiobooks"]["items"]
-                    if (item and item["id"])
-                ]
-                searchresult.audiobooks = [*searchresult.audiobooks, *audiobooks]
-                items_received += len(api_result["audiobooks"]["items"])
-            offset += page_limit
-            if offset >= limit:
-                break
-            if items_received < page_limit:
-                break
-        return searchresult
+        return ",".join(searchtypes)
+
+    def _process_search_results(
+        self, api_result: dict[str, Any], searchresult: SearchResults
+    ) -> int:
+        """Process API search results and update searchresult object.
+
+        Returns the total number of items received.
+        """
+        items_received = 0
+
+        if "artists" in api_result:
+            artists = [
+                parse_artist(item, self)
+                for item in api_result["artists"]["items"]
+                if (item and item["id"] and item["name"])
+            ]
+            searchresult.artists = [*searchresult.artists, *artists]
+            items_received += len(api_result["artists"]["items"])
+
+        if "albums" in api_result:
+            albums = [
+                parse_album(item, self)
+                for item in api_result["albums"]["items"]
+                if (item and item["id"])
+            ]
+            searchresult.albums = [*searchresult.albums, *albums]
+            items_received += len(api_result["albums"]["items"])
+
+        if "tracks" in api_result:
+            tracks = [
+                parse_track(item, self)
+                for item in api_result["tracks"]["items"]
+                if (item and item["id"])
+            ]
+            searchresult.tracks = [*searchresult.tracks, *tracks]
+            items_received += len(api_result["tracks"]["items"])
+
+        if "playlists" in api_result:
+            playlists = [
+                parse_playlist(item, self)
+                for item in api_result["playlists"]["items"]
+                if (item and item["id"])
+            ]
+            searchresult.playlists = [*searchresult.playlists, *playlists]
+            items_received += len(api_result["playlists"]["items"])
+
+        if "shows" in api_result:
+            podcasts = []
+            for item in api_result["shows"]["items"]:
+                if not (item and item["id"]):
+                    continue
+                # Filter out audiobooks - they have a distinctive description format
+                description = item.get("description", "")
+                if description.startswith("Author(s):") and "Narrator(s):" in description:
+                    continue
+                podcasts.append(parse_podcast(item, self))
+            searchresult.podcasts = [*searchresult.podcasts, *podcasts]
+            items_received += len(api_result["shows"]["items"])
+
+        if "audiobooks" in api_result and self.audiobooks_supported:
+            audiobooks = [
+                parse_audiobook(item, self)
+                for item in api_result["audiobooks"]["items"]
+                if (item and item["id"])
+            ]
+            searchresult.audiobooks = [*searchresult.audiobooks, *audiobooks]
+            items_received += len(api_result["audiobooks"]["items"])
+
+        return items_received
 
     @use_cache()
     async def get_artist(self, prov_artist_id: str) -> Artist: