from __future__ import annotations
import contextlib
+from datetime import datetime
from typing import TYPE_CHECKING, Any
from music_assistant_models.enums import AlbumType, ContentType, ExternalID, ImageType
AudioFormat,
MediaItemImage,
Playlist,
+ Podcast,
+ PodcastEpisode,
ProviderMapping,
Track,
)
from .provider import SpotifyProvider
+def parse_images(
+ images_list: list[dict[str, Any]], lookup_key: str, exclude_generic: bool = False
+) -> UniqueList[MediaItemImage]:
+ """Parse images list into MediaItemImage objects."""
+ if not images_list:
+ return UniqueList([])
+
+ for img in images_list:
+ img_url = img["url"]
+ # Skip generic placeholder images for artists if requested
+ if exclude_generic and "2a96cbd8b46e442fc41c2b86b821562f" in img_url:
+ continue
+ return UniqueList(
+ [
+ MediaItemImage(
+ type=ImageType.THUMB,
+ path=img_url,
+ provider=lookup_key,
+ remotely_accessible=True,
+ )
+ ]
+ )
+ return UniqueList([])
+
+
def parse_artist(artist_obj: dict[str, Any], provider: SpotifyProvider) -> Artist:
"""Parse spotify artist object to generic layout."""
artist = Artist(
)
if "genres" in artist_obj:
artist.metadata.genres = set(artist_obj["genres"])
- if artist_obj.get("images"):
- for img in artist_obj["images"]:
- img_url = img["url"]
- if "2a96cbd8b46e442fc41c2b86b821562f" not in img_url:
- artist.metadata.images = UniqueList(
- [
- MediaItemImage(
- type=ImageType.THUMB,
- path=img_url,
- provider=provider.lookup_key,
- remotely_accessible=True,
- )
- ]
- )
- break
+
+ # Use unified image parsing with generic exclusion
+ artist.metadata.images = parse_images(
+ artist_obj.get("images", []), provider.lookup_key, exclude_generic=True
+ )
return artist
if "genres" in album_obj:
album.metadata.genres = set(album_obj["genres"])
- if album_obj.get("images"):
- album.metadata.images = UniqueList(
- [
- MediaItemImage(
- type=ImageType.THUMB,
- path=album_obj["images"][0]["url"],
- provider=provider.lookup_key,
- remotely_accessible=True,
- )
- ]
- )
+
+ album.metadata.images = parse_images(album_obj.get("images", []), provider.lookup_key)
+
if "label" in album_obj:
album.metadata.label = album_obj["label"]
if album_obj.get("release_date"):
item_id=track_obj["id"],
provider_domain=provider.domain,
provider_instance=provider.instance_id,
- audio_format=AudioFormat(
- content_type=ContentType.OGG,
- bit_rate=320,
- ),
+ audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=320),
url=track_obj["external_urls"]["spotify"],
available=not track_obj["is_local"] and track_obj["is_playable"],
)
track.metadata.preview = track_obj["preview_url"]
if "album" in track_obj:
track.album = parse_album(track_obj["album"], provider)
- if track_obj["album"].get("images"):
- track.metadata.images = UniqueList(
- [
- MediaItemImage(
- type=ImageType.THUMB,
- path=track_obj["album"]["images"][0]["url"],
- provider=provider.lookup_key,
- remotely_accessible=True,
- )
- ]
- )
+ track.metadata.images = parse_images(
+ track_obj["album"].get("images", []), provider.lookup_key
+ )
if track_obj.get("copyright"):
track.metadata.copyright = track_obj["copyright"]
if track_obj.get("explicit"):
def parse_playlist(playlist_obj: dict[str, Any], provider: SpotifyProvider) -> Playlist:
"""Parse spotify playlist object to generic layout."""
is_editable = (
- playlist_obj["owner"]["id"] == provider._sp_user["id"] or playlist_obj["collaborative"]
- )
+ provider._sp_user is not None and playlist_obj["owner"]["id"] == provider._sp_user["id"]
+ ) or playlist_obj["collaborative"]
+
+ # Get owner name with fallback
+ owner_name = playlist_obj["owner"].get("display_name")
+ if owner_name is None and provider._sp_user is not None:
+ owner_name = provider._sp_user["display_name"]
+
playlist = Playlist(
item_id=playlist_obj["id"],
provider=provider.instance_id if is_editable else provider.lookup_key,
name=playlist_obj["name"],
- owner=playlist_obj["owner"]["display_name"],
+ owner=owner_name,
provider_mappings={
ProviderMapping(
item_id=playlist_obj["id"],
},
is_editable=is_editable,
)
- if playlist_obj.get("images"):
- playlist.metadata.images = UniqueList(
- [
- MediaItemImage(
- type=ImageType.THUMB,
- path=playlist_obj["images"][0]["url"],
- provider=provider.lookup_key,
- remotely_accessible=True,
- )
- ]
- )
- if playlist.owner is None:
- playlist.owner = provider._sp_user["display_name"]
+
+ playlist.metadata.images = parse_images(playlist_obj.get("images", []), provider.lookup_key)
playlist.cache_checksum = str(playlist_obj["snapshot_id"])
return playlist
+
+
+def parse_podcast(podcast_obj: dict[str, Any], provider: SpotifyProvider) -> Podcast:
+ """Parse spotify podcast (show) object to generic layout."""
+ podcast = Podcast(
+ item_id=podcast_obj["id"],
+ provider=provider.lookup_key,
+ name=podcast_obj["name"],
+ provider_mappings={
+ ProviderMapping(
+ item_id=podcast_obj["id"],
+ provider_domain=provider.domain,
+ provider_instance=provider.instance_id,
+ url=podcast_obj["external_urls"]["spotify"],
+ )
+ },
+ publisher=podcast_obj.get("publisher"),
+ total_episodes=podcast_obj.get("total_episodes"),
+ )
+
+ # Set metadata
+ if podcast_obj.get("description"):
+ podcast.metadata.description = podcast_obj["description"]
+
+ podcast.metadata.images = parse_images(podcast_obj.get("images", []), provider.lookup_key)
+
+ if "explicit" in podcast_obj:
+ podcast.metadata.explicit = podcast_obj["explicit"]
+
+ # Convert languages list to genres for categorization
+ if "languages" in podcast_obj:
+ podcast.metadata.genres = set(podcast_obj["languages"])
+
+ return podcast
+
+
+def parse_podcast_episode(
+ episode_obj: dict[str, Any], provider: SpotifyProvider, podcast: Podcast | None = None
+) -> PodcastEpisode:
+ """Parse spotify podcast episode object to generic layout."""
+ # Get or create a basic podcast reference if not provided
+ if podcast is None and "show" in episode_obj:
+ podcast = Podcast(
+ item_id=episode_obj["show"]["id"],
+ provider=provider.lookup_key,
+ name=episode_obj["show"]["name"],
+ provider_mappings={
+ ProviderMapping(
+ item_id=episode_obj["show"]["id"],
+ provider_domain=provider.domain,
+ provider_instance=provider.instance_id,
+ url=episode_obj["show"]["external_urls"]["spotify"],
+ )
+ },
+ )
+ elif podcast is None:
+ # Create a minimal podcast reference if none available
+ podcast = Podcast(
+ item_id="unknown",
+ provider=provider.lookup_key,
+ name="Unknown Podcast",
+ provider_mappings=set(),
+ )
+
+ episode = PodcastEpisode(
+ item_id=episode_obj["id"],
+ provider=provider.lookup_key,
+ name=episode_obj["name"],
+ duration=episode_obj["duration_ms"] // 1000 if episode_obj.get("duration_ms") else 0,
+ podcast=podcast,
+ position=0,
+ provider_mappings={
+ ProviderMapping(
+ item_id=episode_obj["id"],
+ provider_domain=provider.domain,
+ provider_instance=provider.instance_id,
+ audio_format=AudioFormat(content_type=ContentType.OGG, bit_rate=160),
+ url=episode_obj["external_urls"]["spotify"],
+ )
+ },
+ )
+
+ # Set description in metadata
+ if episode_obj.get("description"):
+ episode.metadata.description = episode_obj["description"]
+
+ # Add release date to metadata
+ if episode_obj.get("release_date"):
+ with contextlib.suppress(ValueError, TypeError):
+ date_str = episode_obj["release_date"].strip()
+
+ if len(date_str) == 4:
+ # Year only: "2023" -> "2023-01-01T00:00:00+00:00"
+ date_str = f"{date_str}-01-01T00:00:00+00:00"
+ elif len(date_str) == 10:
+ # Date only: "2023-12-25" -> "2023-12-25T00:00:00+00:00"
+ date_str = f"{date_str}T00:00:00+00:00"
+
+ episode.metadata.release_date = datetime.fromisoformat(date_str)
+
+ episode.metadata.images = parse_images(episode_obj.get("images", []), provider.lookup_key)
+
+ # Use podcast artwork if episode has none
+ if not episode.metadata.images and isinstance(podcast, Podcast) and podcast.metadata.images:
+ episode.metadata.images = podcast.metadata.images
+
+ if "explicit" in episode_obj:
+ episode.metadata.explicit = episode_obj["explicit"]
+
+ if "audio_preview_url" in episode_obj:
+ episode.metadata.preview = episode_obj["audio_preview_url"]
+
+ return episode
from collections.abc import AsyncGenerator
from typing import TYPE_CHECKING, Any
+import aiohttp
from music_assistant_models.enums import (
ContentType,
ImageType,
MediaItemImage,
MediaItemType,
Playlist,
+ Podcast,
+ PodcastEpisode,
ProviderMapping,
SearchResults,
Track,
+ UniqueList,
)
from music_assistant_models.streamdetails import StreamDetails
from .constants import (
CONF_CLIENT_ID,
+ CONF_PLAYED_THRESHOLD,
CONF_REFRESH_TOKEN,
+ CONF_SYNC_PLAYED_STATUS,
LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX,
+ SUPPORTED_FEATURES,
)
from .helpers import get_librespot_binary
-from .parsers import parse_album, parse_artist, parse_playlist, parse_track
+from .parsers import (
+ parse_album,
+ parse_artist,
+ parse_playlist,
+ parse_podcast,
+ parse_podcast_episode,
+ parse_track,
+)
from .streaming import LibrespotStreamer
if TYPE_CHECKING:
- from collections.abc import AsyncGenerator
+ from music_assistant_models.config_entries import ProviderConfig
+ from music_assistant_models.provider import ProviderManifest
+
+ from music_assistant import MusicAssistant
class SpotifyProvider(MusicProvider):
"""Implementation of a Spotify MusicProvider."""
- _auth_info: str | None = None
+ _auth_info: dict[str, Any] | None = None
_sp_user: dict[str, Any] | None = None
_librespot_bin: str | None = None
custom_client_id_active: bool = False
throttler: ThrottlerManager
+ def __init__(
+ self, mass: MusicAssistant, manifest: ProviderManifest, config: ProviderConfig
+ ) -> None:
+ """Initialize the provider."""
+ super().__init__(mass, manifest, config)
+
async def handle_async_init(self) -> None:
"""Handle async initialization of the provider."""
self.cache_dir = os.path.join(self.mass.cache_path, self.instance_id)
# try login which will raise if it fails
await self.login()
+ @property
+ def sync_played_status_enabled(self) -> bool:
+ """Check if played status sync is enabled."""
+ value = self.config.get_value(CONF_SYNC_PLAYED_STATUS, True)
+ return bool(value) if value is not None else True
+
+ @property
+ def played_threshold(self) -> float:
+ """Get the played threshold percentage."""
+ value = self.config.get_value(CONF_PLAYED_THRESHOLD, 90)
+ if isinstance(value, (int, float)):
+ # Convert from 1-100 percentage to 0.0-1.0 decimal
+ return float(value) / 100.0
+ elif isinstance(value, str):
+ try:
+ return float(value) / 100.0
+ except ValueError:
+ return 0.9 # fallback to default (90%)
+ else:
+ return 0.9 # fallback to default for any other type
+
@property
def supported_features(self) -> set[ProviderFeature]:
"""Return the features supported by this Provider."""
- base = {
- ProviderFeature.LIBRARY_ARTISTS,
- ProviderFeature.LIBRARY_ALBUMS,
- ProviderFeature.LIBRARY_TRACKS,
- ProviderFeature.LIBRARY_PLAYLISTS,
- ProviderFeature.LIBRARY_ARTISTS_EDIT,
- ProviderFeature.LIBRARY_ALBUMS_EDIT,
- ProviderFeature.LIBRARY_PLAYLISTS_EDIT,
- ProviderFeature.LIBRARY_TRACKS_EDIT,
- ProviderFeature.PLAYLIST_TRACKS_EDIT,
- ProviderFeature.PLAYLIST_CREATE,
- ProviderFeature.BROWSE,
- ProviderFeature.SEARCH,
- ProviderFeature.ARTIST_ALBUMS,
- ProviderFeature.ARTIST_TOPTRACKS,
- }
+ base = SUPPORTED_FEATURES.copy()
if not self.custom_client_id_active:
# Spotify has killed the similar tracks api for developers
# https://developer.spotify.com/blog/2024-11-27-changes-to-the-web-api
"""
searchresult = SearchResults()
searchtypes = []
+ if media_types is None:
+ return searchresult
if MediaType.ARTIST in media_types:
searchtypes.append("artist")
if MediaType.ALBUM in media_types:
searchtypes.append("track")
if MediaType.PLAYLIST in media_types:
searchtypes.append("playlist")
+ if MediaType.PODCAST in media_types:
+ searchtypes.append("show")
if not searchtypes:
return searchresult
searchtype = ",".join(searchtypes)
"search", q=search_query, type=searchtype, limit=page_limit, offset=offset
)
if "artists" in api_result:
- searchresult.artists += [
+ artists = [
parse_artist(item, self)
for item in api_result["artists"]["items"]
if (item and item["id"] and item["name"])
]
+ searchresult.artists = [*searchresult.artists, *artists]
items_received += len(api_result["artists"]["items"])
if "albums" in api_result:
- searchresult.albums += [
+ albums = [
parse_album(item, self)
for item in api_result["albums"]["items"]
if (item and item["id"])
]
+ searchresult.albums = [*searchresult.albums, *albums]
items_received += len(api_result["albums"]["items"])
if "tracks" in api_result:
- searchresult.tracks += [
+ tracks = [
parse_track(item, self)
for item in api_result["tracks"]["items"]
if (item and item["id"])
]
+ searchresult.tracks = [*searchresult.tracks, *tracks]
items_received += len(api_result["tracks"]["items"])
if "playlists" in api_result:
- searchresult.playlists += [
+ playlists = [
parse_playlist(item, self)
for item in api_result["playlists"]["items"]
if (item and item["id"])
]
+ searchresult.playlists = [*searchresult.playlists, *playlists]
items_received += len(api_result["playlists"]["items"])
+ if "shows" in api_result:
+ podcasts = [
+ parse_podcast(item, self)
+ for item in api_result["shows"]["items"]
+ if (item and item["id"])
+ ]
+ searchresult.podcasts = [*searchresult.podcasts, *podcasts]
+ items_received += len(api_result["shows"]["items"])
offset += page_limit
if offset >= limit:
break
if item and item["track"]["id"]:
yield parse_track(item["track"], self)
+ async def get_library_podcasts(self) -> AsyncGenerator[Podcast, None]:
+ """Retrieve library podcasts from spotify."""
+ async for item in self._get_all_items("me/shows"):
+ if item["show"] and item["show"]["id"]:
+ yield parse_podcast(item["show"], self)
+
def _get_liked_songs_playlist_id(self) -> str:
return f"{LIKED_SONGS_FAKE_PLAYLIST_ID_PREFIX}-{self.instance_id}"
async def _get_liked_songs_playlist(self) -> Playlist:
+ if self._sp_user is None:
+ raise LoginFailed("User info not available - not logged in")
+
liked_songs = Playlist(
item_id=self._get_liked_songs_playlist_id(),
provider=self.lookup_key,
liked_songs.is_editable = False # TODO Editing requires special endpoints
- liked_songs.metadata.images = [
- MediaItemImage(
- type=ImageType.THUMB,
- path="https://misc.scdn.co/liked-songs/liked-songs-64.png",
- provider=self.lookup_key,
- remotely_accessible=True,
- )
- ]
+ # Add image to the playlist metadata
+ image = MediaItemImage(
+ type=ImageType.THUMB,
+ path="https://misc.scdn.co/liked-songs/liked-songs-64.png",
+ provider=self.lookup_key,
+ remotely_accessible=True,
+ )
+ if liked_songs.metadata.images is None:
+ liked_songs.metadata.images = UniqueList([image])
+ else:
+ liked_songs.metadata.images.append(image)
liked_songs.cache_checksum = str(time.time())
playlist_obj = await self._get_data(f"playlists/{prov_playlist_id}")
return parse_playlist(playlist_obj, self)
+ async def get_podcast(self, prov_podcast_id: str) -> Podcast:
+ """Get full podcast details by id."""
+ podcast_obj = await self._get_data(f"shows/{prov_podcast_id}")
+ if not podcast_obj:
+ raise MediaNotFoundError(f"Podcast not found: {prov_podcast_id}")
+ return parse_podcast(podcast_obj, self)
+
+ async def get_podcast_episodes(
+ self, prov_podcast_id: str
+ ) -> AsyncGenerator[PodcastEpisode, None]:
+ """Get all podcast episodes."""
+ podcast = await self.get_podcast(prov_podcast_id)
+ episode_position = 1
+
+ async for item in self._get_all_items(
+ f"shows/{prov_podcast_id}/episodes", market="from_token"
+ ):
+ if not (item and item["id"]):
+ continue
+
+ episode = parse_podcast_episode(item, self, podcast=podcast)
+ episode.position = episode_position
+ episode_position += 1
+ yield episode
+
+ async def get_podcast_episode(self, prov_episode_id: str) -> PodcastEpisode:
+ """Get full podcast episode details by id."""
+ episode_obj = await self._get_data(f"episodes/{prov_episode_id}", market="from_token")
+ if not episode_obj:
+ raise MediaNotFoundError(f"Episode not found: {prov_episode_id}")
+ return parse_podcast_episode(episode_obj, self)
+
+ async def get_resume_position(self, item_id: str, media_type: MediaType) -> tuple[bool, int]:
+ """Get resume position for episode from Spotify."""
+ if media_type != MediaType.PODCAST_EPISODE:
+ raise NotImplementedError("Resume position only supported for podcast episodes")
+
+ if not self.sync_played_status_enabled:
+ raise NotImplementedError("Spotify resume sync disabled in settings")
+
+ episode_obj = await self._get_data(f"episodes/{item_id}", market="from_token")
+
+ if not episode_obj:
+ raise NotImplementedError("No episode data from Spotify")
+
+ if "resume_point" not in episode_obj or not episode_obj["resume_point"]:
+ raise NotImplementedError("No resume point data from Spotify")
+
+ try:
+ resume_point = episode_obj["resume_point"]
+ fully_played = resume_point.get("fully_played", False)
+ position_ms = resume_point.get("resume_position_ms", 0)
+
+ # Apply played threshold logic
+ if not fully_played and episode_obj.get("duration_ms", 0) > 0:
+ completion_ratio = position_ms / episode_obj["duration_ms"]
+ if completion_ratio >= self.played_threshold:
+ fully_played = True
+
+ return fully_played, position_ms
+ except (KeyError, TypeError, AttributeError) as e:
+ self.logger.debug(f"Invalid resume point data structure for {item_id}: {e}")
+ raise NotImplementedError("Invalid resume point data from Spotify")
+
+ async def on_played(
+ self,
+ media_type: MediaType,
+ prov_item_id: str,
+ fully_played: bool,
+ position: int,
+ media_item: MediaItemType,
+ is_playing: bool = False,
+ ) -> None:
+ """
+ Call when an episode is played in MA.
+
+ Note: This CANNOT sync back to Spotify as there's no API for it.
+ This is just for logging/monitoring purposes.
+ """
+ if media_type != MediaType.PODCAST_EPISODE:
+ return
+
+ if not isinstance(media_item, PodcastEpisode):
+ return
+
+ # Handle case where position might be None (e.g., when marked as played in UI)
+ safe_position = position or 0
+ if media_item.duration > 0:
+ completion_percentage = (safe_position / media_item.duration) * 100
+ else:
+ completion_percentage = 0
+
+ self.logger.debug(
+ f"Episode played in MA: {prov_item_id} "
+ f"({completion_percentage:.1f}%, fully_played: {fully_played}) "
+ f"- Cannot sync back to Spotify due to API limitations"
+ )
+
async def get_album_tracks(self, prov_album_id: str) -> list[Track]:
"""Get all album tracks for given album id."""
return [
await self._put_data("me/tracks", {"ids": [item.item_id]})
elif item.media_type == MediaType.PLAYLIST:
await self._put_data(f"playlists/{item.item_id}/followers", data={"public": False})
+ elif item.media_type == MediaType.PODCAST:
+ await self._put_data("me/shows", ids=item.item_id)
return True
async def library_remove(self, prov_item_id: str, media_type: MediaType) -> bool:
await self._delete_data("me/tracks", {"ids": [prov_item_id]})
elif media_type == MediaType.PLAYLIST:
await self._delete_data(f"playlists/{prov_item_id}/followers")
+ elif media_type == MediaType.PODCAST:
+ await self._delete_data("me/shows", ids=prov_item_id)
return True
async def add_playlist_tracks(self, prov_playlist_id: str, prov_track_ids: list[str]) -> None:
async def create_playlist(self, name: str) -> Playlist:
"""Create a new playlist on provider with given name."""
+ if self._sp_user is None:
+ raise LoginFailed("User info not available - not logged in")
data = {"name": name, "public": False}
new_playlist = await self._post_data(f"users/{self._sp_user['id']}/playlists", data=data)
self._fix_create_playlist_api_bug(new_playlist)
return [parse_track(item, self) for item in items["tracks"] if (item and item["id"])]
async def get_stream_details(self, item_id: str, media_type: MediaType) -> StreamDetails:
- """Return the content details for the given track when it will be streamed."""
+ """Return the content details for the given track/episode when it will be streamed."""
return StreamDetails(
item_id=item_id,
provider=self.lookup_key,
+ media_type=media_type,
audio_format=AudioFormat(
content_type=ContentType.OGG,
+ bit_rate=320,
),
stream_type=StreamType.CUSTOM,
allow_seek=True,
yield chunk
@lock
- async def login(self, force_refresh: bool = False) -> dict:
+ async def login(self, force_refresh: bool = False) -> dict[str, Any]:
"""Log-in Spotify and return Auth/token info."""
# return existing token if we have one in memory
if (
await asyncio.sleep(2)
continue
# if we reached this point, the token has been successfully refreshed
- auth_info = await response.json()
+ auth_info: dict[str, Any] = await response.json()
auth_info["expires_at"] = int(auth_info["expires_in"] + time.time())
self.logger.debug("Successfully refreshed access token")
break
else:
if self.available:
- self.mass.create_task(self.mass.unload_provider_with_error(self.instance_id))
+ self.mass.create_task(
+ self.mass.unload_provider_with_error(
+ self.instance_id, f"Failed to refresh access token: {err}"
+ )
+ )
raise LoginFailed(f"Failed to refresh access token: {err}")
# make sure that our updated creds get stored in memory + config
self._auth_info = auth_info
self.update_config_value(CONF_REFRESH_TOKEN, auth_info["refresh_token"], encrypted=True)
# check if librespot still has valid auth
+ if self._librespot_bin is None:
+ raise LoginFailed("Librespot binary not available")
args = [
self._librespot_bin,
"--cache",
headers["Accept-Language"] = f"{locale}, {language};q=0.9, *;q=0.5"
async with (
self.mass.http_session.get(
- url, headers=headers, params=kwargs, ssl=True, timeout=120
+ url,
+ headers=headers,
+ params=kwargs,
+ timeout=aiohttp.ClientTimeout(total=120),
) as response,
):
# handle spotify rate limiter
# so it will be retried (and the token refreshed)
if response.status == 401:
self._auth_info = None
- raise ResourceTemporarilyUnavailable("Token expired", backoff_time=0.05)
+ raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
# handle 404 not found, convert to MediaNotFoundError
if response.status == 404:
raise MediaNotFoundError(f"{endpoint} not found")
response.raise_for_status()
- return await response.json(loads=json_loads)
+ result: dict[str, Any] = await response.json(loads=json_loads)
+ return result
@throttle_with_retries
async def _delete_data(self, endpoint: str, data: Any = None, **kwargs: Any) -> None:
auth_info = kwargs.pop("auth_info", await self.login())
headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
async with self.mass.http_session.delete(
- url, headers=headers, params=kwargs, json=data, ssl=False
+ url, headers=headers, params=kwargs, json=data, ssl=True
) as response:
# handle spotify rate limiter
if response.status == 429:
# so it will be retried (and the token refreshed)
if response.status == 401:
self._auth_info = None
- raise ResourceTemporarilyUnavailable("Token expired", backoff_time=0.05)
+ raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
# handle temporary server error
if response.status in (502, 503):
raise ResourceTemporarilyUnavailable(backoff_time=30)
auth_info = kwargs.pop("auth_info", await self.login())
headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
async with self.mass.http_session.put(
- url, headers=headers, params=kwargs, json=data, ssl=False
+ url, headers=headers, params=kwargs, json=data, ssl=True
) as response:
# handle spotify rate limiter
if response.status == 429:
# so it will be retried (and the token refreshed)
if response.status == 401:
self._auth_info = None
- raise ResourceTemporarilyUnavailable("Token expired", backoff_time=0.05)
+ raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
# handle temporary server error
if response.status in (502, 503):
auth_info = kwargs.pop("auth_info", await self.login())
headers = {"Authorization": f"Bearer {auth_info['access_token']}"}
async with self.mass.http_session.post(
- url, headers=headers, params=kwargs, json=data, ssl=False
+ url, headers=headers, params=kwargs, json=data, ssl=True
) as response:
# handle spotify rate limiter
if response.status == 429:
# so it will be retried (and the token refreshed)
if response.status == 401:
self._auth_info = None
- raise ResourceTemporarilyUnavailable("Token expired", backoff_time=0.05)
+ raise ResourceTemporarilyUnavailable("Token expired", backoff_time=1)
# handle temporary server error
if response.status in (502, 503):
raise ResourceTemporarilyUnavailable(backoff_time=30)
response.raise_for_status()
- return await response.json(loads=json_loads)
+ result: dict[str, Any] = await response.json(loads=json_loads)
+ return result
def _fix_create_playlist_api_bug(self, playlist_obj: dict[str, Any]) -> None:
"""Fix spotify API bug where incorrect owner id is returned from Create Playlist."""
+ if self._sp_user is None:
+ raise LoginFailed("User info not available - not logged in")
+
if playlist_obj["owner"]["id"] != self._sp_user["id"]:
playlist_obj["owner"]["id"] = self._sp_user["id"]
playlist_obj["owner"]["display_name"] = self._sp_user["display_name"]