if item and item["id"]:
yield parse_playlist(item, self)
- # ruff: noqa: PLR0915
@use_cache()
async def search(
self, search_query: str, media_types: list[MediaType] | None = None, limit: int = 5
:param limit: Number of items to return in the search (per type).
"""
searchresult = SearchResults()
- searchtypes = []
if media_types is None:
return searchresult
+
+ searchtype = self._build_search_types(media_types)
+ if not searchtype:
+ return searchresult
+
+ search_query = search_query.replace("'", "")
+ offset = 0
+ page_limit = min(limit, 50)
+
+ while True:
+ api_result = await self._get_data(
+ "search", q=search_query, type=searchtype, limit=page_limit, offset=offset
+ )
+ items_received = self._process_search_results(api_result, searchresult)
+
+ offset += page_limit
+ if offset >= limit or items_received < page_limit:
+ break
+
+ return searchresult
+
+ def _build_search_types(self, media_types: list[MediaType]) -> str:
+ """Build comma-separated search types string from media types."""
+ searchtypes = []
if MediaType.ARTIST in media_types:
searchtypes.append("artist")
if MediaType.ALBUM in media_types:
searchtypes.append("show")
if MediaType.AUDIOBOOK in media_types and self.audiobooks_supported:
searchtypes.append("audiobook")
- if not searchtypes:
- return searchresult
- searchtype = ",".join(searchtypes)
- search_query = search_query.replace("'", "")
- offset = 0
- page_limit = min(limit, 50)
- while True:
- items_received = 0
- api_result = await self._get_data(
- "search", q=search_query, type=searchtype, limit=page_limit, offset=offset
- )
- if "artists" in api_result:
- artists = [
- parse_artist(item, self)
- for item in api_result["artists"]["items"]
- if (item and item["id"] and item["name"])
- ]
- searchresult.artists = [*searchresult.artists, *artists]
- items_received += len(api_result["artists"]["items"])
- if "albums" in api_result:
- albums = [
- parse_album(item, self)
- for item in api_result["albums"]["items"]
- if (item and item["id"])
- ]
- searchresult.albums = [*searchresult.albums, *albums]
- items_received += len(api_result["albums"]["items"])
- if "tracks" in api_result:
- tracks = [
- parse_track(item, self)
- for item in api_result["tracks"]["items"]
- if (item and item["id"])
- ]
- searchresult.tracks = [*searchresult.tracks, *tracks]
- items_received += len(api_result["tracks"]["items"])
- if "playlists" in api_result:
- playlists = [
- parse_playlist(item, self)
- for item in api_result["playlists"]["items"]
- if (item and item["id"])
- ]
- searchresult.playlists = [*searchresult.playlists, *playlists]
- items_received += len(api_result["playlists"]["items"])
- if "shows" in api_result:
- podcasts = []
- for item in api_result["shows"]["items"]:
- if not (item and item["id"]):
- continue
- # Filter out audiobooks - they have a distinctive description format
- description = item.get("description", "")
- if description.startswith("Author(s):") and "Narrator(s):" in description:
- continue
- podcasts.append(parse_podcast(item, self))
- searchresult.podcasts = [*searchresult.podcasts, *podcasts]
- items_received += len(api_result["shows"]["items"])
- if "audiobooks" in api_result and self.audiobooks_supported:
- audiobooks = [
- parse_audiobook(item, self)
- for item in api_result["audiobooks"]["items"]
- if (item and item["id"])
- ]
- searchresult.audiobooks = [*searchresult.audiobooks, *audiobooks]
- items_received += len(api_result["audiobooks"]["items"])
- offset += page_limit
- if offset >= limit:
- break
- if items_received < page_limit:
- break
- return searchresult
+ return ",".join(searchtypes)
+
+ def _process_search_results(
+ self, api_result: dict[str, Any], searchresult: SearchResults
+ ) -> int:
+ """Process API search results and update searchresult object.
+
+ Returns the total number of items received.
+ """
+ items_received = 0
+
+ if "artists" in api_result:
+ artists = [
+ parse_artist(item, self)
+ for item in api_result["artists"]["items"]
+ if (item and item["id"] and item["name"])
+ ]
+ searchresult.artists = [*searchresult.artists, *artists]
+ items_received += len(api_result["artists"]["items"])
+
+ if "albums" in api_result:
+ albums = [
+ parse_album(item, self)
+ for item in api_result["albums"]["items"]
+ if (item and item["id"])
+ ]
+ searchresult.albums = [*searchresult.albums, *albums]
+ items_received += len(api_result["albums"]["items"])
+
+ if "tracks" in api_result:
+ tracks = [
+ parse_track(item, self)
+ for item in api_result["tracks"]["items"]
+ if (item and item["id"])
+ ]
+ searchresult.tracks = [*searchresult.tracks, *tracks]
+ items_received += len(api_result["tracks"]["items"])
+
+ if "playlists" in api_result:
+ playlists = [
+ parse_playlist(item, self)
+ for item in api_result["playlists"]["items"]
+ if (item and item["id"])
+ ]
+ searchresult.playlists = [*searchresult.playlists, *playlists]
+ items_received += len(api_result["playlists"]["items"])
+
+ if "shows" in api_result:
+ podcasts = []
+ for item in api_result["shows"]["items"]:
+ if not (item and item["id"]):
+ continue
+ # Filter out audiobooks - they have a distinctive description format
+ description = item.get("description", "")
+ if description.startswith("Author(s):") and "Narrator(s):" in description:
+ continue
+ podcasts.append(parse_podcast(item, self))
+ searchresult.podcasts = [*searchresult.podcasts, *podcasts]
+ items_received += len(api_result["shows"]["items"])
+
+ if "audiobooks" in api_result and self.audiobooks_supported:
+ audiobooks = [
+ parse_audiobook(item, self)
+ for item in api_result["audiobooks"]["items"]
+ if (item and item["id"])
+ ]
+ searchresult.audiobooks = [*searchresult.audiobooks, *audiobooks]
+ items_received += len(api_result["audiobooks"]["items"])
+
+ return items_received
@use_cache()
async def get_artist(self, prov_artist_id: str) -> Artist: