)
from music_assistant_models.streamdetails import StreamDetails
+from music_assistant.controllers.cache import use_cache
from music_assistant.helpers.app_vars import app_var # type: ignore[attr-defined]
from music_assistant.helpers.json import json_loads
from music_assistant.helpers.process import check_output
return str(self._sp_user["display_name"])
return None
+ # ruff: noqa: PLR0915
async def search(
self, search_query: str, media_types: list[MediaType] | None = None, limit: int = 5
) -> SearchResults:
searchresult.playlists = [*searchresult.playlists, *playlists]
items_received += len(api_result["playlists"]["items"])
if "shows" in api_result:
- podcasts = [
- parse_podcast(item, self)
- for item in api_result["shows"]["items"]
- if (item and item["id"])
- ]
+ podcasts = []
+ for item in api_result["shows"]["items"]:
+ if not (item and item["id"]):
+ continue
+ # Filter out audiobooks - they have a distinctive description format
+ description = item.get("description", "")
+ if description.startswith("Author(s):") and "Narrator(s):" in description:
+ continue
+ podcasts.append(parse_podcast(item, self))
searchresult.podcasts = [*searchresult.podcasts, *podcasts]
items_received += len(api_result["shows"]["items"])
offset += page_limit
"""Retrieve library podcasts from spotify."""
async for item in self._get_all_items("me/shows"):
if item["show"] and item["show"]["id"]:
- yield parse_podcast(item["show"], self)
+ show_obj = item["show"]
+ # Filter out audiobooks - they have a distinctive description format
+ description = show_obj.get("description", "")
+ if description.startswith("Author(s):") and "Narrator(s):" in description:
+ continue
+ yield parse_podcast(show_obj, self)
def _get_liked_songs_playlist_id(self) -> str:
return f"{LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX}-{self.instance_id}"
playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}")
return parse_playlist(playlist_obj, self)
+ @use_cache(86400) # 24 hours
async def get_podcast(self, prov_podcast_id: str) -> Podcast:
"""Get full podcast details by id."""
podcast_obj = await self._get_data(f"shows/{prov_podcast_id}")
episode_position += 1
yield episode
+ @use_cache(86400) # 24 hours
async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
"""Get full podcast episode details by id."""
episode_obj = await self._get_data(f"episodes/{prov_episode_id}", market="from_token")