From 152c9f581b06fc17abb8c739f38fd4455a93b5f0 Mon Sep 17 00:00:00 2001 From: OzGav Date: Thu, 6 Nov 2025 01:26:20 +1000 Subject: [PATCH] Spotify: Refactor search method to reduce complexity (#2601) Refactor search method to reduce complexity by extracting helper methods --- music_assistant/providers/spotify/provider.py | 165 ++++++++++-------- 1 file changed, 94 insertions(+), 71 deletions(-) diff --git a/music_assistant/providers/spotify/provider.py b/music_assistant/providers/spotify/provider.py index 48a7f1d3..75883b24 100644 --- a/music_assistant/providers/spotify/provider.py +++ b/music_assistant/providers/spotify/provider.py @@ -202,7 +202,6 @@ class SpotifyProvider(MusicProvider): if item and item["id"]: yield parse_playlist(item, self) - # ruff: noqa: PLR0915 @use_cache() async def search( self, search_query: str, media_types: list[MediaType] | None = None, limit: int = 5 @@ -214,9 +213,32 @@ class SpotifyProvider(MusicProvider): :param limit: Number of items to return in the search (per type). """ searchresult = SearchResults() - searchtypes = [] if media_types is None: return searchresult + + searchtype = self._build_search_types(media_types) + if not searchtype: + return searchresult + + search_query = search_query.replace("'", "") + offset = 0 + page_limit = min(limit, 50) + + while True: + api_result = await self._get_data( + "search", q=search_query, type=searchtype, limit=page_limit, offset=offset + ) + items_received = self._process_search_results(api_result, searchresult) + + offset += page_limit + if offset >= limit or items_received < page_limit: + break + + return searchresult + + def _build_search_types(self, media_types: list[MediaType]) -> str: + """Build comma-separated search types string from media types.""" + searchtypes = [] if MediaType.ARTIST in media_types: searchtypes.append("artist") if MediaType.ALBUM in media_types: @@ -229,75 +251,76 @@ class SpotifyProvider(MusicProvider): searchtypes.append("show") if MediaType.AUDIOBOOK in media_types and self.audiobooks_supported: searchtypes.append("audiobook") - if not searchtypes: - return searchresult - searchtype = ",".join(searchtypes) - search_query = search_query.replace("'", "") - offset = 0 - page_limit = min(limit, 50) - while True: - items_received = 0 - api_result = await self._get_data( - "search", q=search_query, type=searchtype, limit=page_limit, offset=offset - ) - if "artists" in api_result: - artists = [ - parse_artist(item, self) - for item in api_result["artists"]["items"] - if (item and item["id"] and item["name"]) - ] - searchresult.artists = [*searchresult.artists, *artists] - items_received += len(api_result["artists"]["items"]) - if "albums" in api_result: - albums = [ - parse_album(item, self) - for item in api_result["albums"]["items"] - if (item and item["id"]) - ] - searchresult.albums = [*searchresult.albums, *albums] - items_received += len(api_result["albums"]["items"]) - if "tracks" in api_result: - tracks = [ - parse_track(item, self) - for item in api_result["tracks"]["items"] - if (item and item["id"]) - ] - searchresult.tracks = [*searchresult.tracks, *tracks] - items_received += len(api_result["tracks"]["items"]) - if "playlists" in api_result: - playlists = [ - parse_playlist(item, self) - for item in api_result["playlists"]["items"] - if (item and item["id"]) - ] - searchresult.playlists = [*searchresult.playlists, *playlists] - items_received += len(api_result["playlists"]["items"]) - if "shows" in api_result: - podcasts = [] - for item in api_result["shows"]["items"]: - if not (item and item["id"]): - continue - # Filter out audiobooks - they have a distinctive description format - description = item.get("description", "") - if description.startswith("Author(s):") and "Narrator(s):" in description: - continue - podcasts.append(parse_podcast(item, self)) - searchresult.podcasts = [*searchresult.podcasts, *podcasts] - items_received += len(api_result["shows"]["items"]) - if "audiobooks" in api_result and self.audiobooks_supported: - audiobooks = [ - parse_audiobook(item, self) - for item in api_result["audiobooks"]["items"] - if (item and item["id"]) - ] - searchresult.audiobooks = [*searchresult.audiobooks, *audiobooks] - items_received += len(api_result["audiobooks"]["items"]) - offset += page_limit - if offset >= limit: - break - if items_received < page_limit: - break - return searchresult + return ",".join(searchtypes) + + def _process_search_results( + self, api_result: dict[str, Any], searchresult: SearchResults + ) -> int: + """Process API search results and update searchresult object. + + Returns the total number of items received. + """ + items_received = 0 + + if "artists" in api_result: + artists = [ + parse_artist(item, self) + for item in api_result["artists"]["items"] + if (item and item["id"] and item["name"]) + ] + searchresult.artists = [*searchresult.artists, *artists] + items_received += len(api_result["artists"]["items"]) + + if "albums" in api_result: + albums = [ + parse_album(item, self) + for item in api_result["albums"]["items"] + if (item and item["id"]) + ] + searchresult.albums = [*searchresult.albums, *albums] + items_received += len(api_result["albums"]["items"]) + + if "tracks" in api_result: + tracks = [ + parse_track(item, self) + for item in api_result["tracks"]["items"] + if (item and item["id"]) + ] + searchresult.tracks = [*searchresult.tracks, *tracks] + items_received += len(api_result["tracks"]["items"]) + + if "playlists" in api_result: + playlists = [ + parse_playlist(item, self) + for item in api_result["playlists"]["items"] + if (item and item["id"]) + ] + searchresult.playlists = [*searchresult.playlists, *playlists] + items_received += len(api_result["playlists"]["items"]) + + if "shows" in api_result: + podcasts = [] + for item in api_result["shows"]["items"]: + if not (item and item["id"]): + continue + # Filter out audiobooks - they have a distinctive description format + description = item.get("description", "") + if description.startswith("Author(s):") and "Narrator(s):" in description: + continue + podcasts.append(parse_podcast(item, self)) + searchresult.podcasts = [*searchresult.podcasts, *podcasts] + items_received += len(api_result["shows"]["items"]) + + if "audiobooks" in api_result and self.audiobooks_supported: + audiobooks = [ + parse_audiobook(item, self) + for item in api_result["audiobooks"]["items"] + if (item and item["id"]) + ] + searchresult.audiobooks = [*searchresult.audiobooks, *audiobooks] + items_received += len(api_result["audiobooks"]["items"]) + + return items_received @use_cache() async def get_artist(self, prov_artist_id: str) -> Artist: -- 2.34.1